1use std::collections::{BTreeMap, BTreeSet};
2use std::time::SystemTime;
3
4use anyhow::{anyhow, bail};
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_client_module::db::ClientModuleMigrationFn;
8use fedimint_client_module::module::recovery::RecoveryProgress;
9use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
10use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
11use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
12use fedimint_core::core::{ModuleInstanceId, OperationId};
13use fedimint_core::db::{
14 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
15 IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
16 apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
17};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::module::SupportedApiVersionsSummary;
20use fedimint_core::module::registry::ModuleRegistry;
21use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
22use fedimint_eventlog::{
23 DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
24};
25use fedimint_logging::LOG_CLIENT_DB;
26use futures::StreamExt;
27use serde::{Deserialize, Serialize};
28use strum::IntoEnumIterator as _;
29use strum_macros::EnumIter;
30use tracing::{debug, info, trace, warn};
31
32use crate::backup::{ClientBackup, Metadata};
33use crate::sm::executor::{
34 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
35 InactiveStateKeyPrefixBytes,
36};
37
/// Single-byte key prefixes of the records kept in the client database.
///
/// The discriminant of each variant is the first byte of every key stored
/// under it. `verify_client_db_integrity_dbtx` panics when it encounters a key
/// whose first byte is below [`DbKeyPrefix::UserData`] and is not one of the
/// values listed here, so a new record type needs a variant in this enum.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    // NOTE(review): no record in this file is registered under this prefix.
    ClientSecret = 0x29,
    ClientPreRootSecretHash = 0x2a,
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    PendingClientConfig = 0x3b,
    // NOTE(review): no record in this file is registered under this prefix.
    ClientInviteCode = 0x30,
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    ApiSecret = 0x36,
    PeerLastApiVersionsSummaryCache = 0x37,
    ApiUrlAnnouncement = 0x38,
    // The event log prefixes are owned by the `fedimint_eventlog` crate.
    EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
    UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
    EventLogTrimable = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
    ClientModuleRecovery = 0x40,

    // Prefixes shared with `fedimint_core::db`.
    DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
    ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,

    // Prefixes owned by the state machine executor.
    ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
    InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,

    /// First prefix that the integrity check does not verify; keys starting
    /// with this byte or a higher one are skipped.
    UserData = 0xb0,
    /// Start of the `0xb1..=0xcf` range (presumably reserved for external
    /// users of the database — confirm against project docs).
    ExternalReservedStart = 0xb1,
    /// End (inclusive) of the range opened by `ExternalReservedStart`.
    ExternalReservedEnd = 0xcf,
    /// Start of the range whose entries are listed in
    /// [`DbKeyPrefixInternalReserved`].
    InternalReservedStart = 0xd0,
    /// Same byte value as `MODULE_GLOBAL_PREFIX`, under which module data is
    /// looked up (presumably — confirm against `fedimint_core::db`).
    ModuleGlobalPrefix = 0xff,
}
91
/// Key prefixes allocated inside the range that starts at
/// `DbKeyPrefix::InternalReservedStart` (`0xd0`).
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub(crate) enum DbKeyPrefixInternalReserved {
    // Shares its value with `DbKeyPrefix::InternalReservedStart`.
    DefaultApplicationEventLogPos = 0xd0,
}
98
99pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
100 let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();
101
102 let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
103 while let Some((k, v)) = records.next().await {
104 if DbKeyPrefix::UserData as u8 <= k[0] {
106 break;
107 }
108
109 assert!(
110 prefixes.contains(&k[0]),
111 "Unexpected client db record found: {}: {}",
112 k.as_hex(),
113 v.as_hex()
114 );
115 }
116}
117
118impl std::fmt::Display for DbKeyPrefix {
119 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
120 write!(f, "{self:?}")
121 }
122}
123
/// Database key of the encoded client secret.
///
/// The value is a raw byte vector stored under
/// `DbKeyPrefix::EncodedClientSecret`; `get_decoded_client_secret` reads and
/// decodes it.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Query prefix for looking up [`EncodedClientSecretKey`] records.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
139
/// Database key of an operation log entry, identified by its operation id.
///
/// Maps to an `OperationLogEntry` under `DbKeyPrefix::OperationLog`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

/// Query prefix matching every [`OperationLogKey`] record.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefix;

impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);
155
/// Legacy form of the operation log key.
///
/// It lives under the same `DbKeyPrefix::OperationLog` prefix as
/// `OperationLogKey` but maps to the old `OperationLogEntryV0` value. The
/// `DatabaseVersion(1)` core migration reads records through this key and
/// rewrites them in the current format.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKeyV0 {
    pub operation_id: OperationId,
}

/// Query prefix matching every [`OperationLogKeyV0`] record.
#[derive(Debug, Encodable)]
pub struct OperationLogKeyPrefixV0;

impl_db_record!(
    key = OperationLogKeyV0,
    value = OperationLogEntryV0,
    db_prefix = DbKeyPrefix::OperationLog
);

impl_db_lookup!(
    key = OperationLogKeyV0,
    query_prefix = OperationLogKeyPrefixV0
);
174
/// Database key of the client pre-root-secret hash.
///
/// The value is an 8-byte array stored under
/// `DbKeyPrefix::ClientPreRootSecretHash`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientPreRootSecretHashKey;

impl_db_record!(
    key = ClientPreRootSecretHashKey,
    value = [u8; 8],
    db_prefix = DbKeyPrefix::ClientPreRootSecretHash
);
183
/// Key of the chronological operation index.
///
/// The record carries no value (`()`); all information is in the key, which
/// pairs an operation's creation time with its id. Because `creation_time` is
/// the first field, it is encoded ahead of `operation_id`
/// (NOTE(review): whether that yields time-ordered iteration depends on the
/// `SystemTime` encoding — confirm).
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Query prefix matching every [`ChronologicalOperationLogKey`] record.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
204
/// Database key of the cached API version set, stored under
/// `DbKeyPrefix::CommonApiVersionCache`.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Value stored for [`CachedApiVersionSetKey`]: a newtype around
/// `ApiVersionSet`.
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);
216
/// Per-peer database key of the last API versions summary, stored under
/// `DbKeyPrefix::PeerLastApiVersionsSummaryCache`.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummaryKey(pub PeerId);

/// Value stored for [`PeerLastApiVersionsSummaryKey`]: a newtype around
/// `SupportedApiVersionsSummary`.
#[derive(Debug, Encodable, Decodable)]
pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);

impl_db_record!(
    key = PeerLastApiVersionsSummaryKey,
    value = PeerLastApiVersionsSummary,
    db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
);
228
/// Database key of the client's `ClientConfig`, stored under
/// `DbKeyPrefix::ClientConfig`.
///
/// This is the current key format; the `DatabaseVersion(0)` core migration
/// converts `ClientConfigKeyV0` records into it.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);
237
/// Database key of a pending `ClientConfig`, stored under
/// `DbKeyPrefix::PendingClientConfig` (separately from the record addressed by
/// `ClientConfigKey`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct PendingClientConfigKey;

impl_db_record!(
    key = PendingClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::PendingClientConfig
);
246
/// Legacy client config key, which included the federation id.
///
/// It shares `DbKeyPrefix::ClientConfig` with `ClientConfigKey` but maps to
/// the old `ClientConfigV0` value. It is read and removed by the
/// `DatabaseVersion(0)` core migration.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKeyV0 {
    pub id: FederationId,
}

/// Query prefix matching every [`ClientConfigKeyV0`] record.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefixV0;

impl_db_record!(
    key = ClientConfigKeyV0,
    value = ClientConfigV0,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(
    key = ClientConfigKeyV0,
    query_prefix = ClientConfigKeyPrefixV0
);
265
/// Database key of the API secret, a `String` stored under
/// `DbKeyPrefix::ApiSecret`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ApiSecretKey;

/// Query prefix for looking up [`ApiSecretKey`] records.
#[derive(Debug, Encodable)]
pub struct ApiSecretKeyPrefix;

impl_db_record!(
    key = ApiSecretKey,
    value = String,
    db_prefix = DbKeyPrefix::ApiSecret
);

impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
279
/// Database key of the client `Metadata` (the type from `crate::backup`),
/// stored under `DbKeyPrefix::ClientMetadata`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

/// Query prefix for looking up [`ClientMetadataKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
294
/// Database key of the client initialization state (`InitState`).
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

/// Query prefix for looking up [`ClientInitStateKey`] records.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// How the client is being initialized while initialization is still pending.
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Initialization that does not involve recovery.
    Fresh,
    /// Initialization that requires recovery, optionally starting from a
    /// backup snapshot.
    Recover { snapshot: Option<ClientBackup> },
}

/// How the client was initialized, recorded once initialization is complete.
///
/// Mirrors [`InitMode`] without the recovery snapshot.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// Initialization state of the client.
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Initialization has not finished yet; carries the mode in progress.
    Pending(InitMode),
    /// Initialization has finished; carries the mode that was used.
    Complete(InitModeComplete),
}
332
333impl InitState {
334 pub fn into_complete(self) -> Self {
335 match self {
336 InitState::Pending(p) => InitState::Complete(match p {
337 InitMode::Fresh => InitModeComplete::Fresh,
338 InitMode::Recover { .. } => InitModeComplete::Recover,
339 }),
340 InitState::Complete(t) => InitState::Complete(t),
341 }
342 }
343
344 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
345 match self {
346 InitState::Pending(p) => match p {
347 InitMode::Fresh => None,
348 InitMode::Recover { snapshot } => Some(snapshot.clone()),
349 },
350 InitState::Complete(_) => None,
351 }
352 }
353
354 pub fn is_pending(&self) -> bool {
355 match self {
356 InitState::Pending(_) => true,
357 InitState::Complete(_) => false,
358 }
359 }
360}
361
// `ClientInitStateKey` maps to an `InitState` value under
// `DbKeyPrefix::ClientInitState`.
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

// Lookups of `ClientInitStateKey` records go through `ClientInitStatePrefix`.
impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);
372
/// Per-module database key of the module recovery state, stored under
/// `DbKeyPrefix::ClientModuleRecovery`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

/// Value stored for [`ClientModuleRecovery`]: the module's recovery progress.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// Returns whether the wrapped `RecoveryProgress` reports being done.
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientModuleRecovery,
);
394
/// Old key layout of the module recovery state. Do not use for new data.
///
/// It is registered under `DbKeyPrefix::ClientInitState`, the same prefix as
/// `ClientInitStateKey`, instead of `DbKeyPrefix::ClientModuleRecovery`. It is
/// kept only so the `DatabaseVersion(3)` core migration can find records
/// written this way and move them to `ClientModuleRecovery`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecoveryIncorrectDoNotUse {
    pub module_instance_id: ModuleInstanceId,
}

impl_db_record!(
    key = ClientModuleRecoveryIncorrectDoNotUse,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientInitState,
);
412
/// Database key of the last `ClientBackup`, stored under
/// `DbKeyPrefix::ClientLastBackup`.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);
425
/// Query prefix matching every [`MetaFieldKey`] record.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub(crate) struct MetaFieldPrefix;

/// Database key of the [`MetaServiceInfo`] record, stored under
/// `DbKeyPrefix::ClientMetaServiceInfo`.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// Bookkeeping value stored for [`MetaServiceInfoKey`]: a timestamp and a
/// revision number.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

/// Database key of a single meta field; a newtype around
/// `fedimint_client_module::meta::MetaFieldKey`.
#[derive(
    Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
)]
pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);

/// Value of a single meta field; a newtype around
/// `fedimint_client_module::meta::MetaFieldValue`.
#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
459
/// Builds the map of core (non-module) client database migrations.
///
/// Each migration is registered under the `DatabaseVersion` key passed to
/// `insert` (presumably the version it upgrades *from* — that is how
/// `apply_migrations_client_module_dbtx` uses its own map):
///
/// * `0` — replaces the single `ClientConfigKeyV0` record with a
///   `ClientConfigKey` record.
/// * `1` — rewrites `OperationLogEntryV0` records as `OperationLogEntry`,
///   attaching a time to every recorded outcome.
/// * `2` — moves event log entries that were written under the wrong one of
///   the two event log prefixes to the other prefix.
/// * `3` — moves module recovery state from
///   `ClientModuleRecoveryIncorrectDoNotUse` keys to `ClientModuleRecovery`
///   keys.
pub fn get_core_client_database_migrations()
-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
    let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
        BTreeMap::new();
    migrations.insert(
        DatabaseVersion(0),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                let config_v0 = dbtx
                    .find_by_prefix(&ClientConfigKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // At most one legacy config is expected; none means there is
                // nothing to migrate.
                assert!(config_v0.len() <= 1);
                let Some((id, config_v0)) = config_v0.into_iter().next() else {
                    return Ok(());
                };

                // The V0 config has no broadcast public keys to carry over,
                // so the new field is left unset.
                let global = GlobalClientConfig {
                    api_endpoints: config_v0.global.api_endpoints,
                    broadcast_public_keys: None,
                    consensus_version: config_v0.global.consensus_version,
                    meta: config_v0.global.meta,
                };

                let config = ClientConfig {
                    global,
                    modules: config_v0.modules,
                };

                // Both keys share the `ClientConfig` prefix, so the old record
                // is removed before the new one is inserted.
                dbtx.remove_entry(&id).await;
                dbtx.insert_new_entry(&ClientConfigKey, &config).await;
                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(1),
        Box::new(|mut ctx| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Collect all legacy entries up front so the transaction can
                // be mutated while they are processed.
                let operation_logs = dbtx
                    .find_by_prefix(&OperationLogKeyPrefixV0)
                    .await
                    .collect::<Vec<_>>()
                    .await;

                // Latest `exited_at` seen among the inactive states of each
                // operation.
                let mut op_id_max_time = BTreeMap::new();

                {
                    let mut inactive_states_stream =
                        dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;

                    while let Some((state, meta)) = inactive_states_stream.next().await {
                        let entry = op_id_max_time
                            .entry(state.operation_id)
                            .or_insert(meta.exited_at);
                        *entry = (*entry).max(meta.exited_at);
                    }
                }
                for (op_key_v0, log_entry_v0) in operation_logs {
                    let new_entry = OperationLogEntry::new(
                        log_entry_v0.operation_module_kind,
                        log_entry_v0.meta,
                        log_entry_v0.outcome.map(|outcome| {
                            OperationOutcome {
                                outcome,
                                // Use the latest inactive state exit time of
                                // the operation if there is one, otherwise
                                // fall back to the current time.
                                time: op_id_max_time
                                    .get(&op_key_v0.operation_id)
                                    .copied()
                                    .unwrap_or_else(fedimint_core::time::now),
                            }
                        }),
                    );

                    // Old and new keys share the `OperationLog` prefix; remove
                    // the legacy record, then write the converted one.
                    dbtx.remove_entry(&op_key_v0).await;
                    dbtx.insert_entry(
                        &OperationLogKey {
                            operation_id: op_key_v0.operation_id,
                        },
                        &new_entry,
                    )
                    .await;
                }

                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(2),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Pass 1: find keys under the ordered event log prefix that do
                // not decode as `EventLogId` and move them to the unordered
                // prefix.
                {
                    let mut ordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = ordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking ordered log key"
                        );
                        if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
                        {
                            // A key that is not an ordered id must be an
                            // unordered one; anything else is unexpected.
                            assert!(
                                UnordedEventLogId::consensus_decode_whole(
                                    &k[1..],
                                    &Default::default()
                                )
                                .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    // Release the stream before mutating the transaction.
                    drop(ordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating unordered event log entry written to an ordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        // Only the prefix byte changes: 0x39 (ordered) becomes
                        // 0x3a (unordered). The asserts pin both constants.
                        assert_eq!(key_to_migrate[0], 0x39);
                        key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x3a);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }

                // Pass 2: the mirror image — keys under the unordered prefix
                // that only decode as `EventLogId` are moved to the ordered
                // prefix.
                {
                    let mut unordered_log_entries = dbtx
                        .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
                        .await
                        .expect("DB operation failed");
                    let mut keys_to_migrate = vec![];
                    while let Some((k, _v)) = unordered_log_entries.next().await {
                        trace!(target: LOG_CLIENT_DB,
                            k=%k.as_hex(),
                            "Checking unordered log key"
                        );
                        if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
                            .is_err()
                        {
                            assert!(
                                EventLogId::consensus_decode_whole(&k[1..], &Default::default())
                                    .is_ok()
                            );
                            keys_to_migrate.push(k);
                        }
                    }
                    // Release the stream before mutating the transaction.
                    drop(unordered_log_entries);
                    for mut key_to_migrate in keys_to_migrate {
                        warn!(target: LOG_CLIENT_DB,
                            k=%key_to_migrate.as_hex(),
                            "Migrating ordered event log entry written to an unordered log"
                        );
                        let v = dbtx
                            .raw_remove_entry(&key_to_migrate)
                            .await
                            .expect("DB operation failed")
                            .expect("Was there a moment ago");
                        // Only the prefix byte changes: 0x3a (unordered)
                        // becomes 0x39 (ordered).
                        assert_eq!(key_to_migrate[0], 0x3a);
                        key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
                        assert_eq!(key_to_migrate[0], 0x39);
                        dbtx.raw_insert_bytes(&key_to_migrate, &v)
                            .await
                            .expect("DB operation failed");
                    }
                }
                Ok(())
            })
        }),
    );

    migrations.insert(
        DatabaseVersion(3),
        Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
            Box::pin(async move {
                let mut dbtx = ctx.dbtx();

                // Module ids are probed in ascending order and the loop stops
                // at the first id without an old-style record, so only a
                // contiguous run of ids starting at 0 is migrated.
                for module_id in 0..u16::MAX {
                    let old_key = ClientModuleRecoveryIncorrectDoNotUse {
                        module_instance_id: module_id,
                    };
                    let new_key = ClientModuleRecovery {
                        module_instance_id: module_id,
                    };
                    let Some(value) = dbtx.get_value(&old_key).await else {
                        debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migartion");
                        break;
                    };

                    debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
                    dbtx.remove_entry(&old_key).await.expect("Is there.");
                    // The destination key must not have been populated yet.
                    assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
                }

                Ok(())
            })
        }),
    );
    migrations
}
689
690pub async fn apply_migrations_core_client_dbtx(
694 dbtx: &mut DatabaseTransaction<'_>,
695 kind: String,
696) -> Result<(), anyhow::Error> {
697 apply_migrations_dbtx(
698 dbtx,
699 (),
700 kind,
701 get_core_client_database_migrations(),
702 None,
703 Some(DbKeyPrefix::UserData as u8),
704 )
705 .await
706}
707
708pub async fn apply_migrations_client_module(
718 db: &Database,
719 kind: String,
720 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
721 module_instance_id: ModuleInstanceId,
722) -> Result<(), anyhow::Error> {
723 let mut dbtx = db.begin_transaction().await;
724 apply_migrations_client_module_dbtx(
725 &mut dbtx.to_ref_nc(),
726 kind,
727 migrations,
728 module_instance_id,
729 )
730 .await?;
731 dbtx.commit_tx_result().await
732}
733
734pub async fn apply_migrations_client_module_dbtx(
735 dbtx: &mut DatabaseTransaction<'_>,
736 kind: String,
737 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
738 module_instance_id: ModuleInstanceId,
739) -> Result<(), anyhow::Error> {
740 let is_new_db = dbtx
743 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
744 .await?
745 .next()
746 .await
747 .is_none();
748
749 let target_version = get_current_database_version(&migrations);
750
751 create_database_version_dbtx(
753 dbtx,
754 target_version,
755 Some(module_instance_id),
756 kind.clone(),
757 is_new_db,
758 )
759 .await?;
760
761 let current_version = dbtx
762 .get_value(&DatabaseVersionKey(module_instance_id))
763 .await;
764
765 let db_version = if let Some(mut current_version) = current_version {
766 if current_version == target_version {
767 trace!(
768 target: LOG_CLIENT_DB,
769 %current_version,
770 %target_version,
771 module_instance_id,
772 kind,
773 "Database version up to date"
774 );
775 return Ok(());
776 }
777
778 if target_version < current_version {
779 return Err(anyhow!(format!(
780 "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
781 current_version, target_version,
782 )));
783 }
784
785 info!(
786 target: LOG_CLIENT_DB,
787 %current_version,
788 %target_version,
789 module_instance_id,
790 kind,
791 "Migrating client module database"
792 );
793 let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
794 let mut inactive_states =
795 get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
796
797 while current_version < target_version {
798 let new_states = if let Some(migration) = migrations.get(¤t_version) {
799 debug!(
800 target: LOG_CLIENT_DB,
801 module_instance_id,
802 %kind,
803 %current_version,
804 %target_version,
805 "Running module db migration");
806
807 migration(
808 &mut dbtx
809 .to_ref_with_prefix_module_id(module_instance_id)
810 .0
811 .into_nc(),
812 active_states.clone(),
813 inactive_states.clone(),
814 )
815 .await?
816 } else {
817 warn!(
818 target: LOG_CLIENT_DB,
819 ?current_version, "Missing client db migration");
820 None
821 };
822
823 if let Some((new_active_states, new_inactive_states)) = new_states {
826 remove_old_and_persist_new_active_states(
827 &mut dbtx.to_ref_nc(),
828 new_active_states.clone(),
829 active_states.clone(),
830 module_instance_id,
831 )
832 .await;
833 remove_old_and_persist_new_inactive_states(
834 &mut dbtx.to_ref_nc(),
835 new_inactive_states.clone(),
836 inactive_states.clone(),
837 module_instance_id,
838 )
839 .await;
840
841 active_states = new_active_states;
843 inactive_states = new_inactive_states;
844 }
845
846 current_version = current_version.increment();
847 dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version)
848 .await;
849 }
850
851 current_version
852 } else {
853 target_version
854 };
855
856 debug!(
857 target: LOG_CLIENT_DB,
858 ?kind, ?db_version, "Client DB Version");
859 Ok(())
860}
861
862pub async fn get_active_states(
868 dbtx: &mut DatabaseTransaction<'_>,
869 module_instance_id: ModuleInstanceId,
870) -> Vec<(Vec<u8>, OperationId)> {
871 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
872 .await
873 .filter_map(|(state, _)| async move {
874 if module_instance_id == state.module_instance_id {
875 Some((state.state, state.operation_id))
876 } else {
877 None
878 }
879 })
880 .collect::<Vec<_>>()
881 .await
882}
883
884pub async fn get_inactive_states(
890 dbtx: &mut DatabaseTransaction<'_>,
891 module_instance_id: ModuleInstanceId,
892) -> Vec<(Vec<u8>, OperationId)> {
893 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
894 .await
895 .filter_map(|(state, _)| async move {
896 if module_instance_id == state.module_instance_id {
897 Some((state.state, state.operation_id))
898 } else {
899 None
900 }
901 })
902 .collect::<Vec<_>>()
903 .await
904}
905
906pub async fn remove_old_and_persist_new_active_states(
910 dbtx: &mut DatabaseTransaction<'_>,
911 new_active_states: Vec<(Vec<u8>, OperationId)>,
912 states_to_remove: Vec<(Vec<u8>, OperationId)>,
913 module_instance_id: ModuleInstanceId,
914) {
915 for (bytes, operation_id) in states_to_remove {
917 dbtx.remove_entry(&ActiveStateKeyBytes {
918 operation_id,
919 module_instance_id,
920 state: bytes,
921 })
922 .await
923 .expect("Did not delete anything");
924 }
925
926 for (bytes, operation_id) in new_active_states {
928 dbtx.insert_new_entry(
929 &ActiveStateKeyBytes {
930 operation_id,
931 module_instance_id,
932 state: bytes,
933 },
934 &ActiveStateMeta::default(),
935 )
936 .await;
937 }
938}
939
940pub async fn remove_old_and_persist_new_inactive_states(
944 dbtx: &mut DatabaseTransaction<'_>,
945 new_inactive_states: Vec<(Vec<u8>, OperationId)>,
946 states_to_remove: Vec<(Vec<u8>, OperationId)>,
947 module_instance_id: ModuleInstanceId,
948) {
949 for (bytes, operation_id) in states_to_remove {
951 dbtx.remove_entry(&InactiveStateKeyBytes {
952 operation_id,
953 module_instance_id,
954 state: bytes,
955 })
956 .await
957 .expect("Did not delete anything");
958 }
959
960 for (bytes, operation_id) in new_inactive_states {
962 dbtx.insert_new_entry(
963 &InactiveStateKeyBytes {
964 operation_id,
965 module_instance_id,
966 state: bytes,
967 },
968 &InactiveStateMeta {
969 created_at: fedimint_core::time::now(),
970 exited_at: fedimint_core::time::now(),
971 },
972 )
973 .await;
974 }
975}
976
977pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
981 let mut tx = db.begin_transaction_nc().await;
982 let client_secret = tx.get_value(&EncodedClientSecretKey).await;
983
984 match client_secret {
985 Some(client_secret) => {
986 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
987 .map_err(|e| anyhow!("Decoding failed: {e}"))
988 }
989 None => bail!("Encoded client secret not present in DB"),
990 }
991}
992
/// Legacy operation log entry, addressed by `OperationLogKeyV0`.
///
/// Unlike the entry built by the `DatabaseVersion(1)` core migration, the
/// outcome here carries no timestamp; the migration adds one when converting
/// to `OperationLogEntry`.
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
    pub(crate) operation_module_kind: String,
    pub(crate) meta: JsonStringed,
    // `None` when no outcome has been recorded for the operation.
    pub(crate) outcome: Option<JsonStringed>,
}