fedimint_server/consensus/
db.rs

1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::ModuleInstanceId;
5use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
9use fedimint_core::util::BoxStream;
10use fedimint_core::{TransactionId, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
11use fedimint_server_core::migration::{
12    DynModuleHistoryItem, DynServerDbMigrationFn, IServerDbMigrationContext,
13};
14use futures::StreamExt;
15use serde::Serialize;
16
17use crate::db::DbKeyPrefix;
18
19#[derive(Clone, Debug, Encodable, Decodable)]
20pub struct AcceptedItemKey(pub u64);
21
22#[derive(Clone, Debug, Encodable, Decodable)]
23pub struct AcceptedItemPrefix;
24
25impl_db_record!(
26    key = AcceptedItemKey,
27    value = AcceptedItem,
28    db_prefix = DbKeyPrefix::AcceptedItem,
29    notify_on_modify = false,
30);
31impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
32
33#[derive(Debug, Encodable, Decodable, Serialize)]
34pub struct AcceptedTransactionKey(pub TransactionId);
35
36#[derive(Debug, Encodable, Decodable)]
37pub struct AcceptedTransactionKeyPrefix;
38
39impl_db_record!(
40    key = AcceptedTransactionKey,
41    value = Vec<ModuleInstanceId>,
42    db_prefix = DbKeyPrefix::AcceptedTransaction,
43    notify_on_modify = true,
44);
45impl_db_lookup!(
46    key = AcceptedTransactionKey,
47    query_prefix = AcceptedTransactionKeyPrefix
48);
49
50#[derive(Debug, Encodable, Decodable)]
51pub struct SignedSessionOutcomeKey(pub u64);
52
53#[derive(Debug, Encodable, Decodable)]
54pub struct SignedSessionOutcomePrefix;
55
56impl_db_record!(
57    key = SignedSessionOutcomeKey,
58    value = SignedSessionOutcome,
59    db_prefix = DbKeyPrefix::SignedSessionOutcome,
60    notify_on_modify = true,
61);
62impl_db_lookup!(
63    key = SignedSessionOutcomeKey,
64    query_prefix = SignedSessionOutcomePrefix
65);
66
67#[derive(Debug, Encodable, Decodable)]
68pub struct AlephUnitsKey(pub u64);
69
70#[derive(Debug, Encodable, Decodable)]
71pub struct AlephUnitsPrefix;
72
73impl_db_record!(
74    key = AlephUnitsKey,
75    value = Vec<u8>,
76    db_prefix = DbKeyPrefix::AlephUnits,
77    notify_on_modify = false,
78);
79impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
80
81pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, DynServerDbMigrationFn> {
82    BTreeMap::new()
83}
84
85/// A concrete implementation of [`IServerDbMigrationContext`] APIs
86/// available for server-module db migrations.
87pub struct ServerDbMigrationContext;
88
89#[apply(async_trait_maybe_send!)]
90impl IServerDbMigrationContext for ServerDbMigrationContext {
91    async fn get_module_history_stream<'s, 'tx>(
92        &'s self,
93        module_instance_id: ModuleInstanceId,
94        dbtx: &'s mut DatabaseTransaction<'tx>,
95    ) -> BoxStream<'s, DynModuleHistoryItem>
96    where
97        'tx: 's,
98    {
99        dbtx.ensure_global().expect("Dbtx must be global");
100
101        // Items of the currently ongoing session, that have already been processed. We
102        // have to query them in full first and collect them into a vector so we don't
103        // hold two references to the dbtx at the same time.
104        let active_session_items = dbtx
105            .find_by_prefix(&AcceptedItemPrefix)
106            .await
107            .map(|(_, item)| item)
108            .collect::<Vec<_>>()
109            .await;
110
111        let stream = dbtx
112            .find_by_prefix(&SignedSessionOutcomePrefix)
113            .await
114            // Transform the session stream into an accepted item stream
115            .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
116                futures::stream::iter(signed_session_outcome.session_outcome.items)
117            })
118            // Append the accepted items from the current session after all the signed session items
119            // have been processed
120            .chain(futures::stream::iter(active_session_items))
121            .flat_map(move |item| {
122                let history_items = match item.item {
123                    ConsensusItem::Transaction(tx) => tx
124                        .inputs
125                        .into_iter()
126                        .filter_map(|input| {
127                            (input.module_instance_id() == module_instance_id)
128                                .then_some(DynModuleHistoryItem::Input(input))
129                        })
130                        .chain(tx.outputs.into_iter().filter_map(|output| {
131                            (output.module_instance_id() == module_instance_id)
132                                .then_some(DynModuleHistoryItem::Output(output))
133                        }))
134                        .collect::<Vec<_>>(),
135                    ConsensusItem::Module(mci) => {
136                        if mci.module_instance_id() == module_instance_id {
137                            vec![DynModuleHistoryItem::ConsensusItem(mci)]
138                        } else {
139                            vec![]
140                        }
141                    }
142                    ConsensusItem::Default { .. } => {
143                        unreachable!("We never save unknown CIs on the server side")
144                    }
145                };
146                futures::stream::iter(history_items)
147            });
148
149        Box::pin(stream)
150    }
151}