1use std::collections::{BTreeMap, BTreeSet};
2use std::time::SystemTime;
3
4use anyhow::{anyhow, bail};
5use bitcoin::hex::DisplayHex as _;
6use fedimint_api_client::api::ApiVersionSet;
7use fedimint_client_module::db::ClientModuleMigrationFn;
8use fedimint_client_module::module::recovery::RecoveryProgress;
9use fedimint_client_module::oplog::{JsonStringed, OperationLogEntry, OperationOutcome};
10use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
11use fedimint_core::config::{ClientConfig, ClientConfigV0, FederationId, GlobalClientConfig};
12use fedimint_core::core::{ModuleInstanceId, OperationId};
13use fedimint_core::db::{
14 Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
15 IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
16 apply_migrations_dbtx, create_database_version_dbtx, get_current_database_version,
17};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::module::SupportedApiVersionsSummary;
20use fedimint_core::module::registry::ModuleRegistry;
21use fedimint_core::{PeerId, impl_db_lookup, impl_db_record};
22use fedimint_eventlog::{
23 DB_KEY_PREFIX_EVENT_LOG, DB_KEY_PREFIX_UNORDERED_EVENT_LOG, EventLogId, UnordedEventLogId,
24};
25use fedimint_logging::LOG_CLIENT_DB;
26use futures::StreamExt;
27use serde::{Deserialize, Serialize};
28use strum::IntoEnumIterator as _;
29use strum_macros::EnumIter;
30use tracing::{debug, info, trace, warn};
31
32use crate::backup::{ClientBackup, Metadata};
33use crate::sm::executor::{
34 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, ExecutorDbPrefixes, InactiveStateKeyBytes,
35 InactiveStateKeyPrefixBytes,
36};
37
38#[repr(u8)]
39#[derive(Clone, EnumIter, Debug)]
40pub enum DbKeyPrefix {
41 EncodedClientSecret = 0x28,
42 ClientSecret = 0x29, ClientPreRootSecretHash = 0x2a,
44 OperationLog = 0x2c,
45 ChronologicalOperationLog = 0x2d,
46 CommonApiVersionCache = 0x2e,
47 ClientConfig = 0x2f,
48 ClientInviteCode = 0x30, ClientInitState = 0x31,
50 ClientMetadata = 0x32,
51 ClientLastBackup = 0x33,
52 ClientMetaField = 0x34,
53 ClientMetaServiceInfo = 0x35,
54 ApiSecret = 0x36,
55 PeerLastApiVersionsSummaryCache = 0x37,
56 ApiUrlAnnouncement = 0x38,
57 EventLog = fedimint_eventlog::DB_KEY_PREFIX_EVENT_LOG,
58 UnorderedEventLog = fedimint_eventlog::DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
59 ClientModuleRecovery = 0x40,
60
61 DatabaseVersion = fedimint_core::db::DbKeyPrefix::DatabaseVersion as u8,
62 ClientBackup = fedimint_core::db::DbKeyPrefix::ClientBackup as u8,
63
64 ActiveStates = ExecutorDbPrefixes::ActiveStates as u8,
65 InactiveStates = ExecutorDbPrefixes::InactiveStates as u8,
66
67 UserData = 0xb0,
77 ExternalReservedStart = 0xb1,
80 ExternalReservedEnd = 0xcf,
83 InternalReservedStart = 0xd0,
85 ModuleGlobalPrefix = 0xff,
87}
88
89pub(crate) async fn verify_client_db_integrity_dbtx(dbtx: &mut DatabaseTransaction<'_>) {
90 let prefixes: BTreeSet<u8> = DbKeyPrefix::iter().map(|prefix| prefix as u8).collect();
91
92 let mut records = dbtx.raw_find_by_prefix(&[]).await.expect("DB fail");
93 while let Some((k, v)) = records.next().await {
94 if DbKeyPrefix::UserData as u8 <= k[0] {
96 break;
97 }
98
99 assert!(
100 prefixes.contains(&k[0]),
101 "Unexpected client db record found: {}: {}",
102 k.as_hex(),
103 v.as_hex()
104 );
105 }
106}
107
108impl std::fmt::Display for DbKeyPrefix {
109 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110 write!(f, "{self:?}")
111 }
112}
113
114#[derive(Debug, Encodable, Decodable)]
115pub struct EncodedClientSecretKey;
116
117#[derive(Debug, Encodable, Decodable)]
118pub struct EncodedClientSecretKeyPrefix;
119
120impl_db_record!(
121 key = EncodedClientSecretKey,
122 value = Vec<u8>,
123 db_prefix = DbKeyPrefix::EncodedClientSecret,
124);
125impl_db_lookup!(
126 key = EncodedClientSecretKey,
127 query_prefix = EncodedClientSecretKeyPrefix
128);
129
130#[derive(Debug, Encodable, Decodable, Serialize)]
131pub struct OperationLogKey {
132 pub operation_id: OperationId,
133}
134
135impl_db_record!(
136 key = OperationLogKey,
137 value = OperationLogEntry,
138 db_prefix = DbKeyPrefix::OperationLog
139);
140
141#[derive(Debug, Encodable)]
142pub struct OperationLogKeyPrefix;
143
144impl_db_lookup!(key = OperationLogKey, query_prefix = OperationLogKeyPrefix);
145
146#[derive(Debug, Encodable, Decodable, Serialize)]
147pub struct OperationLogKeyV0 {
148 pub operation_id: OperationId,
149}
150
151#[derive(Debug, Encodable)]
152pub struct OperationLogKeyPrefixV0;
153
154impl_db_record!(
155 key = OperationLogKeyV0,
156 value = OperationLogEntryV0,
157 db_prefix = DbKeyPrefix::OperationLog
158);
159
160impl_db_lookup!(
161 key = OperationLogKeyV0,
162 query_prefix = OperationLogKeyPrefixV0
163);
164
165#[derive(Debug, Encodable, Decodable, Serialize)]
166pub struct ClientPreRootSecretHashKey;
167
168impl_db_record!(
169 key = ClientPreRootSecretHashKey,
170 value = [u8; 8],
171 db_prefix = DbKeyPrefix::ClientPreRootSecretHash
172);
173
174#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq, Encodable, Decodable, Serialize)]
176pub struct ChronologicalOperationLogKey {
177 pub creation_time: std::time::SystemTime,
178 pub operation_id: OperationId,
179}
180
181#[derive(Debug, Encodable)]
182pub struct ChronologicalOperationLogKeyPrefix;
183
184impl_db_record!(
185 key = ChronologicalOperationLogKey,
186 value = (),
187 db_prefix = DbKeyPrefix::ChronologicalOperationLog
188);
189
190impl_db_lookup!(
191 key = ChronologicalOperationLogKey,
192 query_prefix = ChronologicalOperationLogKeyPrefix
193);
194
195#[derive(Debug, Encodable, Decodable)]
196pub struct CachedApiVersionSetKey;
197
198#[derive(Debug, Encodable, Decodable)]
199pub struct CachedApiVersionSet(pub ApiVersionSet);
200
201impl_db_record!(
202 key = CachedApiVersionSetKey,
203 value = CachedApiVersionSet,
204 db_prefix = DbKeyPrefix::CommonApiVersionCache
205);
206
207#[derive(Debug, Encodable, Decodable)]
208pub struct PeerLastApiVersionsSummaryKey(pub PeerId);
209
210#[derive(Debug, Encodable, Decodable)]
211pub struct PeerLastApiVersionsSummary(pub SupportedApiVersionsSummary);
212
213impl_db_record!(
214 key = PeerLastApiVersionsSummaryKey,
215 value = PeerLastApiVersionsSummary,
216 db_prefix = DbKeyPrefix::PeerLastApiVersionsSummaryCache
217);
218
219#[derive(Debug, Encodable, Decodable, Serialize)]
220pub struct ClientConfigKey;
221
222impl_db_record!(
223 key = ClientConfigKey,
224 value = ClientConfig,
225 db_prefix = DbKeyPrefix::ClientConfig
226);
227
228#[derive(Debug, Encodable, Decodable, Serialize)]
229pub struct ClientConfigKeyV0 {
230 pub id: FederationId,
231}
232
233#[derive(Debug, Encodable)]
234pub struct ClientConfigKeyPrefixV0;
235
236impl_db_record!(
237 key = ClientConfigKeyV0,
238 value = ClientConfigV0,
239 db_prefix = DbKeyPrefix::ClientConfig
240);
241
242impl_db_lookup!(
243 key = ClientConfigKeyV0,
244 query_prefix = ClientConfigKeyPrefixV0
245);
246
247#[derive(Debug, Encodable, Decodable, Serialize)]
248pub struct ApiSecretKey;
249
250#[derive(Debug, Encodable)]
251pub struct ApiSecretKeyPrefix;
252
253impl_db_record!(
254 key = ApiSecretKey,
255 value = String,
256 db_prefix = DbKeyPrefix::ApiSecret
257);
258
259impl_db_lookup!(key = ApiSecretKey, query_prefix = ApiSecretKeyPrefix);
260
261#[derive(Debug, Encodable, Decodable, Serialize)]
263pub struct ClientMetadataKey;
264
265#[derive(Debug, Encodable)]
266pub struct ClientMetadataPrefix;
267
268impl_db_record!(
269 key = ClientMetadataKey,
270 value = Metadata,
271 db_prefix = DbKeyPrefix::ClientMetadata
272);
273
274impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
275
276#[derive(Debug, Encodable, Decodable, Serialize)]
278pub struct ClientInitStateKey;
279
280#[derive(Debug, Encodable)]
281pub struct ClientInitStatePrefix;
282
283#[derive(Debug, Encodable, Decodable)]
285pub enum InitMode {
286 Fresh,
288 Recover { snapshot: Option<ClientBackup> },
291}
292
293#[derive(Debug, Encodable, Decodable)]
299pub enum InitModeComplete {
300 Fresh,
301 Recover,
302}
303
304#[derive(Debug, Encodable, Decodable)]
306pub enum InitState {
307 Pending(InitMode),
310 Complete(InitModeComplete),
312}
313
314impl InitState {
315 pub fn into_complete(self) -> Self {
316 match self {
317 InitState::Pending(p) => InitState::Complete(match p {
318 InitMode::Fresh => InitModeComplete::Fresh,
319 InitMode::Recover { .. } => InitModeComplete::Recover,
320 }),
321 InitState::Complete(t) => InitState::Complete(t),
322 }
323 }
324
325 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
326 match self {
327 InitState::Pending(p) => match p {
328 InitMode::Fresh => None,
329 InitMode::Recover { snapshot } => Some(snapshot.clone()),
330 },
331 InitState::Complete(_) => None,
332 }
333 }
334
335 pub fn is_pending(&self) -> bool {
336 match self {
337 InitState::Pending(_) => true,
338 InitState::Complete(_) => false,
339 }
340 }
341}
342
343impl_db_record!(
344 key = ClientInitStateKey,
345 value = InitState,
346 db_prefix = DbKeyPrefix::ClientInitState
347);
348
349impl_db_lookup!(
350 key = ClientInitStateKey,
351 query_prefix = ClientInitStatePrefix
352);
353
354#[derive(Debug, Encodable, Decodable, Serialize)]
355pub struct ClientModuleRecovery {
356 pub module_instance_id: ModuleInstanceId,
357}
358
359#[derive(Debug, Clone, Encodable, Decodable)]
360pub struct ClientModuleRecoveryState {
361 pub progress: RecoveryProgress,
362}
363
364impl ClientModuleRecoveryState {
365 pub fn is_done(&self) -> bool {
366 self.progress.is_done()
367 }
368}
369
370impl_db_record!(
371 key = ClientModuleRecovery,
372 value = ClientModuleRecoveryState,
373 db_prefix = DbKeyPrefix::ClientModuleRecovery,
374);
375
376#[derive(Debug, Encodable, Decodable, Serialize)]
383pub struct ClientModuleRecoveryIncorrectDoNotUse {
384 pub module_instance_id: ModuleInstanceId,
385}
386
387impl_db_record!(
388 key = ClientModuleRecoveryIncorrectDoNotUse,
389 value = ClientModuleRecoveryState,
390 db_prefix = DbKeyPrefix::ClientInitState,
392);
393
394#[derive(Debug, Encodable, Decodable)]
399pub struct LastBackupKey;
400
401impl_db_record!(
402 key = LastBackupKey,
403 value = ClientBackup,
404 db_prefix = DbKeyPrefix::ClientLastBackup
405);
406
407#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
408pub(crate) struct MetaFieldPrefix;
409
410#[derive(Encodable, Decodable, Debug)]
411pub struct MetaServiceInfoKey;
412
413#[derive(Encodable, Decodable, Debug)]
414pub struct MetaServiceInfo {
415 pub last_updated: SystemTime,
416 pub revision: u64,
417}
418
419#[derive(
420 Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize,
421)]
422pub(crate) struct MetaFieldKey(pub fedimint_client_module::meta::MetaFieldKey);
423
424#[derive(Encodable, Decodable, Debug, Clone, Serialize, Deserialize)]
425pub(crate) struct MetaFieldValue(pub fedimint_client_module::meta::MetaFieldValue);
426
427impl_db_record!(
428 key = MetaFieldKey,
429 value = MetaFieldValue,
430 db_prefix = DbKeyPrefix::ClientMetaField
431);
432
433impl_db_record!(
434 key = MetaServiceInfoKey,
435 value = MetaServiceInfo,
436 db_prefix = DbKeyPrefix::ClientMetaServiceInfo
437);
438
439impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
440
441pub fn get_core_client_database_migrations()
442-> BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> {
443 let mut migrations: BTreeMap<DatabaseVersion, fedimint_core::db::ClientCoreDbMigrationFn> =
444 BTreeMap::new();
445 migrations.insert(
446 DatabaseVersion(0),
447 Box::new(|mut ctx| {
448 Box::pin(async move {
449 let mut dbtx = ctx.dbtx();
450
451 let config_v0 = dbtx
452 .find_by_prefix(&ClientConfigKeyPrefixV0)
453 .await
454 .collect::<Vec<_>>()
455 .await;
456
457 assert!(config_v0.len() <= 1);
458 let Some((id, config_v0)) = config_v0.into_iter().next() else {
459 return Ok(());
460 };
461
462 let global = GlobalClientConfig {
463 api_endpoints: config_v0.global.api_endpoints,
464 broadcast_public_keys: None,
465 consensus_version: config_v0.global.consensus_version,
466 meta: config_v0.global.meta,
467 };
468
469 let config = ClientConfig {
470 global,
471 modules: config_v0.modules,
472 };
473
474 dbtx.remove_entry(&id).await;
475 dbtx.insert_new_entry(&ClientConfigKey, &config).await;
476 Ok(())
477 })
478 }),
479 );
480
481 migrations.insert(
483 DatabaseVersion(1),
484 Box::new(|mut ctx| {
485 Box::pin(async move {
486 let mut dbtx = ctx.dbtx();
487
488 let operation_logs = dbtx
490 .find_by_prefix(&OperationLogKeyPrefixV0)
491 .await
492 .collect::<Vec<_>>()
493 .await;
494
495 let mut op_id_max_time = BTreeMap::new();
497
498 {
500 let mut inactive_states_stream =
501 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes).await;
502
503 while let Some((state, meta)) = inactive_states_stream.next().await {
504 let entry = op_id_max_time
505 .entry(state.operation_id)
506 .or_insert(meta.exited_at);
507 *entry = (*entry).max(meta.exited_at);
508 }
509 }
510 for (op_key_v0, log_entry_v0) in operation_logs {
512 let new_entry = OperationLogEntry::new(
513 log_entry_v0.operation_module_kind,
514 log_entry_v0.meta,
515 log_entry_v0.outcome.map(|outcome| {
516 OperationOutcome {
517 outcome,
518 time: op_id_max_time
521 .get(&op_key_v0.operation_id)
522 .copied()
523 .unwrap_or_else(fedimint_core::time::now),
524 }
525 }),
526 );
527
528 dbtx.remove_entry(&op_key_v0).await;
529 dbtx.insert_entry(
530 &OperationLogKey {
531 operation_id: op_key_v0.operation_id,
532 },
533 &new_entry,
534 )
535 .await;
536 }
537
538 Ok(())
539 })
540 }),
541 );
542
543 migrations.insert(
545 DatabaseVersion(2),
546 Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
547 Box::pin(async move {
548 let mut dbtx = ctx.dbtx();
549
550 {
552 let mut ordered_log_entries = dbtx
553 .raw_find_by_prefix(&[DB_KEY_PREFIX_EVENT_LOG])
554 .await
555 .expect("DB operation failed");
556 let mut keys_to_migrate = vec![];
557 while let Some((k, _v)) = ordered_log_entries.next().await {
558 trace!(target: LOG_CLIENT_DB,
559 k=%k.as_hex(),
560 "Checking ordered log key"
561 );
562 if EventLogId::consensus_decode_whole(&k[1..], &Default::default()).is_err()
563 {
564 assert!(
565 UnordedEventLogId::consensus_decode_whole(
566 &k[1..],
567 &Default::default()
568 )
569 .is_ok()
570 );
571 keys_to_migrate.push(k);
572 }
573 }
574 drop(ordered_log_entries);
575 for mut key_to_migrate in keys_to_migrate {
576 warn!(target: LOG_CLIENT_DB,
577 k=%key_to_migrate.as_hex(),
578 "Migrating unordered event log entry written to an ordered log"
579 );
580 let v = dbtx
581 .raw_remove_entry(&key_to_migrate)
582 .await
583 .expect("DB operation failed")
584 .expect("Was there a moment ago");
585 assert_eq!(key_to_migrate[0], 0x39);
586 key_to_migrate[0] = DB_KEY_PREFIX_UNORDERED_EVENT_LOG;
587 assert_eq!(key_to_migrate[0], 0x3a);
588 dbtx.raw_insert_bytes(&key_to_migrate, &v)
589 .await
590 .expect("DB operation failed");
591 }
592 }
593
594 {
596 let mut unordered_log_entries = dbtx
597 .raw_find_by_prefix(&[DB_KEY_PREFIX_UNORDERED_EVENT_LOG])
598 .await
599 .expect("DB operation failed");
600 let mut keys_to_migrate = vec![];
601 while let Some((k, _v)) = unordered_log_entries.next().await {
602 trace!(target: LOG_CLIENT_DB,
603 k=%k.as_hex(),
604 "Checking unordered log key"
605 );
606 if UnordedEventLogId::consensus_decode_whole(&k[1..], &Default::default())
607 .is_err()
608 {
609 assert!(
610 EventLogId::consensus_decode_whole(&k[1..], &Default::default())
611 .is_ok()
612 );
613 keys_to_migrate.push(k);
614 }
615 }
616 drop(unordered_log_entries);
617 for mut key_to_migrate in keys_to_migrate {
618 warn!(target: LOG_CLIENT_DB,
619 k=%key_to_migrate.as_hex(),
620 "Migrating ordered event log entry written to an unordered log"
621 );
622 let v = dbtx
623 .raw_remove_entry(&key_to_migrate)
624 .await
625 .expect("DB operation failed")
626 .expect("Was there a moment ago");
627 assert_eq!(key_to_migrate[0], 0x3a);
628 key_to_migrate[0] = DB_KEY_PREFIX_EVENT_LOG;
629 assert_eq!(key_to_migrate[0], 0x39);
630 dbtx.raw_insert_bytes(&key_to_migrate, &v)
631 .await
632 .expect("DB operation failed");
633 }
634 }
635 Ok(())
636 })
637 }),
638 );
639
640 migrations.insert(
642 DatabaseVersion(3),
643 Box::new(|mut ctx: fedimint_core::db::DbMigrationFnContext<'_, _>| {
644 Box::pin(async move {
645 let mut dbtx = ctx.dbtx();
646
647 for module_id in 0..u16::MAX {
648 let old_key = ClientModuleRecoveryIncorrectDoNotUse {
649 module_instance_id: module_id,
650 };
651 let new_key = ClientModuleRecovery {
652 module_instance_id: module_id,
653 };
654 let Some(value) = dbtx.get_value(&old_key).await else {
655 debug!(target: LOG_CLIENT_DB, %module_id, "No more ClientModuleRecovery keys found for migartion");
656 break;
657 };
658
659 debug!(target: LOG_CLIENT_DB, %module_id, "Migrating old ClientModuleRecovery key");
660 dbtx.remove_entry(&old_key).await.expect("Is there.");
661 assert!(dbtx.insert_entry(&new_key, &value).await.is_none());
662 }
663
664 Ok(())
665 })
666 }),
667 );
668 migrations
669}
670
671pub async fn apply_migrations_core_client_dbtx(
675 dbtx: &mut DatabaseTransaction<'_>,
676 kind: String,
677) -> Result<(), anyhow::Error> {
678 apply_migrations_dbtx(
679 dbtx,
680 (),
681 kind,
682 get_core_client_database_migrations(),
683 None,
684 Some(DbKeyPrefix::UserData as u8),
685 )
686 .await
687}
688
689pub async fn apply_migrations_client_module(
699 db: &Database,
700 kind: String,
701 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
702 module_instance_id: ModuleInstanceId,
703) -> Result<(), anyhow::Error> {
704 let mut dbtx = db.begin_transaction().await;
705 apply_migrations_client_module_dbtx(
706 &mut dbtx.to_ref_nc(),
707 kind,
708 migrations,
709 module_instance_id,
710 )
711 .await?;
712 dbtx.commit_tx_result().await
713}
714
715pub async fn apply_migrations_client_module_dbtx(
716 dbtx: &mut DatabaseTransaction<'_>,
717 kind: String,
718 migrations: BTreeMap<DatabaseVersion, ClientModuleMigrationFn>,
719 module_instance_id: ModuleInstanceId,
720) -> Result<(), anyhow::Error> {
721 let is_new_db = dbtx
724 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
725 .await?
726 .next()
727 .await
728 .is_none();
729
730 let target_version = get_current_database_version(&migrations);
731
732 create_database_version_dbtx(
734 dbtx,
735 target_version,
736 Some(module_instance_id),
737 kind.clone(),
738 is_new_db,
739 )
740 .await?;
741
742 let current_version = dbtx
743 .get_value(&DatabaseVersionKey(module_instance_id))
744 .await;
745
746 let db_version = if let Some(mut current_version) = current_version {
747 if current_version == target_version {
748 trace!(
749 target: LOG_CLIENT_DB,
750 %current_version,
751 %target_version,
752 module_instance_id,
753 kind,
754 "Database version up to date"
755 );
756 return Ok(());
757 }
758
759 if target_version < current_version {
760 return Err(anyhow!(format!(
761 "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
762 current_version, target_version,
763 )));
764 }
765
766 info!(
767 target: LOG_CLIENT_DB,
768 %current_version,
769 %target_version,
770 module_instance_id,
771 kind,
772 "Migrating client module database"
773 );
774 let mut active_states = get_active_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
775 let mut inactive_states =
776 get_inactive_states(&mut dbtx.to_ref_nc(), module_instance_id).await;
777
778 while current_version < target_version {
779 let new_states = if let Some(migration) = migrations.get(¤t_version) {
780 debug!(
781 target: LOG_CLIENT_DB,
782 module_instance_id,
783 %kind,
784 %current_version,
785 %target_version,
786 "Running module db migration");
787
788 migration(
789 &mut dbtx
790 .to_ref_with_prefix_module_id(module_instance_id)
791 .0
792 .into_nc(),
793 active_states.clone(),
794 inactive_states.clone(),
795 )
796 .await?
797 } else {
798 warn!(
799 target: LOG_CLIENT_DB,
800 ?current_version, "Missing client db migration");
801 None
802 };
803
804 if let Some((new_active_states, new_inactive_states)) = new_states {
807 remove_old_and_persist_new_active_states(
808 &mut dbtx.to_ref_nc(),
809 new_active_states.clone(),
810 active_states.clone(),
811 module_instance_id,
812 )
813 .await;
814 remove_old_and_persist_new_inactive_states(
815 &mut dbtx.to_ref_nc(),
816 new_inactive_states.clone(),
817 inactive_states.clone(),
818 module_instance_id,
819 )
820 .await;
821
822 active_states = new_active_states;
824 inactive_states = new_inactive_states;
825 }
826
827 current_version = current_version.increment();
828 dbtx.insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version)
829 .await;
830 }
831
832 current_version
833 } else {
834 target_version
835 };
836
837 debug!(
838 target: LOG_CLIENT_DB,
839 ?kind, ?db_version, "Client DB Version");
840 Ok(())
841}
842
843pub async fn get_active_states(
849 dbtx: &mut DatabaseTransaction<'_>,
850 module_instance_id: ModuleInstanceId,
851) -> Vec<(Vec<u8>, OperationId)> {
852 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
853 .await
854 .filter_map(|(state, _)| async move {
855 if module_instance_id == state.module_instance_id {
856 Some((state.state, state.operation_id))
857 } else {
858 None
859 }
860 })
861 .collect::<Vec<_>>()
862 .await
863}
864
865pub async fn get_inactive_states(
871 dbtx: &mut DatabaseTransaction<'_>,
872 module_instance_id: ModuleInstanceId,
873) -> Vec<(Vec<u8>, OperationId)> {
874 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
875 .await
876 .filter_map(|(state, _)| async move {
877 if module_instance_id == state.module_instance_id {
878 Some((state.state, state.operation_id))
879 } else {
880 None
881 }
882 })
883 .collect::<Vec<_>>()
884 .await
885}
886
887pub async fn remove_old_and_persist_new_active_states(
891 dbtx: &mut DatabaseTransaction<'_>,
892 new_active_states: Vec<(Vec<u8>, OperationId)>,
893 states_to_remove: Vec<(Vec<u8>, OperationId)>,
894 module_instance_id: ModuleInstanceId,
895) {
896 for (bytes, operation_id) in states_to_remove {
898 dbtx.remove_entry(&ActiveStateKeyBytes {
899 operation_id,
900 module_instance_id,
901 state: bytes,
902 })
903 .await
904 .expect("Did not delete anything");
905 }
906
907 for (bytes, operation_id) in new_active_states {
909 dbtx.insert_new_entry(
910 &ActiveStateKeyBytes {
911 operation_id,
912 module_instance_id,
913 state: bytes,
914 },
915 &ActiveStateMeta::default(),
916 )
917 .await;
918 }
919}
920
921pub async fn remove_old_and_persist_new_inactive_states(
925 dbtx: &mut DatabaseTransaction<'_>,
926 new_inactive_states: Vec<(Vec<u8>, OperationId)>,
927 states_to_remove: Vec<(Vec<u8>, OperationId)>,
928 module_instance_id: ModuleInstanceId,
929) {
930 for (bytes, operation_id) in states_to_remove {
932 dbtx.remove_entry(&InactiveStateKeyBytes {
933 operation_id,
934 module_instance_id,
935 state: bytes,
936 })
937 .await
938 .expect("Did not delete anything");
939 }
940
941 for (bytes, operation_id) in new_inactive_states {
943 dbtx.insert_new_entry(
944 &InactiveStateKeyBytes {
945 operation_id,
946 module_instance_id,
947 state: bytes,
948 },
949 &InactiveStateMeta {
950 created_at: fedimint_core::time::now(),
951 exited_at: fedimint_core::time::now(),
952 },
953 )
954 .await;
955 }
956}
957
958pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
962 let mut tx = db.begin_transaction_nc().await;
963 let client_secret = tx.get_value(&EncodedClientSecretKey).await;
964
965 match client_secret {
966 Some(client_secret) => {
967 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
968 .map_err(|e| anyhow!("Decoding failed: {e}"))
969 }
970 None => bail!("Encoded client secret not present in DB"),
971 }
972}
973
974#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
976pub struct OperationLogEntryV0 {
977 pub(crate) operation_module_kind: String,
978 pub(crate) meta: JsonStringed,
979 pub(crate) outcome: Option<JsonStringed>,
980}