fedimint_server_core/
migration.rs

1use std::collections::BTreeMap;
2use std::marker;
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use fedimint_core::core::{DynInput, DynModuleConsensusItem, DynOutput, ModuleInstanceId};
7use fedimint_core::db::{
8    Database, DatabaseTransaction, DatabaseVersion, DbMigrationFn, DbMigrationFnContext,
9    apply_migrations_dbtx,
10};
11use fedimint_core::module::ModuleCommon;
12use fedimint_core::util::BoxStream;
13use fedimint_core::{apply, async_trait_maybe_send};
14use futures::StreamExt as _;
15
16use crate::ServerModule;
17
18/// Typed history item of a module
19pub enum DynModuleHistoryItem {
20    ConsensusItem(DynModuleConsensusItem),
21    Input(DynInput),
22    Output(DynOutput),
23}
24
25/// Typed history item of a module
26pub enum ModuleHistoryItem<M: ModuleCommon> {
27    ConsensusItem(M::ConsensusItem),
28    Input(M::Input),
29    Output(M::Output),
30}
31
32/// An interface a server module db migration context needs to implement
33///
34/// An instance of this type is injected to server-side migrations from
35/// `fedimint-server`, but users of it (and `fedimint-server-core`) do not need
36/// to know the implementation.
37#[apply(async_trait_maybe_send!)]
38pub trait IServerDbMigrationContext {
39    /// Get a stream of historical consensus items belonging to the module
40    async fn get_module_history_stream<'s, 'tx>(
41        &'s self,
42        module_id: ModuleInstanceId,
43        dbtx: &'s mut DatabaseTransaction<'tx>,
44    ) -> BoxStream<'s, DynModuleHistoryItem>;
45}
46
47/// A type-erased value implementing [`IServerDbMigrationContext`]
48pub type DynServerDbMigrationContext = Arc<dyn IServerDbMigrationContext + Send + Sync + 'static>;
49
50/// A module-typed wrapper over a typed-erased [`DynServerDbMigrationContext`]
51///
52/// This is to wrap erased [`IServerDbMigrationContext`] interfaces and
53/// expose typed interfaces to the server module db migrations.
54pub struct ServerModuleDbMigrationContext<M> {
55    ctx: DynServerDbMigrationContext,
56    module: marker::PhantomData<M>,
57}
58
59impl<M> ServerModuleDbMigrationContext<M> {
60    pub(crate) fn new(ctx: DynServerDbMigrationContext) -> Self {
61        Self {
62            ctx,
63            module: marker::PhantomData,
64        }
65    }
66
67    fn ctx(&self) -> &DynServerDbMigrationContext {
68        &self.ctx
69    }
70}
71
72/// A type alias of a [`DbMigrationFnContext`] with inner context
73/// set to module-specific-typed [`ServerModuleDbMigrationContext`]
74pub type ServerModuleDbMigrationFnContext<'tx, M> =
75    DbMigrationFnContext<'tx, ServerModuleDbMigrationContext<M>>;
76
77/// An extension trait to access module-specific-typed apis of
78/// [`IServerDbMigrationContext`] injected by the `fedimint-server`.
79///
80/// Needs to be an extension trait, as `fedimint-server-core` can't
81/// implement things on general-purpose [`DbMigrationFnContext`]
82#[async_trait]
83pub trait ServerModuleDbMigrationFnContextExt<M>
84where
85    M: ServerModule,
86{
87    async fn get_typed_module_history_stream(
88        &mut self,
89    ) -> BoxStream<ModuleHistoryItem<<M as ServerModule>::Common>>;
90}
91
92#[async_trait]
93impl<M> ServerModuleDbMigrationFnContextExt<M> for ServerModuleDbMigrationFnContext<'_, M>
94where
95    M: ServerModule + Send + Sync,
96{
97    async fn get_typed_module_history_stream(
98        &mut self,
99    ) -> BoxStream<ModuleHistoryItem<<M as ServerModule>::Common>> {
100        let module_instance_id = self
101            .module_instance_id()
102            .expect("module_instance_id must be set");
103        let (dbtx, ctx) = self.split_dbtx_ctx();
104
105        Box::pin(
106            ctx
107                .ctx()
108                .get_module_history_stream(
109                    module_instance_id,
110                    dbtx
111                )
112                .await
113                .map(|item| match item {
114                    DynModuleHistoryItem::ConsensusItem(ci) => ModuleHistoryItem::ConsensusItem(
115                        ci.as_any()
116                            .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::ConsensusItem>()
117                            .expect("Wrong module type")
118                            .clone(),
119                    ),
120                    DynModuleHistoryItem::Input(input) => ModuleHistoryItem::Input(
121                        input
122                            .as_any()
123                            .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::Input>()
124                            .expect("Wrong module type")
125                            .clone(),
126                    ),
127                    DynModuleHistoryItem::Output(output) => ModuleHistoryItem::Output(
128                        output
129                            .as_any()
130                            .downcast_ref::<<<M as ServerModule>::Common as ModuleCommon>::Output>()
131                            .expect("Wrong module type")
132                            .clone(),
133                    ),
134                }),
135        )
136    }
137}
138
139/// A [`DbMigrationFn`] with inner-context type-specific for a given server
140/// module
141pub type ServerModuleDbMigrationFn<M> = DbMigrationFn<ServerModuleDbMigrationContext<M>>;
142
143/// A [`DbMigrationFn`] with inner-context type-erased for all server modules
144pub type DynServerDbMigrationFn = DbMigrationFn<DynServerDbMigrationContext>;
145/// A [`DbMigrationFnContext`] with inner-context type-erased around
146/// [`IServerDbMigrationContext`]
147pub type ServerDbMigrationFnContext<'tx> = DbMigrationFnContext<'tx, DynServerDbMigrationContext>;
148
149/// See [`apply_migrations_server_dbtx`]
150pub async fn apply_migrations_server(
151    ctx: DynServerDbMigrationContext,
152    db: &Database,
153    kind: String,
154    migrations: BTreeMap<DatabaseVersion, DynServerDbMigrationFn>,
155) -> Result<(), anyhow::Error> {
156    let mut global_dbtx = db.begin_transaction().await;
157    global_dbtx.ensure_global()?;
158    apply_migrations_server_dbtx(&mut global_dbtx.to_ref_nc(), ctx, kind, migrations).await?;
159    global_dbtx.commit_tx_result().await
160}
161
162/// Applies the database migrations to a non-isolated database.
163pub async fn apply_migrations_server_dbtx(
164    global_dbtx: &mut DatabaseTransaction<'_>,
165    ctx: DynServerDbMigrationContext,
166    kind: String,
167    migrations: BTreeMap<DatabaseVersion, DynServerDbMigrationFn>,
168) -> Result<(), anyhow::Error> {
169    global_dbtx.ensure_global()?;
170    apply_migrations_dbtx(global_dbtx, ctx, kind, migrations, None, None).await
171}