fedimint_client/
db.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::time::SystemTime;
3
4use anyhow::{anyhow, bail};
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_client_module::db::ClientModuleMigrationFn;
8use fedimint_client_module::module::recovery::RecoveryProgress;
9use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
10use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
11use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
12use fedimint_core::core::{ModuleInstanceId, OperationId};
13use fedimint_core::db::{
14    Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
15    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
16    apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
17};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::module::SupportedApiVersionsSummary;
20use fedimint_core::module::registry::ModuleRegistry;
21use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
22use fedimint_eventlog::{
23    DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
24};
25use fedimint_logging::LOG_CLIENT_DB;
26use futures::StreamExt;
27use serde::{Deserialize, Serialize};
28use strum::IntoEnumIterator as _;
29use strum_macros::EnumIter;
30use tracing::{debug, info, trace, warn};
31
32use crate::backup::{ClientBackup, Metadata};
33use crate::sm::executor::{
34    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
35    InactiveStateKeyPrefixBytes,
36};
37
/// Byte prefixes namespacing all records in the core client database.
///
/// Each variant's discriminant is the first byte of every key stored under
/// that prefix; values must never be reused for a different record type
/// (see the `ClientInviteCode` note below).
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    // Event log prefixes are defined by `fedimint_eventlog` (0x39 / 0x3a;
    // see the version-2 migration below that relies on those raw values).
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
    ClientModuleRecovery = 0x40,

    // These two mirror prefixes declared in `fedimint_core`.
    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    // State-machine executor prefixes, shared with `crate::sm::executor`.
    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,

    /// Arbitrary data of the applications integrating Fedimint client and
    /// wanting to store some Federation-specific data in Fedimint client
    /// database.
    ///
    /// New users are encouraged to use this single prefix only.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
88
89pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
90    let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();
91
92    let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
93    while let Some((k, v)) = records.next().await {
94        // from here and above, we don't want to spend time verifying it
95        if DbKeyPrefix::UserData as u8 <= k[0] {
96            break;
97        }
98
99        assert!(
100            prefixes.contains(&k[0]),
101            "Unexpected client db record found: {}: {}",
102            k.as_hex(),
103            v.as_hex()
104        );
105    }
106}
107
108impl std::fmt::Display for DbKeyPrefix {
109    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110        write!(f, "{self:?}")
111    }
112}
113
/// Key under which the encoded client secret bytes are stored
/// (`DbKeyPrefix::EncodedClientSecret`).
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Prefix for scanning all [`EncodedClientSecretKey`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
129
/// Key indexing an [`OperationLogEntry`] by its [`OperationId`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

/// Prefix for scanning all [`OperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);
145
/// Pre-migration (v0) operation log key, kept only so the version-1
/// migration can read old records. Shares the `OperationLog` prefix with
/// [`OperationLogKey`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

/// Prefix for scanning all [`OperationLogKeyV0`] records.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);
164
/// Key for an 8-byte value derived from the client's pre-root secret
/// (`DbKeyPrefix::ClientPreRootSecretHash`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);
173
/// Key used to lookup operation log entries in chronological order.
///
/// Stores no value (`()`); the ordering information lives entirely in the
/// key, which sorts by `creation_time` first.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Prefix for scanning all [`ChronologicalOperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
194
/// Key for the cached common [`ApiVersionSet`]
/// (`DbKeyPrefix::CommonApiVersionCache`).
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Cached [`ApiVersionSet`] value stored under [`CachedApiVersionSetKey`].
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);

/// Per-peer key for the last [`SupportedApiVersionsSummary`] seen from that
/// peer (`DbKeyPrefix::PeerLastApiVersionsSummaryCache`).
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

/// Cached per-peer API versions summary stored under
/// [`PeerLastApiVersionsSummaryKey`].
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);
218
/// Key for the client's [`ClientConfig`] (`DbKeyPrefix::ClientConfig`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

/// Pre-migration (v0) client config key, which was still keyed by
/// [`FederationId`]. Kept only so the version-0 migration can read and
/// delete old records; shares the `ClientConfig` prefix.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

/// Prefix for scanning all [`ClientConfigKeyV0`] records.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);
246
/// Key for the federation API secret string (`DbKeyPrefix::ApiSecret`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

/// Prefix for scanning all [`ApiSecretKey`] records.
#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
260
/// Client metadata that will be stored/restored on backup&recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

/// Prefix for scanning all [`ClientMetadataKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
275
/// Do the client modules need to run recovery before being usable?
///
/// Stores an [`InitState`] under `DbKeyPrefix::ClientInitState`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

/// Prefix for scanning all [`ClientInitStateKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with freshly generated root secret
    Fresh,
    /// Should be used with root secrets provided by the user to recover a
    /// (even if just possibly) already used secret.
    Recover { snapshot: Option<ClientBackup> },
}

/// Like [`InitMode`], but stripped of data that is no longer required.
///
/// This is distinct from `InitMode` to prevent holding on to `snapshot`
/// forever, both for the user's privacy and for space use, e.g. in case the
/// user gets hacked or their phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization was complete
    Complete(InitModeComplete),
}
313
314impl InitState {
315    pub fn into_complete(self) -> Self {
316        match self {
317            InitState::Pending(p) => InitState::Complete(match p {
318                InitMode::Fresh => InitModeComplete::Fresh,
319                InitMode::Recover { .. } => InitModeComplete::Recover,
320            }),
321            InitState::Complete(t) => InitState::Complete(t),
322        }
323    }
324
325    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
326        match self {
327            InitState::Pending(p) => match p {
328                InitMode::Fresh => None,
329                InitMode::Recover { snapshot } => Some(snapshot.clone()),
330            },
331            InitState::Complete(_) => None,
332        }
333    }
334
335    pub fn is_pending(&self) -> bool {
336        match self {
337            InitState::Pending(_) => true,
338            InitState::Complete(_) => false,
339        }
340    }
341}
342
// [`InitState`] is stored as a singleton under `DbKeyPrefix::ClientInitState`.
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);
353
/// Per-module key tracking the progress of that module's recovery
/// (`DbKeyPrefix::ClientModuleRecovery`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

/// Recovery progress value stored under [`ClientModuleRecovery`].
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// Whether the underlying [`RecoveryProgress`] reports completion.
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientModuleRecovery,
);
375
/// Old (incorrect) version of the [`ClientModuleRecoveryState`]
/// that used the wrong prefix.
///
/// See <https://github.com/fedimint/fedimint/issues/7367>.
///
/// Used only for the migration.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecoveryIncorrectDoNotUse {
    pub module_instance_id: ModuleInstanceId,
}

impl_db_record!(
    key = ClientModuleRecoveryIncorrectDoNotUse,
    value = ClientModuleRecoveryState,
    // This was wrong and we keep it wrong for a migration.
    db_prefix = DbKeyPrefix::ClientInitState,
);
393
/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);
406
/// Prefix for scanning all [`MetaFieldKey`] records.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(crate) struct MetaFieldPrefix;

/// Key for bookkeeping info about the meta service
/// (`DbKeyPrefix::ClientMetaServiceInfo`).
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// Meta service bookkeeping stored under [`MetaServiceInfoKey`].
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

/// Key for a single cached meta field (`DbKeyPrefix::ClientMetaField`);
/// newtype over the module-level meta field key.
#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

/// Cached meta field value stored under [`MetaFieldKey`].
#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
440
441pub fn get_core_client_database_migrations()
442-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
443    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
444        BTreeMap::new();
445    migrations.insert(
446        DatabaseVersion(0),
447        Box::new(|mut ctx| {
448            Box::pin(async move {
449                let mut dbtx = ctx.dbtx();
450
451                let config_v0 = dbtx
452                    .find_by_prefix(&ClientConfigKeyPrefixV0)
453                    .await
454                    .collect::<Vec<_>>()
455                    .await;
456
457                assert!(config_v0.len() <= 1);
458                let Some((id, config_v0)) = config_v0.into_iter().next() else {
459                    return Ok(());
460                };
461
462                let global = GlobalClientConfig {
463                    api_endpoints: config_v0.global.api_endpoints,
464                    broadcast_public_keys: None,
465                    consensus_version: config_v0.global.consensus_version,
466                    meta: config_v0.global.meta,
467                };
468
469                let config = ClientConfig {
470                    global,
471                    modules: config_v0.modules,
472                };
473
474                dbtx.remove_entry(&id).await;
475                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
476                Ok(())
477            })
478        }),
479    );
480
481    // Migration to add outcome_time to OperationLogEntry
482    migrations.insert(
483        DatabaseVersion(1),
484        Box::new(|mut ctx| {
485            Box::pin(async move {
486                let mut dbtx = ctx.dbtx();
487
488                // Read all OperationLogEntries using V0 format
489                let operation_logs = dbtx
490                    .find_by_prefix(&OperationLogKeyPrefixV0)
491                    .await
492                    .collect::<Vec<_>>()
493                    .await;
494
495                // Build a map from operation_id -> max_time of inactive state
496                let mut op_id_max_time = BTreeMap::new();
497
498                // Process inactive states
499                {
500                    let mut inactive_states_stream =
501                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;
502
503                    while let Some((state, meta)) = inactive_states_stream.next().await {
504                        let entry = op_id_max_time
505                            .entry(state.operation_id)
506                            .or_insert(meta.exited_at);
507                        *entry = (*entry).max(meta.exited_at);
508                    }
509                }
510                // Migrate each V0 operation log entry to the new format
511                for (op_key_v0, log_entry_v0) in operation_logs {
512                    let new_entry = OperationLogEntry::new(
513                        log_entry_v0.operation_module_kind,
514                        log_entry_v0.meta,
515                        log_entry_v0.outcome.map(|outcome| {
516                            OperationOutcome {
517                                outcome,
518                                // If we found state times, use the max, otherwise use
519                                // current time
520                                time: op_id_max_time
521                                    .get(&op_key_v0.operation_id)
522                                    .copied()
523                                    .unwrap_or_else(fedimint_core::time::now),
524                            }
525                        }),
526                    );
527
528                    dbtx.remove_entry(&op_key_v0).await;
529                    dbtx.insert_entry(
530                        &OperationLogKey {
531                            operation_id: op_key_v0.operation_id,
532                        },
533                        &new_entry,
534                    )
535                    .await;
536                }
537
538                Ok(())
539            })
540        }),
541    );
542
543    // Fix #6948
544    migrations.insert(
545        DatabaseVersion(2),
546        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
547            Box::pin(async move {
548                let mut dbtx = ctx.dbtx();
549
550                // Migrate unordered keys that got written to ordered table
551                {
552                    let mut ordered_log_entries = dbtx
553                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
554                        .await
555                        .expect("DB operation failed");
556                    let mut keys_to_migrate = vec![];
557                    while let Some((k, _v)) = ordered_log_entries.next().await {
558                        trace!(target: LOG_CLIENT_DB,
559                            k=%k.as_hex(),
560                            "Checking ordered log key"
561                        );
562                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
563                        {
564                            assert!(
565                                UnordedEventLogId::consensus_decode_whole(
566                                    &k[1..],
567                                    &Default::default()
568                                )
569                                .is_ok()
570                            );
571                            keys_to_migrate.push(k);
572                        }
573                    }
574                    drop(ordered_log_entries);
575                    for mut key_to_migrate in keys_to_migrate {
576                        warn!(target: LOG_CLIENT_DB,
577                            k=%key_to_migrate.as_hex(),
578                            "Migrating unordered event log entry written to an ordered log"
579                        );
580                        let v = dbtx
581                            .raw_remove_entry(&key_to_migrate)
582                            .await
583                            .expect("DB operation failed")
584                            .expect("Was there a moment ago");
585                        assert_eq!(key_to_migrate[0], 0x39);
586                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
587                        assert_eq!(key_to_migrate[0], 0x3a);
588                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
589                            .await
590                            .expect("DB operation failed");
591                    }
592                }
593
594                // Migrate ordered keys that got written to unordered table
595                {
596                    let mut unordered_log_entries = dbtx
597                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
598                        .await
599                        .expect("DB operation failed");
600                    let mut keys_to_migrate = vec![];
601                    while let Some((k, _v)) = unordered_log_entries.next().await {
602                        trace!(target: LOG_CLIENT_DB,
603                            k=%k.as_hex(),
604                            "Checking unordered log key"
605                        );
606                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
607                            .is_err()
608                        {
609                            assert!(
610                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
611                                    .is_ok()
612                            );
613                            keys_to_migrate.push(k);
614                        }
615                    }
616                    drop(unordered_log_entries);
617                    for mut key_to_migrate in keys_to_migrate {
618                        warn!(target: LOG_CLIENT_DB,
619                            k=%key_to_migrate.as_hex(),
620                            "Migrating ordered event log entry written to an unordered log"
621                        );
622                        let v = dbtx
623                            .raw_remove_entry(&key_to_migrate)
624                            .await
625                            .expect("DB operation failed")
626                            .expect("Was there a moment ago");
627                        assert_eq!(key_to_migrate[0], 0x3a);
628                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
629                        assert_eq!(key_to_migrate[0], 0x39);
630                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
631                            .await
632                            .expect("DB operation failed");
633                    }
634                }
635                Ok(())
636            })
637        }),
638    );
639
640    // Fix #7367
641    migrations.insert(
642        DatabaseVersion(3),
643        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
644            Box::pin(async move {
645                let mut dbtx = ctx.dbtx();
646
647                for module_id in 0..u16::MAX {
648                    let old_key = ClientModuleRecoveryIncorrectDoNotUse {
649                        module_instance_id: module_id,
650                    };
651                    let new_key = ClientModuleRecovery {
652                        module_instance_id: module_id,
653                    };
654                    let Some(value) = dbtx.get_value(&old_key).await else {
655                        debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migartion");
656                        break;
657                    };
658
659                    debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
660                    dbtx.remove_entry(&old_key).await.expect("Is there.");
661                    assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
662                }
663
664                Ok(())
665            })
666        }),
667    );
668    migrations
669}
670
671/// Apply core client database migrations
672///
673/// TODO: This should be private.
674pub async fn apply_migrations_core_client_dbtx(
675    dbtx: &mut DatabaseTransaction<'_>,
676    kind: String,
677) -> Result<(), anyhow::Error> {
678    apply_migrations_dbtx(
679        dbtx,
680        (),
681        kind,
682        get_core_client_database_migrations(),
683        None,
684        Some(DbKeyPrefix::UserData as u8),
685    )
686    .await
687}
688
689/// `apply_migrations_client` iterates from the on disk database version for the
690/// client module up to `target_db_version` and executes all of the migrations
691/// that exist in the migrations map, including state machine migrations.
692/// Each migration in the migrations map updates the database to have the
693/// correct on-disk data structures that the code is expecting. The entire
694/// process is atomic, (i.e migration from 0->1 and 1->2 happen atomically).
695/// This function is called before the module is initialized and as long as the
696/// correct migrations are supplied in the migrations map, the module
697/// will be able to read and write from the database successfully.
698pub async fn apply_migrations_client_module(
699    db: &Database,
700    kind: String,
701    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
702    module_instance_id: ModuleInstanceId,
703) -> Result<(), anyhow::Error> {
704    let mut dbtx = db.begin_transaction().await;
705    apply_migrations_client_module_dbtx(
706        &mut dbtx.to_ref_nc(),
707        kind,
708        migrations,
709        module_instance_id,
710    )
711    .await?;
712    dbtx.commit_tx_result().await
713}
714
/// Like [`apply_migrations_client_module`], but operates on an existing
/// database transaction supplied by the caller (who is responsible for
/// committing it).
///
/// Runs every migration between the stored version and the target version,
/// persisting migrated state machine states and bumping the stored version
/// after each step. Returns an error if the on-disk version is newer than
/// the target version.
pub async fn apply_migrations_client_module_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    // Target version is the highest version in the migrations map.
    let target_version = get_current_database_version(&migrations);

    // First write the database version to disk if it does not exist.
    create_database_version_dbtx(
        dbtx,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    let current_version = dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        // Nothing to do when the stored version already matches the target.
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            return Ok(());
        }

        // A newer-than-target on-disk version means the db was written by
        // newer code; refuse to touch it.
        if target_version < current_version {
            return Err(anyhow!(format!(
                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
                current_version, target_version,
            )));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        // Snapshot this module's states once up front; migrations receive
        // clones and may return replacements.
        let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;

        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                     target: LOG_CLIENT_DB,
                     module_instance_id,
                     %kind,
                     %current_version,
                     %target_version,
                     "Running module db migration");

                // Migrations run against a module-isolated (prefixed) view
                // of the transaction.
                migration(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            // Persist the version bump after every successful step.
            current_version = current_version.increment();
            dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        target_version
    };

    debug!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Client DB Version");
    Ok(())
}
842
843/// Reads all active states from the database and returns `Vec<DynState>`.
844/// TODO: It is unfortunate that we can't read states by the module's instance
845/// id so we are forced to return all active states. Once we do a db migration
846/// to add `module_instance_id` to `ActiveStateKey`, this can be improved to
847/// only read the module's relevant states.
848pub async fn get_active_states(
849    dbtx: &mut DatabaseTransaction<'_>,
850    module_instance_id: ModuleInstanceId,
851) -> Vec<(Vec<u8>, OperationId)> {
852    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
853        .await
854        .filter_map(|(state, _)| async move {
855            if module_instance_id == state.module_instance_id {
856                Some((state.state, state.operation_id))
857            } else {
858                None
859            }
860        })
861        .collect::<Vec<_>>()
862        .await
863}
864
865/// Reads all inactive states from the database and returns `Vec<DynState>`.
866/// TODO: It is unfortunate that we can't read states by the module's instance
867/// id so we are forced to return all inactive states. Once we do a db migration
868/// to add `module_instance_id` to `InactiveStateKey`, this can be improved to
869/// only read the module's relevant states.
870pub async fn get_inactive_states(
871    dbtx: &mut DatabaseTransaction<'_>,
872    module_instance_id: ModuleInstanceId,
873) -> Vec<(Vec<u8>, OperationId)> {
874    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
875        .await
876        .filter_map(|(state, _)| async move {
877            if module_instance_id == state.module_instance_id {
878                Some((state.state, state.operation_id))
879            } else {
880                None
881            }
882        })
883        .collect::<Vec<_>>()
884        .await
885}
886
887/// Persists new active states by first removing all current active states, and
888/// re-writing with the new set of active states. `new_active_states` is
889/// expected to contain all active states, not just the newly created states.
890pub async fn remove_old_and_persist_new_active_states(
891    dbtx: &mut DatabaseTransaction<'_>,
892    new_active_states: Vec<(Vec<u8>, OperationId)>,
893    states_to_remove: Vec<(Vec<u8>, OperationId)>,
894    module_instance_id: ModuleInstanceId,
895) {
896    // Remove all existing active states
897    for (bytes, operation_id) in states_to_remove {
898        dbtx.remove_entry(&ActiveStateKeyBytes {
899            operation_id,
900            module_instance_id,
901            state: bytes,
902        })
903        .await
904        .expect("Did not delete anything");
905    }
906
907    // Insert new "migrated" active states
908    for (bytes, operation_id) in new_active_states {
909        dbtx.insert_new_entry(
910            &ActiveStateKeyBytes {
911                operation_id,
912                module_instance_id,
913                state: bytes,
914            },
915            &ActiveStateMeta::default(),
916        )
917        .await;
918    }
919}
920
921/// Persists new inactive states by first removing all current inactive states,
922/// and re-writing with the new set of inactive states. `new_inactive_states` is
923/// expected to contain all inactive states, not just the newly created states.
924pub async fn remove_old_and_persist_new_inactive_states(
925    dbtx: &mut DatabaseTransaction<'_>,
926    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
927    states_to_remove: Vec<(Vec<u8>, OperationId)>,
928    module_instance_id: ModuleInstanceId,
929) {
930    // Remove all existing active states
931    for (bytes, operation_id) in states_to_remove {
932        dbtx.remove_entry(&InactiveStateKeyBytes {
933            operation_id,
934            module_instance_id,
935            state: bytes,
936        })
937        .await
938        .expect("Did not delete anything");
939    }
940
941    // Insert new "migrated" inactive states
942    for (bytes, operation_id) in new_inactive_states {
943        dbtx.insert_new_entry(
944            &InactiveStateKeyBytes {
945                operation_id,
946                module_instance_id,
947                state: bytes,
948            },
949            &InactiveStateMeta {
950                created_at: fedimint_core::time::now(),
951                exited_at: fedimint_core::time::now(),
952            },
953        )
954        .await;
955    }
956}
957
958/// Fetches the encoded client secret from the database and decodes it.
959/// If an encoded client secret is not present in the database, or if
960/// decoding fails, an error is returned.
961pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
962    let mut tx = db.begin_transaction_nc().await;
963    let client_secret = tx.get_value(&EncodedClientSecretKey).await;
964
965    match client_secret {
966        Some(client_secret) => {
967            T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
968                .map_err(|e| anyhow!("Decoding failed: {e}"))
969        }
970        None => bail!("Encoded client secret not present in DB"),
971    }
972}
973
/// V0 version of operation log entry for migration purposes
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    // Kind of the module that the operation belongs to
    pub(crate) operation_module_kind: String,
    // JSON-stringified operation metadata
    pub(crate) meta: JsonStringed,
    // JSON-stringified outcome; `None` presumably while the operation has no
    // recorded outcome yet — see `OperationLogEntry` for the current schema
    pub(crate) outcome: Option<JsonStringed>,
}