// fedimint_server/consensus/db.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::ModuleInstanceId;
5use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
9use fedimint_core::util::BoxStream;
10use fedimint_core::{
11    OutPoint, TransactionId, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record,
12};
13use fedimint_server_core::migration::{
14    DynModuleHistoryItem, DynServerDbMigrationFn, IServerDbMigrationContext,
15};
16use futures::StreamExt;
17use serde::Serialize;
18
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct AcceptedItemKey(pub u64);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct AcceptedItemPrefix;
26
27impl_db_record!(
28    key = AcceptedItemKey,
29    value = AcceptedItem,
30    db_prefix = DbKeyPrefix::AcceptedItem,
31    notify_on_modify = false,
32);
33impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
34
35#[derive(Debug, Encodable, Decodable, Serialize)]
36pub struct AcceptedTransactionKey(pub TransactionId);
37
38#[derive(Debug, Encodable, Decodable)]
39pub struct AcceptedTransactionKeyPrefix;
40
41impl_db_record!(
42    key = AcceptedTransactionKey,
43    value = Vec<ModuleInstanceId>,
44    db_prefix = DbKeyPrefix::AcceptedTransaction,
45    notify_on_modify = true,
46);
47impl_db_lookup!(
48    key = AcceptedTransactionKey,
49    query_prefix = AcceptedTransactionKeyPrefix
50);
51
52#[derive(Debug, Encodable, Decodable)]
53pub struct SignedSessionOutcomeKey(pub u64);
54
55#[derive(Debug, Encodable, Decodable)]
56pub struct SignedSessionOutcomePrefix;
57
58impl_db_record!(
59    key = SignedSessionOutcomeKey,
60    value = SignedSessionOutcome,
61    db_prefix = DbKeyPrefix::SignedSessionOutcome,
62    notify_on_modify = true,
63);
64impl_db_lookup!(
65    key = SignedSessionOutcomeKey,
66    query_prefix = SignedSessionOutcomePrefix
67);
68
69#[derive(Debug, Encodable, Decodable)]
70pub struct AlephUnitsKey(pub u64);
71
72#[derive(Debug, Encodable, Decodable)]
73pub struct AlephUnitsPrefix;
74
75impl_db_record!(
76    key = AlephUnitsKey,
77    value = Vec<u8>,
78    db_prefix = DbKeyPrefix::AlephUnits,
79    notify_on_modify = false,
80);
81impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
82
83pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, DynServerDbMigrationFn> {
84    BTreeMap::new()
85}
86
/// A concrete implementation of the [`IServerDbMigrationContext`] APIs
/// available for server-module db migrations.
///
/// The type is a stateless unit struct; everything it needs is passed to its
/// methods.
pub struct ServerDbMigrationContext;
90
91#[apply(async_trait_maybe_send!)]
92impl IServerDbMigrationContext for ServerDbMigrationContext {
93    async fn get_module_history_stream<'s, 'tx>(
94        &'s self,
95        module_instance_id: ModuleInstanceId,
96        dbtx: &'s mut DatabaseTransaction<'tx>,
97    ) -> BoxStream<'s, DynModuleHistoryItem>
98    where
99        'tx: 's,
100    {
101        dbtx.ensure_global().expect("Dbtx must be global");
102
103        // Items of the currently ongoing session, that have already been processed. We
104        // have to query them in full first and collect them into a vector so we don't
105        // hold two references to the dbtx at the same time.
106        let active_session_items = dbtx
107            .find_by_prefix(&AcceptedItemPrefix)
108            .await
109            .map(|(_, item)| item)
110            .collect::<Vec<_>>()
111            .await;
112
113        let stream =
114            dbtx.find_by_prefix(&SignedSessionOutcomePrefix)
115                .await
116                // Transform the session stream into an accepted item stream
117                .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
118                    futures::stream::iter(signed_session_outcome.session_outcome.items)
119                })
120                // Append the accepted items from the current session after all the signed session
121                // items have been processed
122                .chain(futures::stream::iter(active_session_items))
123                .flat_map(move |item| {
124                    let history_items =
125                        match item.item {
126                            ConsensusItem::Transaction(tx) => {
127                                let txid = tx.tx_hash();
128                                let input_items = tx.inputs.into_iter().filter_map(|input| {
129                                    (input.module_instance_id() == module_instance_id)
130                                        .then_some(DynModuleHistoryItem::Input(input))
131                                });
132
133                                let output_items = tx.outputs.into_iter().zip(0..).filter_map(
134                                    |(output, out_idx)| {
135                                        (output.module_instance_id() == module_instance_id)
136                                            .then_some(DynModuleHistoryItem::Output(
137                                                output,
138                                                OutPoint { txid, out_idx },
139                                            ))
140                                    },
141                                );
142
143                                input_items.chain(output_items).collect::<Vec<_>>()
144                            }
145                            ConsensusItem::Module(mci) => {
146                                if mci.module_instance_id() == module_instance_id {
147                                    vec![DynModuleHistoryItem::ConsensusItem(mci)]
148                                } else {
149                                    vec![]
150                                }
151                            }
152                            ConsensusItem::Default { .. } => {
153                                unreachable!("We never save unknown CIs on the server side")
154                            }
155                        };
156                    futures::stream::iter(history_items)
157                });
158
159        Box::pin(stream)
160    }
161}