fedimint_server/consensus/
db.rs1use std::collections::BTreeMap;
2use std::fmt::Debug;
3
4use fedimint_core::core::ModuleInstanceId;
5use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
6use fedimint_core::encoding::{Decodable, Encodable};
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::{AcceptedItem, SignedSessionOutcome};
9use fedimint_core::util::BoxStream;
10use fedimint_core::{
11 OutPoint, TransactionId, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record,
12};
13use fedimint_server_core::migration::{
14 DynModuleHistoryItem, DynServerDbMigrationFn, IServerDbMigrationContext,
15};
16use futures::StreamExt;
17use serde::Serialize;
18
19use crate::db::DbKeyPrefix;
20
21#[derive(Clone, Debug, Encodable, Decodable)]
22pub struct AcceptedItemKey(pub u64);
23
24#[derive(Clone, Debug, Encodable, Decodable)]
25pub struct AcceptedItemPrefix;
26
27impl_db_record!(
28 key = AcceptedItemKey,
29 value = AcceptedItem,
30 db_prefix = DbKeyPrefix::AcceptedItem,
31 notify_on_modify = false,
32);
33impl_db_lookup!(key = AcceptedItemKey, query_prefix = AcceptedItemPrefix);
34
35#[derive(Debug, Encodable, Decodable, Serialize)]
36pub struct AcceptedTransactionKey(pub TransactionId);
37
38#[derive(Debug, Encodable, Decodable)]
39pub struct AcceptedTransactionKeyPrefix;
40
41impl_db_record!(
42 key = AcceptedTransactionKey,
43 value = Vec<ModuleInstanceId>,
44 db_prefix = DbKeyPrefix::AcceptedTransaction,
45 notify_on_modify = true,
46);
47impl_db_lookup!(
48 key = AcceptedTransactionKey,
49 query_prefix = AcceptedTransactionKeyPrefix
50);
51
52#[derive(Debug, Encodable, Decodable)]
53pub struct SignedSessionOutcomeKey(pub u64);
54
55#[derive(Debug, Encodable, Decodable)]
56pub struct SignedSessionOutcomePrefix;
57
58impl_db_record!(
59 key = SignedSessionOutcomeKey,
60 value = SignedSessionOutcome,
61 db_prefix = DbKeyPrefix::SignedSessionOutcome,
62 notify_on_modify = true,
63);
64impl_db_lookup!(
65 key = SignedSessionOutcomeKey,
66 query_prefix = SignedSessionOutcomePrefix
67);
68
69#[derive(Debug, Encodable, Decodable)]
70pub struct AlephUnitsKey(pub u64);
71
72#[derive(Debug, Encodable, Decodable)]
73pub struct AlephUnitsPrefix;
74
75impl_db_record!(
76 key = AlephUnitsKey,
77 value = Vec<u8>,
78 db_prefix = DbKeyPrefix::AlephUnits,
79 notify_on_modify = false,
80);
81impl_db_lookup!(key = AlephUnitsKey, query_prefix = AlephUnitsPrefix);
82
83pub fn get_global_database_migrations() -> BTreeMap<DatabaseVersion, DynServerDbMigrationFn> {
84 BTreeMap::new()
85}
86
/// Stateless implementor of [`IServerDbMigrationContext`]; it carries no
/// fields and reads everything it needs from the database transaction passed
/// to its methods.
pub struct ServerDbMigrationContext;
90
91#[apply(async_trait_maybe_send!)]
92impl IServerDbMigrationContext for ServerDbMigrationContext {
93 async fn get_module_history_stream<'s, 'tx>(
94 &'s self,
95 module_instance_id: ModuleInstanceId,
96 dbtx: &'s mut DatabaseTransaction<'tx>,
97 ) -> BoxStream<'s, DynModuleHistoryItem>
98 where
99 'tx: 's,
100 {
101 dbtx.ensure_global().expect("Dbtx must be global");
102
103 let active_session_items = dbtx
107 .find_by_prefix(&AcceptedItemPrefix)
108 .await
109 .map(|(_, item)| item)
110 .collect::<Vec<_>>()
111 .await;
112
113 let stream =
114 dbtx.find_by_prefix(&SignedSessionOutcomePrefix)
115 .await
116 .flat_map(|(_, signed_session_outcome): (_, SignedSessionOutcome)| {
118 futures::stream::iter(signed_session_outcome.session_outcome.items)
119 })
120 .chain(futures::stream::iter(active_session_items))
123 .flat_map(move |item| {
124 let history_items =
125 match item.item {
126 ConsensusItem::Transaction(tx) => {
127 let txid = tx.tx_hash();
128 let input_items = tx.inputs.into_iter().filter_map(|input| {
129 (input.module_instance_id() == module_instance_id)
130 .then_some(DynModuleHistoryItem::Input(input))
131 });
132
133 let output_items = tx.outputs.into_iter().zip(0..).filter_map(
134 |(output, out_idx)| {
135 (output.module_instance_id() == module_instance_id)
136 .then_some(DynModuleHistoryItem::Output(
137 output,
138 OutPoint { txid, out_idx },
139 ))
140 },
141 );
142
143 input_items.chain(output_items).collect::<Vec<_>>()
144 }
145 ConsensusItem::Module(mci) => {
146 if mci.module_instance_id() == module_instance_id {
147 vec![DynModuleHistoryItem::ConsensusItem(mci)]
148 } else {
149 vec![]
150 }
151 }
152 ConsensusItem::Default { .. } => {
153 unreachable!("We never save unknown CIs on the server side")
154 }
155 };
156 futures::stream::iter(history_items)
157 });
158
159 Box::pin(stream)
160 }
161}