fedimint_client/
db.rs

use std::collections::{BTreeMap, BTreeSet};
use std::time::SystemTime;

use anyhow::{anyhow, bail};
use bitcoin::hex::DisplayHex as _;
use fedimint_api_client::api::ApiVersionSet;
use fedimint_client_module::db::ClientModuleMigrationFn;
use fedimint_client_module::module::recovery::RecoveryProgress;
use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
use fedimint_core::core::{ModuleInstanceId, OperationId};
use fedimint_core::db::{
    Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
    apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::SupportedApiVersionsSummary;
use fedimint_core::module::registry::ModuleRegistry;
use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
use fedimint_eventlog::{
    DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
};
use fedimint_logging::LOG_CLIENT_DB;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator as _;
use strum_macros::EnumIter;
use tracing::{debug, info, trace, warn};

use crate::backup::{ClientBackup, Metadata};
use crate::sm::executor::{
    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
    InactiveStateKeyPrefixBytes,
};

#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    PendingClientConfig = 0x3b,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
    EventLogTrimable = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
    ClientModuleRecovery = 0x40,

    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,

    /// Arbitrary data of applications integrating the Fedimint client that
    /// want to store some Federation-specific data in the Fedimint client
    /// database.
    ///
    /// New users are encouraged to use this single prefix only.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning an isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
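
// A minimal sketch (an illustration, not an API of this module) of how an
// integrating application could store Federation-specific data under the
// `UserData` prefix, using the same low-level `raw_insert_bytes` call the
// migrations below rely on; `b"my-app-key"` is a hypothetical key suffix.
//
// async fn store_app_data(dbtx: &mut DatabaseTransaction<'_>, data: &[u8]) {
//     // All application keys live under the single `UserData` prefix byte.
//     let mut key = vec![DbKeyPrefix::UserData as u8];
//     key.extend_from_slice(b"my-app-key");
//     dbtx.raw_insert_bytes(&key, data)
//         .await
//         .expect("DB operation failed");
// }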

pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
    let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();

    let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
    while let Some((k, v)) = records.next().await {
        // Everything at the `UserData` prefix and above belongs to the
        // application or is reserved, so we don't spend time verifying it
        if DbKeyPrefix::UserData as u8 <= k[0] {
            break;
        }

        assert!(
            prefixes.contains(&k[0]),
            "Unexpected client db record found: {}: {}",
            k.as_hex(),
            v.as_hex()
        );
    }
}

impl std::fmt::Display for DbKeyPrefix {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);

/// Key used to look up operation log entries in chronological order
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
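
// The chronological index relies on encoded `ChronologicalOperationLogKey`s
// sorting by `creation_time` first. A minimal sketch (an illustration, not
// code used by this module) of listing operations in order, assuming a
// `DatabaseTransaction` named `dbtx`:
//
// let ops: Vec<ChronologicalOperationLogKey> = dbtx
//     .find_by_prefix(&ChronologicalOperationLogKeyPrefix)
//     .await
//     .map(|(key, ())| key)
//     .collect()
//     .await;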

#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);

#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct PendingClientConfigKey;

impl_db_record!(
    key = PendingClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::PendingClientConfig
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);

/// Client metadata that will be stored/restored on backup & recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);

/// Do the client's modules need to run recovery before being usable?
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with a freshly generated root secret
    Fresh,
    /// Should be used with root secrets provided by the user to recover an
    /// (even if just possibly) already used secret.
    Recover { snapshot: Option<ClientBackup> },
}

/// Like `InitMode`, but with the data that is no longer required dropped.
///
/// This is distinct from `InitMode` to avoid holding on to `snapshot` forever,
/// both for the user's privacy and to save space, e.g. in case the user gets
/// hacked or their phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization is complete
    Complete(InitModeComplete),
}

impl InitState {
    pub fn into_complete(self) -> Self {
        match self {
            InitState::Pending(p) => InitState::Complete(match p {
                InitMode::Fresh => InitModeComplete::Fresh,
                InitMode::Recover { .. } => InitModeComplete::Recover,
            }),
            InitState::Complete(t) => InitState::Complete(t),
        }
    }

    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
        match self {
            InitState::Pending(p) => match p {
                InitMode::Fresh => None,
                InitMode::Recover { snapshot } => Some(snapshot.clone()),
            },
            InitState::Complete(_) => None,
        }
    }

    pub fn is_pending(&self) -> bool {
        match self {
            InitState::Pending(_) => true,
            InitState::Complete(_) => false,
        }
    }
}
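
// A minimal usage sketch of the lifecycle above: a fresh client starts out
// `Pending` and is flipped to `Complete` once initialization finishes.
//
// let state = InitState::Pending(InitMode::Fresh);
// assert!(state.is_pending());
// let state = state.into_complete();
// assert!(!state.is_pending());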

impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientModuleRecovery,
);

/// Old (incorrect) version of the [`ClientModuleRecovery`] key
/// that used the wrong prefix.
///
/// See <https://github.com/fedimint/fedimint/issues/7367>.
///
/// Used only for the migration.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecoveryIncorrectDoNotUse {
    pub module_instance_id: ModuleInstanceId,
}

impl_db_record!(
    key = ClientModuleRecoveryIncorrectDoNotUse,
    value = ClientModuleRecoveryState,
    // This was wrong and we keep it wrong for a migration.
    db_prefix = DbKeyPrefix::ClientInitState,
);

/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);

#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(crate) struct MetaFieldPrefix;

#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);

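/// Returns the map of core client database migrations, keyed by the database
/// version each migration upgrades *from*. `apply_migrations_core_client_dbtx`
/// below applies them in order within a single database transaction.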
pub fn get_core_client_database_migrations()
-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
        BTreeMap::new();
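    // Migration from the `FederationId`-keyed `ClientConfigKeyV0`/`ClientConfigV0`
    // to the unkeyed `ClientConfigKey`/`ClientConfig`, defaulting the new
    // `broadcast_public_keys` field to `None`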
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                let config_v0 = dbtx
                    .find_by_prefix(&ClientConfigKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                assert!(config_v0.len() <= 1);
                let Some((id, config_v0)) = config_v0.into_iter().next() else {
                    return Ok(());
                };

                let global = GlobalClientConfig {
                    api_endpoints: config_v0.global.api_endpoints,
                    broadcast_public_keys: None,
                    consensus_version: config_v0.global.consensus_version,
                    meta: config_v0.global.meta,
                };

                let config = ClientConfig {
                    global,
                    modules: config_v0.modules,
                };

                dbtx.remove_entry(&id).await;
                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
                Ok(())
            })
        }),
    );

    // Migration to add outcome_time to OperationLogEntry
    migrations.insert(
        DatabaseVersion(1),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Read all OperationLogEntries using V0 format
                let operation_logs = dbtx
                    .find_by_prefix(&OperationLogKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // Build a map from operation_id -> max_time of inactive state
                let mut op_id_max_time = BTreeMap::new();

                // Process inactive states
                {
                    let mut inactive_states_stream =
                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                    while let Some((state, meta)) = inactive_states_stream.next().await {
                        let entry = op_id_max_time
                            .entry(state.operation_id)
                            .or_insert(meta.exited_at);
                        *entry = (*entry).max(meta.exited_at);
                    }
                }
                // Migrate each V0 operation log entry to the new format
                for (op_key_v0, log_entry_v0) in operation_logs {
                    let new_entry = OperationLogEntry::new(
                        log_entry_v0.operation_module_kind,
                        log_entry_v0.meta,
                        log_entry_v0.outcome.map(|outcome| {
                            OperationOutcome {
                                outcome,
                                // If we found state times, use the max, otherwise use
                                // current time
                                time: op_id_max_time
                                    .get(&op_key_v0.operation_id)
                                    .copied()
                                    .unwrap_or_else(fedimint_core::time::now),
                            }
                        }),
                    );

                    dbtx.remove_entry(&op_key_v0).await;
                    dbtx.insert_entry(
                        &OperationLogKey {
                            operation_id: op_key_v0.operation_id,
                        },
                        &new_entry,
                    )
                    .await;
                }

                Ok(())
            })
        }),
    );

    // Fix #6948
    migrations.insert(
        DatabaseVersion(2),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Migrate unordered keys that got written to ordered table
                {
                    let mut ordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = ordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking ordered log key"
                        );
                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
                        {
                            assert!(
                                UnordedEventLogId::consensus_decode_whole(
                                    &k[1..],
                                    &Default::default()
                                )
                                .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    drop(ordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating unordered event log entry written to an ordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        assert_eq!(key_to_migrate[0], 0x39);
                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x3a);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }

                // Migrate ordered keys that got written to unordered table
                {
                    let mut unordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = unordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking unordered log key"
                        );
                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
                            .is_err()
                        {
                            assert!(
                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
                                    .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    drop(unordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating ordered event log entry written to an unordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        assert_eq!(key_to_migrate[0], 0x3a);
                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x39);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }
                Ok(())
            })
        }),
    );

    // Fix #7367
    migrations.insert(
        DatabaseVersion(3),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                for module_id in 0..u16::MAX {
                    let old_key = ClientModuleRecoveryIncorrectDoNotUse {
                        module_instance_id: module_id,
                    };
                    let new_key = ClientModuleRecovery {
                        module_instance_id: module_id,
                    };
                    let Some(value) = dbtx.get_value(&old_key).await else {
                        debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migration");
                        break;
                    };

                    debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
                    dbtx.remove_entry(&old_key).await.expect("Is there.");
                    assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
                }

                Ok(())
            })
        }),
    );
    migrations
}

/// Apply core client database migrations
///
/// TODO: This should be private.
pub async fn apply_migrations_core_client_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
) -> Result<(), anyhow::Error> {
    apply_migrations_dbtx(
        dbtx,
        (),
        kind,
        get_core_client_database_migrations(),
        None,
        Some(DbKeyPrefix::UserData as u8),
    )
    .await
}
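
// A minimal usage sketch (an illustration, not code from this crate), assuming
// a `Database` handle named `db`; the "client" kind string is hypothetical:
//
// let mut dbtx = db.begin_transaction().await;
// apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "client".to_string()).await?;
// dbtx.commit_tx_result().await?;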

/// `apply_migrations_client_module` iterates from the on-disk database version
/// for the client module up to the target database version and executes all of
/// the migrations that exist in the migrations map, including state machine
/// migrations. Each migration in the migrations map updates the database to
/// have the correct on-disk data structures that the code is expecting. The
/// entire process is atomic (i.e. the migrations from 0->1 and 1->2 happen
/// atomically). This function is called before the module is initialized and,
/// as long as the correct migrations are supplied in the migrations map, the
/// module will be able to read and write from the database successfully.
pub async fn apply_migrations_client_module(
    db: &Database,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    let mut dbtx = db.begin_transaction().await;
    apply_migrations_client_module_dbtx(
        &mut dbtx.to_ref_nc(),
        kind,
        migrations,
        module_instance_id,
    )
    .await?;
    dbtx.commit_tx_result().await
}

pub async fn apply_migrations_client_module_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    let target_version = get_current_database_version(&migrations);

    // First write the database version to disk if it does not exist.
    create_database_version_dbtx(
        dbtx,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    let current_version = dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            return Ok(());
        }

        if target_version < current_version {
            return Err(anyhow!(format!(
                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
                current_version, target_version,
            )));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;

        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                     target: LOG_CLIENT_DB,
                     module_instance_id,
                     %kind,
                     %current_version,
                     %target_version,
                     "Running module db migration");

                migration(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            current_version = current_version.increment();
            dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        target_version
    };

    debug!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Client DB Version");
    Ok(())
}

/// Reads all active states from the database and returns the raw state bytes
/// together with their operation ids for the given module instance.
/// TODO: It is unfortunate that we can't read states by the module's instance
/// id, so we are forced to read all active states and filter in memory. Once
/// we do a db migration to add `module_instance_id` to `ActiveStateKey`, this
/// can be improved to only read the module's relevant states.
pub async fn get_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
        .await
        .filter_map(|(state, _)| async move {
            if module_instance_id == state.module_instance_id {
                Some((state.state, state.operation_id))
            } else {
                None
            }
        })
        .collect::<Vec<_>>()
        .await
}

/// Reads all inactive states from the database and returns the raw state bytes
/// together with their operation ids for the given module instance.
/// TODO: It is unfortunate that we can't read states by the module's instance
/// id, so we are forced to read all inactive states and filter in memory. Once
/// we do a db migration to add `module_instance_id` to `InactiveStateKey`,
/// this can be improved to only read the module's relevant states.
pub async fn get_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
        .await
        .filter_map(|(state, _)| async move {
            if module_instance_id == state.module_instance_id {
                Some((state.state, state.operation_id))
            } else {
                None
            }
        })
        .collect::<Vec<_>>()
        .await
}

/// Persists new active states by first removing all current active states, and
/// re-writing with the new set of active states. `new_active_states` is
/// expected to contain all active states, not just the newly created states.
pub async fn remove_old_and_persist_new_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_active_states: Vec<(Vec<u8>, OperationId)>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Remove all existing active states
    for (bytes, operation_id) in states_to_remove {
        dbtx.remove_entry(&ActiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state: bytes,
        })
        .await
        .expect("Did not delete anything");
    }

    // Insert new "migrated" active states
    for (bytes, operation_id) in new_active_states {
        dbtx.insert_new_entry(
            &ActiveStateKeyBytes {
                operation_id,
                module_instance_id,
                state: bytes,
            },
            &ActiveStateMeta::default(),
        )
        .await;
    }
}

/// Persists new inactive states by first removing all current inactive states,
/// and re-writing with the new set of inactive states. `new_inactive_states`
/// is expected to contain all inactive states, not just the newly created
/// states.
pub async fn remove_old_and_persist_new_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Remove all existing inactive states
    for (bytes, operation_id) in states_to_remove {
        dbtx.remove_entry(&InactiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state: bytes,
        })
        .await
        .expect("Did not delete anything");
    }

    // Insert new "migrated" inactive states
    for (bytes, operation_id) in new_inactive_states {
        dbtx.insert_new_entry(
            &InactiveStateKeyBytes {
                operation_id,
                module_instance_id,
                state: bytes,
            },
            &InactiveStateMeta {
                created_at: fedimint_core::time::now(),
                exited_at: fedimint_core::time::now(),
            },
        )
        .await;
    }
}

/// Fetches the encoded client secret from the database and decodes it.
/// If an encoded client secret is not present in the database, or if
/// decoding fails, an error is returned.
pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
    let mut tx = db.begin_transaction_nc().await;
    let client_secret = tx.get_value(&EncodedClientSecretKey).await;

    match client_secret {
        Some(client_secret) => {
            T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
                .map_err(|e| anyhow!("Decoding failed: {e}"))
        }
        None => bail!("Encoded client secret not present in DB"),
    }
}
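
// A minimal usage sketch; the concrete secret type is chosen by the caller
// and only needs to implement `Decodable` (the 64-byte array is hypothetical):
//
// let secret: [u8; 64] = get_decoded_client_secret(&db).await?;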

/// V0 version of operation log entry for migration purposes
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    pub(crate) operation_module_kind: String,
    pub(crate) meta: JsonStringed,
    pub(crate) outcome: Option<JsonStringed>,
}