fedimint_server/consensus/
db.rs
1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::ModuleInstanceId;
5use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
9use fedimint_core::util::BoxStream;
10use fedimint_core::{TransactionId, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
11use fedimint_server_core::migration::{
12 DynModuleHistoryItem, DynServerDbMigrationFn, IServerDbMigrationContext,
13};
14use futures::StreamExt;
15use serde::Serialize;
16
17use crate::db::DbKeyPrefix;
18
19#[derive(Clone, Debug, Encodable, Decodable)]
20pub struct AcceptedItemKey(pub u64);
21
22#[derive(Clone, Debug, Encodable, Decodable)]
23pub struct AcceptedItemPrefix;
24
25impl_db_record!(
26 key = AcceptedItemKey,
27 value = AcceptedItem,
28 db_prefix = DbKeyPrefix::AcceptedItem,
29 notify_on_modify = false,
30);
31impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
32
33#[derive(Debug, Encodable, Decodable, Serialize)]
34pub struct AcceptedTransactionKey(pub TransactionId);
35
36#[derive(Debug, Encodable, Decodable)]
37pub struct AcceptedTransactionKeyPrefix;
38
39impl_db_record!(
40 key = AcceptedTransactionKey,
41 value = Vec<ModuleInstanceId>,
42 db_prefix = DbKeyPrefix::AcceptedTransaction,
43 notify_on_modify = true,
44);
45impl_db_lookup!(
46 key = AcceptedTransactionKey,
47 query_prefix = AcceptedTransactionKeyPrefix
48);
49
50#[derive(Debug, Encodable, Decodable)]
51pub struct SignedSessionOutcomeKey(pub u64);
52
53#[derive(Debug, Encodable, Decodable)]
54pub struct SignedSessionOutcomePrefix;
55
56impl_db_record!(
57 key = SignedSessionOutcomeKey,
58 value = SignedSessionOutcome,
59 db_prefix = DbKeyPrefix::SignedSessionOutcome,
60 notify_on_modify = true,
61);
62impl_db_lookup!(
63 key = SignedSessionOutcomeKey,
64 query_prefix = SignedSessionOutcomePrefix
65);
66
67#[derive(Debug, Encodable, Decodable)]
68pub struct AlephUnitsKey(pub u64);
69
70#[derive(Debug, Encodable, Decodable)]
71pub struct AlephUnitsPrefix;
72
73impl_db_record!(
74 key = AlephUnitsKey,
75 value = Vec<u8>,
76 db_prefix = DbKeyPrefix::AlephUnits,
77 notify_on_modify = false,
78);
79impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
80
81pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, DynServerDbMigrationFn> {
82 BTreeMap::new()
83}
84
/// Stateless handle that implements [`IServerDbMigrationContext`], giving
/// migrations access to a module's history as stored in the consensus
/// database.
pub struct ServerDbMigrationContext;
88
89#[apply(async_trait_maybe_send!)]
90impl IServerDbMigrationContext for ServerDbMigrationContext {
91 async fn get_module_history_stream<'s, 'tx>(
92 &'s self,
93 module_instance_id: ModuleInstanceId,
94 dbtx: &'s mut DatabaseTransaction<'tx>,
95 ) -> BoxStream<'s, DynModuleHistoryItem>
96 where
97 'tx: 's,
98 {
99 dbtx.ensure_global().expect("Dbtx must be global");
100
101 let active_session_items = dbtx
105 .find_by_prefix(&AcceptedItemPrefix)
106 .await
107 .map(|(_, item)| item)
108 .collect::<Vec<_>>()
109 .await;
110
111 let stream = dbtx
112 .find_by_prefix(&SignedSessionOutcomePrefix)
113 .await
114 .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
116 futures::stream::iter(signed_session_outcome.session_outcome.items)
117 })
118 .chain(futures::stream::iter(active_session_items))
121 .flat_map(move |item| {
122 let history_items = match item.item {
123 ConsensusItem::Transaction(tx) => tx
124 .inputs
125 .into_iter()
126 .filter_map(|input| {
127 (input.module_instance_id() == module_instance_id)
128 .then_some(DynModuleHistoryItem::Input(input))
129 })
130 .chain(tx.outputs.into_iter().filter_map(|output| {
131 (output.module_instance_id() == module_instance_id)
132 .then_some(DynModuleHistoryItem::Output(output))
133 }))
134 .collect::<Vec<_>>(),
135 ConsensusItem::Module(mci) => {
136 if mci.module_instance_id() == module_instance_id {
137 vec![DynModuleHistoryItem::ConsensusItem(mci)]
138 } else {
139 vec![]
140 }
141 }
142 ConsensusItem::Default { .. } => {
143 unreachable!("We never save unknown CIs on the server side")
144 }
145 };
146 futures::stream::iter(history_items)
147 });
148
149 Box::pin(stream)
150 }
151}