fedimint_client/db.rs

use std::collections::{BTreeMap, BTreeSet};
use std::time::SystemTime;

use anyhow::{anyhow, bail};
use bitcoin::hex::DisplayHex as _;
use fedimint_api_client::api::ApiVersionSet;
use fedimint_client_module::db::ClientModuleMigrationFn;
use fedimint_client_module::module::recovery::RecoveryProgress;
use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
use fedimint_core::core::{ModuleInstanceId, OperationId};
use fedimint_core::db::{
    Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
    IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
    apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::SupportedApiVersionsSummary;
use fedimint_core::module::registry::ModuleRegistry;
use fedimint_core::{ChainId, PeerId, impl_db_lookup, impl_db_record};
use fedimint_eventlog::{
    DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
};
use fedimint_logging::LOG_CLIENT_DB;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use strum::IntoEnumIterator as _;
use strum_macros::EnumIter;
use tracing::{debug, info, trace, warn};

use crate::backup::{ClientBackup, Metadata};
use crate::sm::executor::{
    ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
    InactiveStateKeyPrefixBytes,
};
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    PendingClientConfig = 0x3b,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
    EventLogTrimable = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
    ChainId = 0x3c,
    ClientModuleRecovery = 0x40,
    GuardianMetadata = 0x42,

    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,
    /// Arbitrary data stored by applications integrating the Fedimint client
    /// that want to keep some Federation-specific data in the Fedimint
    /// client database.
    ///
    /// New users are encouraged to use only this single prefix.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning an isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    // (see [`DbKeyPrefixInternalReserved`] for *internal* details)
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
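
// A quick sketch of how this enum relates to raw keys on disk: the discriminant
// byte is the first byte of every record key stored under that prefix, which is
// what `verify_client_db_integrity_dbtx` below relies on. (Illustrative only.)
//
//     let prefix_byte = DbKeyPrefix::ClientConfig as u8;
//     assert_eq!(prefix_byte, 0x2f);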

#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub(crate) enum DbKeyPrefixInternalReserved {
    /// [`crate::Client::built_in_application_event_log_tracker`]
    DefaultApplicationEventLogPos = 0xd0,
}

pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
    let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();

    let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
    while let Some((k, v)) = records.next().await {
        // From this prefix upward (user data and reserved ranges), we don't
        // want to spend time verifying records
        if DbKeyPrefix::UserData as u8 <= k[0] {
            break;
        }

        assert!(
            prefixes.contains(&k[0]),
            "Unexpected client db record found: {}: {}",
            k.as_hex(),
            v.as_hex()
        );
    }
}

impl std::fmt::Display for DbKeyPrefix {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
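
// A minimal usage sketch of the record/lookup pair above via the typed
// transaction ops from `fedimint_core::db` (hypothetical `dbtx` and
// `ciphertext`; not part of this module):
//
//     dbtx.insert_entry(&EncodedClientSecretKey, &ciphertext).await;
//     let stored: Option<Vec<u8>> = dbtx.get_value(&EncodedClientSecretKey).await;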

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);

/// Key used to look up operation log entries in chronological order
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
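
// Sketch: listing operations newest-first by scanning this chronological
// index in reverse (hypothetical `dbtx`; assumes the descending prefix scan
// available on typed transactions):
//
//     let mut ops = dbtx
//         .find_by_prefix_sorted_descending(&ChronologicalOperationLogKeyPrefix)
//         .await;
//     while let Some((key, ())) = ops.next().await {
//         println!("{:?} at {:?}", key.operation_id, key.creation_time);
//     }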

#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);

#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct PendingClientConfigKey;

impl_db_record!(
    key = PendingClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::PendingClientConfig
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);

/// Cached chain ID (bitcoin block hash at height 1) from the federation
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ChainIdKey;

impl_db_record!(
    key = ChainIdKey,
    value = ChainId,
    db_prefix = DbKeyPrefix::ChainId
);

/// Client metadata that will be stored/restored on backup & recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);

/// Do the client modules need to run recovery before being usable?
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with a freshly generated root secret
    Fresh,
    /// Should be used with a root secret provided by the user, to recover an
    /// already (even if only potentially) used secret.
    Recover { snapshot: Option<ClientBackup> },
}

/// Like `InitMode`, but stripped of data that is no longer required.
///
/// This is distinct from `InitMode` so the client doesn't hold on to
/// `snapshot` forever, both for the user's privacy and to save space,
/// e.g. in case the user gets hacked or their phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization is complete
    Complete(InitModeComplete),
}

impl InitState {
    pub fn into_complete(self) -> Self {
        match self {
            InitState::Pending(p) => InitState::Complete(match p {
                InitMode::Fresh => InitModeComplete::Fresh,
                InitMode::Recover { .. } => InitModeComplete::Recover,
            }),
            InitState::Complete(t) => InitState::Complete(t),
        }
    }

    pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
        match self {
            InitState::Pending(p) => match p {
                InitMode::Fresh => None,
                InitMode::Recover { snapshot } => Some(snapshot.clone()),
            },
            InitState::Complete(_) => None,
        }
    }

    pub fn is_pending(&self) -> bool {
        match self {
            InitState::Pending(_) => true,
            InitState::Complete(_) => false,
        }
    }
}
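
// Sketch of the intended lifecycle: a pending init is driven to completion
// and collapsed, dropping any recovery snapshot (illustrative values only):
//
//     let state = InitState::Pending(InitMode::Recover { snapshot: None });
//     assert!(state.is_pending());
//     let state = state.into_complete();
//     assert!(!state.is_pending());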

impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);

#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientModuleRecovery,
);

/// Old (incorrect) version of the [`ClientModuleRecoveryState`]
/// that used the wrong prefix.
///
/// See <https://github.com/fedimint/fedimint/issues/7367>.
///
/// Used only for the migration.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecoveryIncorrectDoNotUse {
    pub module_instance_id: ModuleInstanceId,
}

impl_db_record!(
    key = ClientModuleRecoveryIncorrectDoNotUse,
    value = ClientModuleRecoveryState,
    // This was wrong and we keep it wrong for a migration.
    db_prefix = DbKeyPrefix::ClientInitState,
);

/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);

#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(crate) struct MetaFieldPrefix;

#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
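
// Sketch: iterating all cached meta fields via the lookup prefix above
// (hypothetical `dbtx`):
//
//     let mut fields = dbtx.find_by_prefix(&MetaFieldPrefix).await;
//     while let Some((MetaFieldKey(key), MetaFieldValue(value))) = fields.next().await {
//         println!("{key:?} = {value:?}");
//     }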

pub fn get_core_client_database_migrations()
-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
        BTreeMap::new();
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                let config_v0 = dbtx
                    .find_by_prefix(&ClientConfigKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                assert!(config_v0.len() <= 1);
                let Some((id, config_v0)) = config_v0.into_iter().next() else {
                    return Ok(());
                };

                let global = GlobalClientConfig {
                    api_endpoints: config_v0.global.api_endpoints,
                    broadcast_public_keys: None,
                    consensus_version: config_v0.global.consensus_version,
                    meta: config_v0.global.meta,
                };

                let config = ClientConfig {
                    global,
                    modules: config_v0.modules,
                };

                dbtx.remove_entry(&id).await;
                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
                Ok(())
            })
        }),
    );

    // Migration to add outcome_time to OperationLogEntry
    migrations.insert(
        DatabaseVersion(1),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Read all OperationLogEntries using V0 format
                let operation_logs = dbtx
                    .find_by_prefix(&OperationLogKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // Build a map from operation_id -> max_time of inactive state
                let mut op_id_max_time = BTreeMap::new();

                // Process inactive states
                {
                    let mut inactive_states_stream =
                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                    while let Some((state, meta)) = inactive_states_stream.next().await {
                        let entry = op_id_max_time
                            .entry(state.operation_id)
                            .or_insert(meta.exited_at);
                        *entry = (*entry).max(meta.exited_at);
                    }
                }
                // Migrate each V0 operation log entry to the new format
                for (op_key_v0, log_entry_v0) in operation_logs {
                    let new_entry = OperationLogEntry::new(
                        log_entry_v0.operation_module_kind,
                        log_entry_v0.meta,
                        log_entry_v0.outcome.map(|outcome| {
                            OperationOutcome {
                                outcome,
                                // If we found state times, use the max, otherwise use
                                // current time
                                time: op_id_max_time
                                    .get(&op_key_v0.operation_id)
                                    .copied()
                                    .unwrap_or_else(fedimint_core::time::now),
                            }
                        }),
                    );

                    dbtx.remove_entry(&op_key_v0).await;
                    dbtx.insert_entry(
                        &OperationLogKey {
                            operation_id: op_key_v0.operation_id,
                        },
                        &new_entry,
                    )
                    .await;
                }

                Ok(())
            })
        }),
    );

    // Fix #6948
    migrations.insert(
        DatabaseVersion(2),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Migrate unordered keys that got written to ordered table
                {
                    let mut ordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = ordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking ordered log key"
                        );
                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
                        {
                            assert!(
                                UnordedEventLogId::consensus_decode_whole(
                                    &k[1..],
                                    &Default::default()
                                )
                                .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    drop(ordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating unordered event log entry written to an ordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        assert_eq!(key_to_migrate[0], 0x39);
                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x3a);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }

                // Migrate ordered keys that got written to unordered table
                {
                    let mut unordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = unordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking unordered log key"
                        );
                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
                            .is_err()
                        {
                            assert!(
                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
                                    .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    drop(unordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating ordered event log entry written to an unordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        assert_eq!(key_to_migrate[0], 0x3a);
                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x39);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }
                Ok(())
            })
        }),
    );

    // Fix #7367
    migrations.insert(
        DatabaseVersion(3),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                for module_id in 0..u16::MAX {
                    let old_key = ClientModuleRecoveryIncorrectDoNotUse {
                        module_instance_id: module_id,
                    };
                    let new_key = ClientModuleRecovery {
                        module_instance_id: module_id,
                    };
                    let Some(value) = dbtx.get_value(&old_key).await else {
                        debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migration");
                        break;
                    };

                    debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
                    dbtx.remove_entry(&old_key).await.expect("Is there.");
                    assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
                }

                Ok(())
            })
        }),
    );
    migrations
}
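
// Sketch of how a future core migration would slot in (hypothetical version 4;
// the body follows the same `DbMigrationFnContext` pattern as above):
//
//     migrations.insert(
//         DatabaseVersion(4),
//         Box::new(|mut ctx| {
//             Box::pin(async move {
//                 let mut dbtx = ctx.dbtx();
//                 // ... rewrite or clean up records here ...
//                 Ok(())
//             })
//         }),
//     );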

/// Apply core client database migrations
///
/// TODO: This should be private.
pub async fn apply_migrations_core_client_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
) -> Result<(), anyhow::Error> {
    apply_migrations_dbtx(
        dbtx,
        (),
        kind,
        get_core_client_database_migrations(),
        None,
        Some(DbKeyPrefix::UserData as u8),
    )
    .await
}
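
// Usage sketch (hypothetical `db` handle), mirroring how the client applies
// core migrations inside a single transaction during startup:
//
//     let mut dbtx = db.begin_transaction().await;
//     apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "client".to_string()).await?;
//     dbtx.commit_tx().await;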

/// `apply_migrations_client_module` iterates from the on-disk database
/// version for the client module up to the latest version in the migrations
/// map and executes all of the migrations that exist in that map, including
/// state machine migrations. Each migration updates the database to the
/// on-disk data structures that the current code expects. The entire process
/// is atomic (i.e. migrations from 0->1 and 1->2 happen atomically). This
/// function is called before the module is initialized, and as long as the
/// correct migrations are supplied in the migrations map, the module will be
/// able to read and write from the database successfully.
pub async fn apply_migrations_client_module(
    db: &Database,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    let mut dbtx = db.begin_transaction().await;
    apply_migrations_client_module_dbtx(
        &mut dbtx.to_ref_nc(),
        kind,
        migrations,
        module_instance_id,
    )
    .await?;
    dbtx.commit_tx_result()
        .await
        .map_err(|e| anyhow::Error::msg(e.to_string()))
}
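
// Usage sketch from a module's init path (hypothetical `db`, `migrations` map,
// module kind, and instance id):
//
//     apply_migrations_client_module(&db, "mint".to_string(), migrations, 1).await?;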

pub async fn apply_migrations_client_module_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    kind: String,
    migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    let target_version = get_current_database_version(&migrations);

    // First write the database version to disk if it does not exist.
    create_database_version_dbtx(
        dbtx,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    let current_version = dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            return Ok(());
        }

        if target_version < current_version {
            return Err(anyhow!(
                "On disk database version for module {kind} was higher ({current_version}) than the target database version ({target_version})."
            ));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;

        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                    target: LOG_CLIENT_DB,
                    module_instance_id,
                    %kind,
                    %current_version,
                    %target_version,
                    "Running module db migration"
                );

                migration(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            current_version = current_version.increment();
            dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        target_version
    };

    debug!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Client DB Version");
    Ok(())
}

/// Reads all active states for the given module instance from the database
/// and returns them as raw state bytes paired with their operation id.
/// TODO: It is unfortunate that we can't read states by the module's instance
/// id directly, so we are forced to scan all active states. Once we do a db
/// migration to add `module_instance_id` to `ActiveStateKey`, this can be
/// improved to only read the module's relevant states.
pub async fn get_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
        .await
        .filter_map(|(state, _)| async move {
            if module_instance_id == state.module_instance_id {
                Some((state.state, state.operation_id))
            } else {
                None
            }
        })
        .collect::<Vec<_>>()
        .await
}

/// Reads all inactive states for the given module instance from the database
/// and returns them as raw state bytes paired with their operation id.
/// TODO: It is unfortunate that we can't read states by the module's instance
/// id directly, so we are forced to scan all inactive states. Once we do a db
/// migration to add `module_instance_id` to `InactiveStateKey`, this can be
/// improved to only read the module's relevant states.
pub async fn get_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    module_instance_id: ModuleInstanceId,
) -> Vec<(Vec<u8>, OperationId)> {
    dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
        .await
        .filter_map(|(state, _)| async move {
            if module_instance_id == state.module_instance_id {
                Some((state.state, state.operation_id))
            } else {
                None
            }
        })
        .collect::<Vec<_>>()
        .await
}

/// Persists new active states by first removing all current active states, and
/// re-writing with the new set of active states. `new_active_states` is
/// expected to contain all active states, not just the newly created states.
pub async fn remove_old_and_persist_new_active_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_active_states: Vec<(Vec<u8>, OperationId)>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Remove all existing active states
    for (bytes, operation_id) in states_to_remove {
        dbtx.remove_entry(&ActiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state: bytes,
        })
        .await
        .expect("Did not delete anything");
    }

    // Insert new "migrated" active states
    for (bytes, operation_id) in new_active_states {
        dbtx.insert_new_entry(
            &ActiveStateKeyBytes {
                operation_id,
                module_instance_id,
                state: bytes,
            },
            &ActiveStateMeta::default(),
        )
        .await;
    }
}

/// Persists new inactive states by first removing all current inactive states,
/// and re-writing with the new set of inactive states. `new_inactive_states` is
/// expected to contain all inactive states, not just the newly created states.
pub async fn remove_old_and_persist_new_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Remove all existing inactive states
    for (bytes, operation_id) in states_to_remove {
        dbtx.remove_entry(&InactiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state: bytes,
        })
        .await
        .expect("Did not delete anything");
    }

    // Insert new "migrated" inactive states
    for (bytes, operation_id) in new_inactive_states {
        dbtx.insert_new_entry(
            &InactiveStateKeyBytes {
                operation_id,
                module_instance_id,
                state: bytes,
            },
            &InactiveStateMeta {
                created_at: fedimint_core::time::now(),
                exited_at: fedimint_core::time::now(),
            },
        )
        .await;
    }
}

/// Fetches the encoded client secret from the database and decodes it.
/// If an encoded client secret is not present in the database, or if
/// decoding fails, an error is returned.
pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
    let mut tx = db.begin_transaction_nc().await;
    let client_secret = tx.get_value(&EncodedClientSecretKey).await;

    match client_secret {
        Some(client_secret) => {
            T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
                .map_err(|e| anyhow!("Decoding failed: {e}"))
        }
        None => bail!("Encoded client secret not present in DB"),
    }
}
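
// Usage sketch (hypothetical `db`; the concrete `Decodable` target type
// depends on the integration storing the secret):
//
//     let secret: [u8; 64] = get_decoded_client_secret(&db).await?;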

/// V0 version of operation log entry for migration purposes
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    pub(crate) operation_module_kind: String,
    pub(crate) meta: JsonStringed,
    pub(crate) outcome: Option<JsonStringed>,
}