1use std::collections::{BTreeMap, BTreeSet};
2use std::time::SystemTime;
3
4use anyhow::{anyhow, bail};
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_client_module::db::ClientModuleMigrationFn;
8use fedimint_client_module::module::recovery::RecoveryProgress;
9use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
10use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
11use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
12use fedimint_core::core::{ModuleInstanceId, OperationId};
13use fedimint_core::db::{
14 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
15 IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
16 apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
17};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::module::SupportedApiVersionsSummary;
20use fedimint_core::module::registry::ModuleRegistry;
21use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
22use fedimint_eventlog::{
23 DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
24};
25use fedimint_logging::LOG_CLIENT_DB;
26use futures::StreamExt;
27use serde::{Deserialize, Serialize};
28use strum::IntoEnumIterator as _;
29use strum_macros::EnumIter;
30use tracing::{debug, info, trace, warn};
31
32use crate::backup::{ClientBackup, Metadata};
33use crate::sm::executor::{
34 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
35 InactiveStateKeyPrefixBytes,
36};
37
/// Single-byte key prefixes used by the client's own database records.
///
/// Each variant is the `db_prefix` of one record type (see the
/// `impl_db_record!` invocations in this file). The enum derives `EnumIter`
/// so that `verify_client_db_integrity_dbtx` can collect the full set of
/// known prefixes and check raw keys against it.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    // NOTE(review): no record in this file is registered under this prefix —
    // presumably legacy/unused; confirm before reusing the value.
    ClientSecret = 0x29,
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    PendingClientConfig = 0x3b,
    ClientInviteCode = 0x30,
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    // The event-log prefixes are defined by `fedimint_eventlog`; the
    // `DatabaseVersion(2)` migration in this file asserts that the ordered
    // log prefix is 0x39 and the unordered one is 0x3a.
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
    EventLogTrimable = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
    ClientModuleRecovery = 0x40,

    // Prefixes whose values are taken from `fedimint_core::db::DbKeyPrefix`.
    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    // Prefixes whose values are taken from the state machine executor.
    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,

    // `verify_client_db_integrity_dbtx` stops checking at the first key whose
    // prefix byte is >= this value, i.e. records from here upwards are not
    // validated against this enum.
    UserData = 0xb0,
    // NOTE(review): the names suggest 0xb1..=0xcf is a range reserved for
    // external users and 0xd0.. for internal use — confirm against the
    // project documentation.
    ExternalReservedStart = 0xb1,
    ExternalReservedEnd = 0xcf,
    InternalReservedStart = 0xd0,
    ModuleGlobalPrefix = 0xff,
}
90
91pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
92 let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();
93
94 let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
95 while let Some((k, v)) = records.next().await {
96 if DbKeyPrefix::UserData as u8 <= k[0] {
98 break;
99 }
100
101 assert!(
102 prefixes.contains(&k[0]),
103 "Unexpected client db record found: {}: {}",
104 k.as_hex(),
105 v.as_hex()
106 );
107 }
108}
109
110impl std::fmt::Display for DbKeyPrefix {
111 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
112 write!(f, "{self:?}")
113 }
114}
115
/// Key of the encoded client secret record.
///
/// The value is stored as raw bytes (`Vec<u8>`); `get_decoded_client_secret`
/// reads it and decodes it into a caller-chosen type.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Query prefix for looking up [`EncodedClientSecretKey`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
131
/// Key of an operation log entry, identified by its operation id.
///
/// Maps to an [`OperationLogEntry`] under `DbKeyPrefix::OperationLog`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

/// Query prefix matching all [`OperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);

/// Old-format operation log key.
///
/// Uses the same prefix as [`OperationLogKey`] but maps to
/// [`OperationLogEntryV0`]. It is read by the `DatabaseVersion(1)` core
/// migration, which rewrites each entry as an [`OperationLogEntry`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

/// Query prefix matching all [`OperationLogKeyV0`] records.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);
166
/// Key of the record stored under `DbKeyPrefix::ClientPreRootSecretHash`.
///
/// The value is a fixed 8-byte array.
// NOTE(review): presumably a (truncated) hash of the pre-root secret, going
// by the name — how it is computed is not visible in this file.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);
175
/// Key combining an operation's creation time with its id.
///
/// The record carries no value (`()`); all information is in the key itself.
// NOTE(review): with `creation_time` as the first field the encoded keys
// presumably sort chronologically — depends on the `SystemTime` encoding,
// which is not visible here.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Query prefix matching all [`ChronologicalOperationLogKey`] records.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
196
/// Key of the single cached [`ApiVersionSet`] record.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Newtype wrapper around the [`ApiVersionSet`] stored in the database.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);

/// Per-peer key of the last stored [`SupportedApiVersionsSummary`].
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

/// Newtype wrapper around the [`SupportedApiVersionsSummary`] stored per peer.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);
220
/// Key of the single [`ClientConfig`] record.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

/// Key of a [`ClientConfig`] stored under the separate
/// `DbKeyPrefix::PendingClientConfig` prefix.
// NOTE(review): presumably a config that has been received but not yet
// activated — the code that reads/writes it is not in this file.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct PendingClientConfigKey;

impl_db_record!(
    key = PendingClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::PendingClientConfig
);

/// Old-format client config key, which included the federation id.
///
/// Shares `DbKeyPrefix::ClientConfig` with [`ClientConfigKey`] but maps to a
/// [`ClientConfigV0`]. The `DatabaseVersion(0)` core migration reads it,
/// converts the value and stores it under [`ClientConfigKey`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

/// Query prefix matching all [`ClientConfigKeyV0`] records.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);
257
/// Key of the API secret record; the value is a `String`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

/// Query prefix matching [`ApiSecretKey`] records.
#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
271
/// Key of the client [`Metadata`] record (the type comes from
/// `crate::backup`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

/// Query prefix matching [`ClientMetadataKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
286
/// Key of the record holding the client's [`InitState`].
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

/// Query prefix matching [`ClientInitStateKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// How the client is being initialized while initialization is still pending.
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Initialization without recovery.
    Fresh,
    /// Initialization that requires recovery, optionally with a backup
    /// snapshot to use.
    Recover { snapshot: Option<ClientBackup> },
}

/// How the client was initialized, recorded once initialization is complete.
///
/// Mirrors [`InitMode`] but without the snapshot payload.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// Initialization state of the client.
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Initialization has not finished yet.
    Pending(InitMode),
    /// Initialization has finished.
    Complete(InitModeComplete),
}
324
325impl InitState {
326 pub fn into_complete(self) -> Self {
327 match self {
328 InitState::Pending(p) => InitState::Complete(match p {
329 InitMode::Fresh => InitModeComplete::Fresh,
330 InitMode::Recover { .. } => InitModeComplete::Recover,
331 }),
332 InitState::Complete(t) => InitState::Complete(t),
333 }
334 }
335
336 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
337 match self {
338 InitState::Pending(p) => match p {
339 InitMode::Fresh => None,
340 InitMode::Recover { snapshot } => Some(snapshot.clone()),
341 },
342 InitState::Complete(_) => None,
343 }
344 }
345
346 pub fn is_pending(&self) -> bool {
347 match self {
348 InitState::Pending(_) => true,
349 InitState::Complete(_) => false,
350 }
351 }
352}
353
// `ClientInitStateKey` maps to the `InitState` value under
// `DbKeyPrefix::ClientInitState`.
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

// Allows querying `ClientInitStateKey` records via `ClientInitStatePrefix`.
impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);
364
/// Per-module key of the recovery state record, identified by the module
/// instance id.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

/// Recovery state stored for a single module instance.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// Returns whether recovery is finished, as reported by
    /// `RecoveryProgress::is_done`.
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientModuleRecovery,
);
386
/// Old key for module recovery state, registered under the wrong prefix.
///
/// It uses `DbKeyPrefix::ClientInitState` — the same prefix as
/// [`ClientInitStateKey`] — instead of `DbKeyPrefix::ClientModuleRecovery`.
/// It exists only so that the `DatabaseVersion(3)` core migration can read
/// such records and re-insert them under [`ClientModuleRecovery`]; as the
/// name says, do not use it for anything else.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecoveryIncorrectDoNotUse {
    pub module_instance_id: ModuleInstanceId,
}

impl_db_record!(
    key = ClientModuleRecoveryIncorrectDoNotUse,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientInitState,
);
404
/// Key of the single [`ClientBackup`] record stored under
/// `DbKeyPrefix::ClientLastBackup`.
// NOTE(review): presumably the most recent backup made by this client, going
// by the name — the writer is not in this file.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);
417
/// Query prefix matching all [`MetaFieldKey`] records.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(crate) struct MetaFieldPrefix;

/// Key of the single [`MetaServiceInfo`] record.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// Bookkeeping stored alongside the meta fields: a timestamp and a revision
/// number.
// NOTE(review): presumably describes the last update of the stored meta
// fields; the code that maintains it is not in this file.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

/// Database key wrapping `fedimint_client_module::meta::MetaFieldKey`.
#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

/// Database value wrapping `fedimint_client_module::meta::MetaFieldValue`.
#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
451
/// Builds the map of core client database migrations.
///
/// Each entry is keyed by a [`DatabaseVersion`] (0 through 3) and holds an
/// async closure that operates on the transaction obtained from the
/// migration context:
///
/// * `0` — converts the old per-federation [`ClientConfigV0`] record into a
///   single [`ClientConfig`] stored under [`ClientConfigKey`].
/// * `1` — rewrites [`OperationLogEntryV0`] entries as [`OperationLogEntry`],
///   attaching a time to each existing outcome.
/// * `2` — moves event log entries that were written under the wrong
///   (ordered vs. unordered) prefix to the correct one.
/// * `3` — moves module recovery state from the incorrect
///   [`ClientModuleRecoveryIncorrectDoNotUse`] key to [`ClientModuleRecovery`].
pub fn get_core_client_database_migrations()
-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
        BTreeMap::new();
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                let config_v0 = dbtx
                    .find_by_prefix(&ClientConfigKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // At most one old config record is expected; none means there
                // is nothing to migrate.
                assert!(config_v0.len() <= 1);
                let Some((id, config_v0)) = config_v0.into_iter().next() else {
                    return Ok(());
                };

                // The new global config gains `broadcast_public_keys`, which
                // the old format cannot provide, so it is set to `None`.
                let global = GlobalClientConfig {
                    api_endpoints: config_v0.global.api_endpoints,
                    broadcast_public_keys: None,
                    consensus_version: config_v0.global.consensus_version,
                    meta: config_v0.global.meta,
                };

                let config = ClientConfig {
                    global,
                    modules: config_v0.modules,
                };

                // Replace the old keyed-by-federation record with the new
                // single-key record.
                dbtx.remove_entry(&id).await;
                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(1),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Load all old-format operation log entries up front.
                let operation_logs = dbtx
                    .find_by_prefix(&OperationLogKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // For every operation, the latest `exited_at` among its
                // inactive states.
                let mut op_id_max_time = BTreeMap::new();

                {
                    let mut inactive_states_stream =
                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                    while let Some((state, meta)) = inactive_states_stream.next().await {
                        let entry = op_id_max_time
                            .entry(state.operation_id)
                            .or_insert(meta.exited_at);
                        *entry = (*entry).max(meta.exited_at);
                    }
                }
                for (op_key_v0, log_entry_v0) in operation_logs {
                    let new_entry = OperationLogEntry::new(
                        log_entry_v0.operation_module_kind,
                        log_entry_v0.meta,
                        log_entry_v0.outcome.map(|outcome| {
                            OperationOutcome {
                                outcome,
                                // Use the latest inactive-state exit time of
                                // the operation, falling back to the current
                                // time when the operation has none.
                                time: op_id_max_time
                                    .get(&op_key_v0.operation_id)
                                    .copied()
                                    .unwrap_or_else(fedimint_core::time::now),
                            }
                        }),
                    );

                    dbtx.remove_entry(&op_key_v0).await;
                    dbtx.insert_entry(
                        &OperationLogKey {
                            operation_id: op_key_v0.operation_id,
                        },
                        &new_entry,
                    )
                    .await;
                }

                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(2),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Pass 1: find keys under the ordered event log prefix whose
                // remainder does not decode as `EventLogId`, and move them to
                // the unordered event log prefix.
                {
                    let mut ordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = ordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking ordered log key"
                        );
                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
                        {
                            // A misplaced key must at least be a valid
                            // unordered event log id.
                            assert!(
                                UnordedEventLogId::consensus_decode_whole(
                                    &k[1..],
                                    &Default::default()
                                )
                                .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    // Release the stream before modifying the transaction.
                    drop(ordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating unordered event log entry written to an ordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        // Swap only the prefix byte; the asserts pin the
                        // expected prefix values (0x39 ordered, 0x3a unordered).
                        assert_eq!(key_to_migrate[0], 0x39);
                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x3a);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }

                // Pass 2: the mirror image — keys under the unordered prefix
                // that only decode as `EventLogId` are moved to the ordered
                // prefix.
                {
                    let mut unordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = unordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking unordered log key"
                        );
                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
                            .is_err()
                        {
                            assert!(
                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
                                    .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    // Release the stream before modifying the transaction.
                    drop(unordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating ordered event log entry written to an unordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        assert_eq!(key_to_migrate[0], 0x3a);
                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x39);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }
                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(3),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Probe module instance ids in ascending order and stop at
                // the first id that has no record under the old key, so only
                // a contiguous run of ids starting at 0 is migrated.
                // NOTE(review): the exclusive range never probes `u16::MAX`
                // itself — presumably irrelevant in practice; confirm.
                for module_id in 0..u16::MAX {
                    let old_key = ClientModuleRecoveryIncorrectDoNotUse {
                        module_instance_id: module_id,
                    };
                    let new_key = ClientModuleRecovery {
                        module_instance_id: module_id,
                    };
                    let Some(value) = dbtx.get_value(&old_key).await else {
                        debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migartion");
                        break;
                    };

                    debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
                    dbtx.remove_entry(&old_key).await.expect("Is there.");
                    // The new key must not already hold a value.
                    assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
                }

                Ok(())
            })
        }),
    );
    migrations
}
681
682pub async fn apply_migrations_core_client_dbtx(
686 dbtx: &mut DatabaseTransaction<'_>,
687 kind: String,
688) -> Result<(), anyhow::Error> {
689 apply_migrations_dbtx(
690 dbtx,
691 (),
692 kind,
693 get_core_client_database_migrations(),
694 None,
695 Some(DbKeyPrefix::UserData as u8),
696 )
697 .await
698}
699
700pub async fn apply_migrations_client_module(
710 db: &Database,
711 kind: String,
712 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
713 module_instance_id: ModuleInstanceId,
714) -> Result<(), anyhow::Error> {
715 let mut dbtx = db.begin_transaction().await;
716 apply_migrations_client_module_dbtx(
717 &mut dbtx.to_ref_nc(),
718 kind,
719 migrations,
720 module_instance_id,
721 )
722 .await?;
723 dbtx.commit_tx_result().await
724}
725
726pub async fn apply_migrations_client_module_dbtx(
727 dbtx: &mut DatabaseTransaction<'_>,
728 kind: String,
729 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
730 module_instance_id: ModuleInstanceId,
731) -> Result<(), anyhow::Error> {
732 let is_new_db = dbtx
735 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
736 .await?
737 .next()
738 .await
739 .is_none();
740
741 let target_version = get_current_database_version(&migrations);
742
743 create_database_version_dbtx(
745 dbtx,
746 target_version,
747 Some(module_instance_id),
748 kind.clone(),
749 is_new_db,
750 )
751 .await?;
752
753 let current_version = dbtx
754 .get_value(&DatabaseVersionKey(module_instance_id))
755 .await;
756
757 let db_version = if let Some(mut current_version) = current_version {
758 if current_version == target_version {
759 trace!(
760 target: LOG_CLIENT_DB,
761 %current_version,
762 %target_version,
763 module_instance_id,
764 kind,
765 "Database version up to date"
766 );
767 return Ok(());
768 }
769
770 if target_version < current_version {
771 return Err(anyhow!(format!(
772 "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
773 current_version, target_version,
774 )));
775 }
776
777 info!(
778 target: LOG_CLIENT_DB,
779 %current_version,
780 %target_version,
781 module_instance_id,
782 kind,
783 "Migrating client module database"
784 );
785 let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
786 let mut inactive_states =
787 get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
788
789 while current_version < target_version {
790 let new_states = if let Some(migration) = migrations.get(¤t_version) {
791 debug!(
792 target: LOG_CLIENT_DB,
793 module_instance_id,
794 %kind,
795 %current_version,
796 %target_version,
797 "Running module db migration");
798
799 migration(
800 &mut dbtx
801 .to_ref_with_prefix_module_id(module_instance_id)
802 .0
803 .into_nc(),
804 active_states.clone(),
805 inactive_states.clone(),
806 )
807 .await?
808 } else {
809 warn!(
810 target: LOG_CLIENT_DB,
811 ?current_version, "Missing client db migration");
812 None
813 };
814
815 if let Some((new_active_states, new_inactive_states)) = new_states {
818 remove_old_and_persist_new_active_states(
819 &mut dbtx.to_ref_nc(),
820 new_active_states.clone(),
821 active_states.clone(),
822 module_instance_id,
823 )
824 .await;
825 remove_old_and_persist_new_inactive_states(
826 &mut dbtx.to_ref_nc(),
827 new_inactive_states.clone(),
828 inactive_states.clone(),
829 module_instance_id,
830 )
831 .await;
832
833 active_states = new_active_states;
835 inactive_states = new_inactive_states;
836 }
837
838 current_version = current_version.increment();
839 dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version)
840 .await;
841 }
842
843 current_version
844 } else {
845 target_version
846 };
847
848 debug!(
849 target: LOG_CLIENT_DB,
850 ?kind, ?db_version, "Client DB Version");
851 Ok(())
852}
853
854pub async fn get_active_states(
860 dbtx: &mut DatabaseTransaction<'_>,
861 module_instance_id: ModuleInstanceId,
862) -> Vec<(Vec<u8>, OperationId)> {
863 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
864 .await
865 .filter_map(|(state, _)| async move {
866 if module_instance_id == state.module_instance_id {
867 Some((state.state, state.operation_id))
868 } else {
869 None
870 }
871 })
872 .collect::<Vec<_>>()
873 .await
874}
875
876pub async fn get_inactive_states(
882 dbtx: &mut DatabaseTransaction<'_>,
883 module_instance_id: ModuleInstanceId,
884) -> Vec<(Vec<u8>, OperationId)> {
885 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
886 .await
887 .filter_map(|(state, _)| async move {
888 if module_instance_id == state.module_instance_id {
889 Some((state.state, state.operation_id))
890 } else {
891 None
892 }
893 })
894 .collect::<Vec<_>>()
895 .await
896}
897
898pub async fn remove_old_and_persist_new_active_states(
902 dbtx: &mut DatabaseTransaction<'_>,
903 new_active_states: Vec<(Vec<u8>, OperationId)>,
904 states_to_remove: Vec<(Vec<u8>, OperationId)>,
905 module_instance_id: ModuleInstanceId,
906) {
907 for (bytes, operation_id) in states_to_remove {
909 dbtx.remove_entry(&ActiveStateKeyBytes {
910 operation_id,
911 module_instance_id,
912 state: bytes,
913 })
914 .await
915 .expect("Did not delete anything");
916 }
917
918 for (bytes, operation_id) in new_active_states {
920 dbtx.insert_new_entry(
921 &ActiveStateKeyBytes {
922 operation_id,
923 module_instance_id,
924 state: bytes,
925 },
926 &ActiveStateMeta::default(),
927 )
928 .await;
929 }
930}
931
932pub async fn remove_old_and_persist_new_inactive_states(
936 dbtx: &mut DatabaseTransaction<'_>,
937 new_inactive_states: Vec<(Vec<u8>, OperationId)>,
938 states_to_remove: Vec<(Vec<u8>, OperationId)>,
939 module_instance_id: ModuleInstanceId,
940) {
941 for (bytes, operation_id) in states_to_remove {
943 dbtx.remove_entry(&InactiveStateKeyBytes {
944 operation_id,
945 module_instance_id,
946 state: bytes,
947 })
948 .await
949 .expect("Did not delete anything");
950 }
951
952 for (bytes, operation_id) in new_inactive_states {
954 dbtx.insert_new_entry(
955 &InactiveStateKeyBytes {
956 operation_id,
957 module_instance_id,
958 state: bytes,
959 },
960 &InactiveStateMeta {
961 created_at: fedimint_core::time::now(),
962 exited_at: fedimint_core::time::now(),
963 },
964 )
965 .await;
966 }
967}
968
969pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
973 let mut tx = db.begin_transaction_nc().await;
974 let client_secret = tx.get_value(&EncodedClientSecretKey).await;
975
976 match client_secret {
977 Some(client_secret) => {
978 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
979 .map_err(|e| anyhow!("Decoding failed: {e}"))
980 }
981 None => bail!("Encoded client secret not present in DB"),
982 }
983}
984
/// Old-format operation log entry, stored under [`OperationLogKeyV0`].
///
/// Unlike the entry built by the `DatabaseVersion(1)` core migration, the
/// outcome here is a bare [`JsonStringed`] without an associated time; the
/// migration wraps it in an `OperationOutcome` when converting.
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    /// Kind of the module the operation belongs to.
    pub(crate) operation_module_kind: String,
    /// Operation metadata as a JSON string.
    pub(crate) meta: JsonStringed,
    /// Outcome of the operation as a JSON string, if one was recorded.
    pub(crate) outcome: Option<JsonStringed>,
}