// fedimint_client/db.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::time::SystemTime;
3
4use anyhow::{anyhow, bail};
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_client_module::db::ClientModuleMigrationFn;
8use fedimint_client_module::module::recovery::RecoveryProgress;
9use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
10use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
11use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
12use fedimint_core::core::{ModuleInstanceId, OperationId};
13use fedimint_core::db::{
14    Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
15    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
16    apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
17};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::module::SupportedApiVersionsSummary;
20use fedimint_core::module::registry::ModuleRegistry;
21use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
22use fedimint_eventlog::{
23    DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
24};
25use fedimint_logging::LOG_CLIENT_DB;
26use futures::StreamExt;
27use serde::{Deserialize, Serialize};
28use strum::IntoEnumIterator as _;
29use strum_macros::EnumIter;
30use tracing::{debug, info, trace, warn};
31
32use crate::backup::{ClientBackup, Metadata};
33use crate::sm::executor::{
34    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
35    InactiveStateKeyPrefixBytes,
36};
37
/// Prefix-byte namespace for all client-core database records.
///
/// The enum discriminant is the first byte of every record key, so a value
/// must never be re-used for a different purpose once it has shipped
/// (see the `ClientInviteCode` note below).
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    // Event-log prefixes are defined in `fedimint_eventlog` so they stay in
    // sync with that crate.
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,

    // Mirrors of prefixes owned elsewhere, included here so `DbKeyPrefix::iter()`
    // covers every prefix the client may legitimately write.
    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,

    /// Arbitrary data of the applications integrating Fedimint client and
    /// wanting to store some Federation-specific data in Fedimint client
    /// database.
    ///
    /// New users are encouraged to use this single prefix only.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
87
88pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
89    let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();
90
91    let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
92    while let Some((k, v)) = records.next().await {
93        // from here and above, we don't want to spend time verifying it
94        if DbKeyPrefix::UserData as u8 <= k[0] {
95            break;
96        }
97
98        assert!(
99            prefixes.contains(&k[0]),
100            "Unexpected client db record found: {}: {}",
101            k.as_hex(),
102            v.as_hex()
103        );
104    }
105}
106
107impl std::fmt::Display for DbKeyPrefix {
108    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
109        write!(f, "{self:?}")
110    }
111}
112
/// Key under which the encoded client secret bytes are stored.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Prefix for scanning [`EncodedClientSecretKey`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
128
/// Key of an operation log entry, addressed by its [`OperationId`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

/// Prefix for scanning all [`OperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);

/// Pre-migration (v0) operation log key; kept so migrations can read the
/// old records (see `get_core_client_database_migrations`, version 1).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);
163
/// Key storing an 8-byte hash derived from the client's pre-root secret.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);
172
/// Key used to lookup operation log entries in chronological order
///
/// The value is `()`: the key itself (creation time + operation id) carries
/// all the information; records exist purely for ordered iteration.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Prefix for scanning all [`ChronologicalOperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
193
/// Key for the cached common API version negotiated with the federation.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Cached result of API version negotiation.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);

/// Key for the last known supported-API-versions summary of a single peer.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);
217
/// Key for the federation's client config (current format).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

/// Pre-migration (v0) client config key, still addressed by federation id;
/// kept so the version-0 migration can read and convert old records.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);
245
/// Key for the optional API secret used when talking to the federation.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
259
/// Client metadata that will be stored/restored on backup&recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
274
/// Does the client modules need to run recovery before being usable?
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with freshly generated root secret
    Fresh,
    /// Should be used with root secrets provided by the user to recover a
    /// (even if just possibly) already used secret.
    Recover { snapshot: Option<ClientBackup> },
}

/// Like `InitMode`, but without no longer required data.
///
/// This is distinct from `InitMode` to prevent holding on to `snapshot`
/// forever both for user's privacy and space use. In case user get hacked
/// or phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization was complete
    Complete(InitModeComplete),
}
312
313impl InitState {
314    pub fn into_complete(self) -> Self {
315        match self {
316            InitState::Pending(p) => InitState::Complete(match p {
317                InitMode::Fresh => InitModeComplete::Fresh,
318                InitMode::Recover { .. } => InitModeComplete::Recover,
319            }),
320            InitState::Complete(t) => InitState::Complete(t),
321        }
322    }
323
324    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
325        match self {
326            InitState::Pending(p) => match p {
327                InitMode::Fresh => None,
328                InitMode::Recover { snapshot } => Some(snapshot.clone()),
329            },
330            InitState::Complete(_) => None,
331        }
332    }
333
334    pub fn is_pending(&self) -> bool {
335        match self {
336            InitState::Pending(_) => true,
337            InitState::Complete(_) => false,
338        }
339    }
340}
341
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);

/// Key for an optional backup snapshot used during recovery.
///
/// Note: shares the `ClientInitState` prefix with the other init-state
/// records.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshot;

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshotPrefix;

impl_db_record!(
    key = ClientRecoverySnapshot,
    value = Option<ClientBackup>,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientRecoverySnapshot,
    query_prefix = ClientRecoverySnapshotPrefix
);
369
/// Key for per-module recovery progress, addressed by module instance id.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

#[derive(Debug, Encodable)]
pub struct ClientModuleRecoveryPrefix;

/// Recovery progress of a single client module.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// Whether the module's recovery has finished (delegates to
    /// [`RecoveryProgress::is_done`]).
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientInitState,
);

impl_db_lookup!(
    key = ClientModuleRecovery,
    query_prefix = ClientModuleRecoveryPrefix
);
399
/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);
412
/// Prefix for scanning all federation meta fields.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldPrefix;

/// Key for bookkeeping info about the meta service itself.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// When the meta fields were last refreshed and at which revision.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

/// A single federation meta field name (newtype over the module-level type).
#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

/// Value of a federation meta field (newtype over the module-level type).
#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
446
/// Returns the map of core-client database migrations, keyed by the version
/// they migrate *from*.
///
/// - 0 -> 1: convert `ClientConfigV0` (keyed by federation id) to the
///   keyless `ClientConfig` record.
/// - 1 -> 2: convert `OperationLogEntryV0` records to the new format that
///   carries an `outcome_time`.
/// - 2 -> 3: repair event-log entries written under the wrong prefix
///   (fix for issue #6948).
pub fn get_core_client_database_migrations()
-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
        BTreeMap::new();
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                let config_v0 = dbtx
                    .find_by_prefix(&ClientConfigKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // A client only ever belongs to one federation, so at most one
                // v0 config record can exist.
                assert!(config_v0.len() <= 1);
                let Some((id, config_v0)) = config_v0.into_iter().next() else {
                    return Ok(());
                };

                // `broadcast_public_keys` did not exist in v0; initialize empty.
                let global = GlobalClientConfig {
                    api_endpoints: config_v0.global.api_endpoints,
                    broadcast_public_keys: None,
                    consensus_version: config_v0.global.consensus_version,
                    meta: config_v0.global.meta,
                };

                let config = ClientConfig {
                    global,
                    modules: config_v0.modules,
                };

                dbtx.remove_entry(&id).await;
                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
                Ok(())
            })
        }),
    );

    // Migration to add outcome_time to OperationLogEntry
    migrations.insert(
        DatabaseVersion(1),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Read all OperationLogEntries using V0 format
                let operation_logs = dbtx
                    .find_by_prefix(&OperationLogKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // Build a map from operation_id -> max_time of inactive state
                let mut op_id_max_time = BTreeMap::new();

                // Process inactive states
                {
                    let mut inactive_states_stream =
                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                    while let Some((state, meta)) = inactive_states_stream.next().await {
                        let entry = op_id_max_time
                            .entry(state.operation_id)
                            .or_insert(meta.exited_at);
                        *entry = (*entry).max(meta.exited_at);
                    }
                }
                // Migrate each V0 operation log entry to the new format
                for (op_key_v0, log_entry_v0) in operation_logs {
                    let new_entry = OperationLogEntry::new(
                        log_entry_v0.operation_module_kind,
                        log_entry_v0.meta,
                        log_entry_v0.outcome.map(|outcome| {
                            OperationOutcome {
                                outcome,
                                // If we found state times, use the max, otherwise use
                                // current time
                                time: op_id_max_time
                                    .get(&op_key_v0.operation_id)
                                    .copied()
                                    .unwrap_or_else(fedimint_core::time::now),
                            }
                        }),
                    );

                    dbtx.remove_entry(&op_key_v0).await;
                    dbtx.insert_entry(
                        &OperationLogKey {
                            operation_id: op_key_v0.operation_id,
                        },
                        &new_entry,
                    )
                    .await;
                }

                Ok(())
            })
        }),
    );

    // Fix #6948
    migrations.insert(
        DatabaseVersion(2),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Migrate unordered keys that got written to ordered table
                {
                    let mut ordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = ordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking ordered log key"
                        );
                        // A key that fails to decode as an ordered id but
                        // decodes as an unordered id was misfiled.
                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
                        {
                            assert!(
                                UnordedEventLogId::consensus_decode_whole(
                                    &k[1..],
                                    &Default::default()
                                )
                                .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    // Drop the stream so the dbtx borrow is released before
                    // we mutate entries below.
                    drop(ordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating unordered event log entry written to an ordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        // Rewrite the prefix byte: 0x39 (ordered) -> 0x3a (unordered).
                        assert_eq!(key_to_migrate[0], 0x39);
                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x3a);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }

                // Migrate ordered keys that got written to unordered table
                {
                    let mut unordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = unordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking unordered log key"
                        );
                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
                            .is_err()
                        {
                            assert!(
                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
                                    .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    drop(unordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating ordered event log entry written to an unordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        // Rewrite the prefix byte: 0x3a (unordered) -> 0x39 (ordered).
                        assert_eq!(key_to_migrate[0], 0x3a);
                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x39);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }
                Ok(())
            })
        }),
    );
    migrations
}
647
/// Apply all pending core-client migrations inside the given transaction.
///
/// Passes `DbKeyPrefix::UserData` as the upper bound so user data is not
/// touched by migration bookkeeping.
pub async fn apply_migrations_core_client_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
) -> Result<(), anyhow::Error> {
    apply_migrations_dbtx(
        dbtx,
        (),
        kind,
        get_core_client_database_migrations(),
        None,
        Some(DbKeyPrefix::UserData as u8),
    )
    .await
}
662
/// `apply_migrations_client_module` iterates from the on disk database version
/// for the client module up to `target_db_version` and executes all of the
/// migrations that exist in the migrations map, including state machine
/// migrations. Each migration in the migrations map updates the database to
/// have the correct on-disk data structures that the code is expecting. The
/// entire process is atomic, (i.e migration from 0->1 and 1->2 happen
/// atomically). This function is called before the module is initialized and
/// as long as the correct migrations are supplied in the migrations map, the
/// module will be able to read and write from the database successfully.
pub async fn apply_migrations_client_module(
    db: &Database,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Run everything in one transaction so the whole migration chain commits
    // or rolls back atomically.
    let mut dbtx = db.begin_transaction().await;
    apply_migrations_client_module_dbtx(
        &mut dbtx.to_ref_nc(),
        kind,
        migrations,
        module_instance_id,
    )
    .await?;
    dbtx.commit_tx_result().await
}
688
/// Transaction-level driver for client-module migrations: walks the on-disk
/// version up to the target version, running each migration (including any
/// state-machine migrations) and persisting the new version after each step.
///
/// # Errors
/// Fails if the on-disk version is *newer* than the target version, or if a
/// migration itself fails.
pub async fn apply_migrations_client_module_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    let target_version = get_current_database_version(&migrations);

    // First write the database version to disk if it does not exist.
    create_database_version_dbtx(
        dbtx,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    let current_version = dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            return Ok(());
        }

        // A downgrade (code older than the db) is unsupported; bail out.
        if target_version < current_version {
            return Err(anyhow!(format!(
                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
                current_version, target_version,
            )));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        // Snapshot this module's state machines once; migrations receive
        // clones and may return replacements.
        let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;

        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                     target: LOG_CLIENT_DB,
                     module_instance_id,
                     %kind,
                     %current_version,
                     %target_version,
                     "Running module db migration");

                migration(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            current_version = current_version.increment();
            dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        target_version
    };

    debug!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Client DB Version");
    Ok(())
}
816
817/// Reads all active states from the database and returns `Vec<DynState>`.
818/// TODO: It is unfortunate that we can't read states by the module's instance
819/// id so we are forced to return all active states. Once we do a db migration
820/// to add `module_instance_id` to `ActiveStateKey`, this can be improved to
821/// only read the module's relevant states.
822pub async fn get_active_states(
823    dbtx: &mut DatabaseTransaction<'_>,
824    module_instance_id: ModuleInstanceId,
825) -> Vec<(Vec<u8>, OperationId)> {
826    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
827        .await
828        .filter_map(|(state, _)| async move {
829            if module_instance_id == state.module_instance_id {
830                Some((state.state, state.operation_id))
831            } else {
832                None
833            }
834        })
835        .collect::<Vec<_>>()
836        .await
837}
838
839/// Reads all inactive states from the database and returns `Vec<DynState>`.
840/// TODO: It is unfortunate that we can't read states by the module's instance
841/// id so we are forced to return all inactive states. Once we do a db migration
842/// to add `module_instance_id` to `InactiveStateKey`, this can be improved to
843/// only read the module's relevant states.
844pub async fn get_inactive_states(
845    dbtx: &mut DatabaseTransaction<'_>,
846    module_instance_id: ModuleInstanceId,
847) -> Vec<(Vec<u8>, OperationId)> {
848    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
849        .await
850        .filter_map(|(state, _)| async move {
851            if module_instance_id == state.module_instance_id {
852                Some((state.state, state.operation_id))
853            } else {
854                None
855            }
856        })
857        .collect::<Vec<_>>()
858        .await
859}
860
861/// Persists new active states by first removing all current active states, and
862/// re-writing with the new set of active states. `new_active_states` is
863/// expected to contain all active states, not just the newly created states.
864pub async fn remove_old_and_persist_new_active_states(
865    dbtx: &mut DatabaseTransaction<'_>,
866    new_active_states: Vec<(Vec<u8>, OperationId)>,
867    states_to_remove: Vec<(Vec<u8>, OperationId)>,
868    module_instance_id: ModuleInstanceId,
869) {
870    // Remove all existing active states
871    for (bytes, operation_id) in states_to_remove {
872        dbtx.remove_entry(&ActiveStateKeyBytes {
873            operation_id,
874            module_instance_id,
875            state: bytes,
876        })
877        .await
878        .expect("Did not delete anything");
879    }
880
881    // Insert new "migrated" active states
882    for (bytes, operation_id) in new_active_states {
883        dbtx.insert_new_entry(
884            &ActiveStateKeyBytes {
885                operation_id,
886                module_instance_id,
887                state: bytes,
888            },
889            &ActiveStateMeta::default(),
890        )
891        .await;
892    }
893}
894
895/// Persists new inactive states by first removing all current inactive states,
896/// and re-writing with the new set of inactive states. `new_inactive_states` is
897/// expected to contain all inactive states, not just the newly created states.
898pub async fn remove_old_and_persist_new_inactive_states(
899    dbtx: &mut DatabaseTransaction<'_>,
900    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
901    states_to_remove: Vec<(Vec<u8>, OperationId)>,
902    module_instance_id: ModuleInstanceId,
903) {
904    // Remove all existing active states
905    for (bytes, operation_id) in states_to_remove {
906        dbtx.remove_entry(&InactiveStateKeyBytes {
907            operation_id,
908            module_instance_id,
909            state: bytes,
910        })
911        .await
912        .expect("Did not delete anything");
913    }
914
915    // Insert new "migrated" inactive states
916    for (bytes, operation_id) in new_inactive_states {
917        dbtx.insert_new_entry(
918            &InactiveStateKeyBytes {
919                operation_id,
920                module_instance_id,
921                state: bytes,
922            },
923            &InactiveStateMeta {
924                created_at: fedimint_core::time::now(),
925                exited_at: fedimint_core::time::now(),
926            },
927        )
928        .await;
929    }
930}
931
932/// Fetches the encoded client secret from the database and decodes it.
933/// If an encoded client secret is not present in the database, or if
934/// decoding fails, an error is returned.
935pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
936    let mut tx = db.begin_transaction_nc().await;
937    let client_secret = tx.get_value(&EncodedClientSecretKey).await;
938
939    match client_secret {
940        Some(client_secret) => {
941            T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
942                .map_err(|e| anyhow!("Decoding failed: {e}"))
943        }
944        None => bail!("Encoded client secret not present in DB"),
945    }
946}
947
/// V0 version of operation log entry for migration purposes
///
/// Unlike the current format, the outcome carries no timestamp; the
/// version-1 migration reconstructs one from inactive state metadata.
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    // Module kind string of the module that created the operation
    pub(crate) operation_module_kind: String,
    // JSON-encoded operation metadata
    pub(crate) meta: JsonStringed,
    // JSON-encoded outcome, `None` while the operation is unresolved
    pub(crate) outcome: Option<JsonStringed>,
}