fedimint_server_core/
migration.rs1use std::collections::BTreeMap;
2use std::marker;
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
7use fedimint_core::db::{
8 Database, DatabaseTransaction, DatabaseVersion, DbMigrationFn, DbMigrationFnContext,
9 apply_migrations_dbtx,
10};
11use fedimint_core::module::ModuleCommon;
12use fedimint_core::util::BoxStream;
13use fedimint_core::{apply, async_trait_maybe_send};
14use futures::StreamExt as _;
15
16use crate::ServerModule;
17
18pub enum DynModuleHistoryItem {
20 ConsensusItem(DynModuleConsensusItem),
21 Input(DynInput),
22 Output(DynOutput),
23}
24
25pub enum ModuleHistoryItem<M: ModuleCommon> {
27 ConsensusItem(M::ConsensusItem),
28 Input(M::Input),
29 Output(M::Output),
30}
31
32#[apply(async_trait_maybe_send!)]
38pub trait IServerDbMigrationContext {
39 async fn get_module_history_stream<'s, 'tx>(
41 &'s self,
42 module_id: ModuleInstanceId,
43 dbtx: &'s mut DatabaseTransaction<'tx>,
44 ) -> BoxStream<'s, DynModuleHistoryItem>;
45}
46
47pub type DynServerDbMigrationContext = Arc<dyn IServerDbMigrationContext + Send + Sync + 'static>;
49
50pub struct ServerModuleDbMigrationContext<M> {
55 ctx: DynServerDbMigrationContext,
56 module: marker::PhantomData<M>,
57}
58
59impl<M> ServerModuleDbMigrationContext<M> {
60 pub(crate) fn new(ctx: DynServerDbMigrationContext) -> Self {
61 Self {
62 ctx,
63 module: marker::PhantomData,
64 }
65 }
66
67 fn ctx(&self) -> &DynServerDbMigrationContext {
68 &self.ctx
69 }
70}
71
72pub type ServerModuleDbMigrationFnContext<'tx, M> =
75 DbMigrationFnContext<'tx, ServerModuleDbMigrationContext<M>>;
76
77#[async_trait]
83pub trait ServerModuleDbMigrationFnContextExt<M>
84where
85 M: ServerModule,
86{
87 async fn get_typed_module_history_stream(
88 &mut self,
89 ) -> BoxStream<ModuleHistoryItem<<M as ServerModule>::Common>>;
90}
91
92#[async_trait]
93impl<M> ServerModuleDbMigrationFnContextExt<M> for ServerModuleDbMigrationFnContext<'_, M>
94where
95 M: ServerModule + Send + Sync,
96{
97 async fn get_typed_module_history_stream(
98 &mut self,
99 ) -> BoxStream<ModuleHistoryItem<<M as ServerModule>::Common>> {
100 let module_instance_id = self
101 .module_instance_id()
102 .expect("module_instance_id must be set");
103 let (dbtx, ctx) = self.split_dbtx_ctx();
104
105 Box::pin(
106 ctx
107 .ctx()
108 .get_module_history_stream(
109 module_instance_id,
110 dbtx
111 )
112 .await
113 .map(|item| match item {
114 DynModuleHistoryItem::ConsensusItem(ci) => ModuleHistoryItem::ConsensusItem(
115 ci.as_any()
116 .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::ConsensusItem>()
117 .expect("Wrong module type")
118 .clone(),
119 ),
120 DynModuleHistoryItem::Input(input) => ModuleHistoryItem::Input(
121 input
122 .as_any()
123 .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::Input>()
124 .expect("Wrong module type")
125 .clone(),
126 ),
127 DynModuleHistoryItem::Output(output) => ModuleHistoryItem::Output(
128 output
129 .as_any()
130 .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::Output>()
131 .expect("Wrong module type")
132 .clone(),
133 ),
134 }),
135 )
136 }
137}
138
139pub type ServerModuleDbMigrationFn<M> = DbMigrationFn<ServerModuleDbMigrationContext<M>>;
142
143pub type DynServerDbMigrationFn = DbMigrationFn<DynServerDbMigrationContext>;
145pub type ServerDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, DynServerDbMigrationContext>;
148
149pub async fn apply_migrations_server(
151 ctx: DynServerDbMigrationContext,
152 db: &Database,
153 kind: String,
154 migrations: BTreeMap<DatabaseVersion, DynServerDbMigrationFn>,
155) -> Result<(), anyhow::Error> {
156 let mut global_dbtx = db.begin_transaction().await;
157 global_dbtx.ensure_global()?;
158 apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), ctx, kind, migrations).await?;
159 global_dbtx.commit_tx_result().await
160}
161
162pub async fn apply_migrations_server_dbtx(
164 global_dbtx: &mut DatabaseTransaction<'_>,
165 ctx: DynServerDbMigrationContext,
166 kind: String,
167 migrations: BTreeMap<DatabaseVersion, DynServerDbMigrationFn>,
168) -> Result<(), anyhow::Error> {
169 global_dbtx.ensure_global()?;
170 apply_migrations_dbtx(global_dbtx, ctx, kind, migrations, None, None).await
171}