1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, ReconnectFederationApi};
14use fedimint_client_module::api::ClientRawFederationApiExt as _;
15use fedimint_client_module::meta::LegacyMetaSource;
16use fedimint_client_module::module::init::ClientModuleInit;
17use fedimint_client_module::module::recovery::RecoveryProgress;
18use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
19use fedimint_client_module::secret::DeriveableSecretClientExt as _;
20use fedimint_client_module::transaction::{
21 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
22};
23use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
24use fedimint_core::config::{ClientConfig, ModuleInitRegistry};
25use fedimint_core::core::{ModuleInstanceId, ModuleKind};
26use fedimint_core::db::{
27 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
28};
29use fedimint_core::envs::is_running_in_test_env;
30use fedimint_core::module::ApiVersion;
31use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
32use fedimint_core::task::TaskGroup;
33use fedimint_core::util::FmtCompactAnyhow as _;
34use fedimint_core::{NumPeers, maybe_add_send};
35use fedimint_derive_secret::DerivableSecret;
36use fedimint_eventlog::{
37 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
38};
39use fedimint_logging::LOG_CLIENT;
40use tokio::sync::{broadcast, watch};
41use tracing::{debug, warn};
42
43use super::handle::ClientHandle;
44use super::{Client, client_decoders};
45use crate::api_announcements::{get_api_urls, run_api_announcement_sync};
46use crate::backup::{ClientBackup, Metadata};
47use crate::db::{
48 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
49 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
50 apply_migrations_client_module_dbtx,
51};
52use crate::meta::MetaService;
53use crate::module_init::ClientModuleInitRegistry;
54use crate::oplog::OperationLog;
55use crate::sm::executor::Executor;
56use crate::sm::notifier::Notifier;
57
58pub struct ClientBuilder {
60 module_inits: ClientModuleInitRegistry,
61 primary_module_instance: Option<ModuleInstanceId>,
62 primary_module_kind: Option<ModuleKind>,
63 admin_creds: Option<AdminCreds>,
64 db_no_decoders: Database,
65 meta_service: Arc<crate::meta::MetaService>,
66 connector: Connector,
67 stopped: bool,
68 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
69 request_hook: ApiRequestHook,
70}
71
72impl ClientBuilder {
73 pub(crate) fn new(db: Database) -> Self {
74 let meta_service = MetaService::new(LegacyMetaSource::default());
75 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
76 broadcast::channel(1024);
77 ClientBuilder {
78 module_inits: ModuleInitRegistry::new(),
79 primary_module_instance: None,
80 primary_module_kind: None,
81 connector: Connector::default(),
82 admin_creds: None,
83 db_no_decoders: db,
84 stopped: false,
85 meta_service,
86 log_event_added_transient_tx,
87 request_hook: Arc::new(|api| api),
88 }
89 }
90
91 pub(crate) fn from_existing(client: &Client) -> Self {
92 ClientBuilder {
93 module_inits: client.module_inits.clone(),
94 primary_module_instance: Some(client.primary_module_instance),
95 primary_module_kind: None,
96 admin_creds: None,
97 db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
98 stopped: false,
99 meta_service: client.meta_service.clone(),
101 connector: client.connector,
102 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
103 request_hook: client.request_hook.clone(),
104 }
105 }
106
107 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
116 self.module_inits = module_inits;
117 }
118
119 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
128 self.module_inits.attach(module_init);
129 }
130
131 pub fn stopped(&mut self) {
132 self.stopped = true;
133 }
134
135 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
144 self.request_hook = hook;
145 self
146 }
147
148 #[deprecated(
155 since = "0.6.0",
156 note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
157 )]
158 pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
159 self.with_primary_module_instance_id(primary_module_instance);
160 }
161
162 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
177 let was_replaced = self
178 .primary_module_instance
179 .replace(primary_module_instance)
180 .is_some();
181 assert!(
182 !was_replaced,
183 "Only one primary module can be given to the builder."
184 );
185 }
186
187 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
194 let was_replaced = self
195 .primary_module_kind
196 .replace(primary_module_kind)
197 .is_some();
198 assert!(
199 !was_replaced,
200 "Only one primary module kind can be given to the builder."
201 );
202 }
203
204 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
205 self.meta_service = meta_service;
206 }
207
208 async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
215 if let Ok(client_config) = self.load_existing_config().await {
219 for (module_id, module_cfg) in client_config.modules {
220 let kind = module_cfg.kind.clone();
221 let Some(init) = self.module_inits.get(&kind) else {
222 continue;
224 };
225
226 let mut dbtx = db.begin_transaction().await;
227 apply_migrations_client_module_dbtx(
228 &mut dbtx.to_ref_nc(),
229 kind.to_string(),
230 init.get_database_migrations(),
231 module_id,
232 )
233 .await?;
234 if let Some(used_db_prefixes) = init.used_db_prefixes() {
235 if is_running_in_test_env() {
236 verify_module_db_integrity_dbtx(
237 &mut dbtx.to_ref_nc(),
238 module_id,
239 kind,
240 &used_db_prefixes,
241 )
242 .await;
243 }
244 }
245 dbtx.commit_tx_result().await?;
246 }
247 }
248
249 Ok(())
250 }
251
252 pub fn db_no_decoders(&self) -> &Database {
253 &self.db_no_decoders
254 }
255
256 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
257 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
258 bail!("Client database not initialized")
259 };
260
261 Ok(config)
262 }
263
264 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
265 self.admin_creds = Some(creds);
266 }
267
268 pub fn with_connector(&mut self, connector: Connector) {
269 self.connector = connector;
270 }
271
/// Switches the builder to the connector returned by `Connector::tor()`.
///
/// Only compiled when the `tor` feature is enabled.
#[cfg(feature = "tor")]
pub fn with_tor_connector(&mut self) {
    let tor_connector = Connector::tor();
    self.with_connector(tor_connector);
}
276
277 async fn init(
278 self,
279 pre_root_secret: DerivableSecret,
280 config: ClientConfig,
281 api_secret: Option<String>,
282 init_mode: InitMode,
283 ) -> anyhow::Result<ClientHandle> {
284 if Client::is_initialized(&self.db_no_decoders).await {
285 bail!("Client database already initialized")
286 }
287
288 {
291 debug!(target: LOG_CLIENT, "Initializing client database");
292 let mut dbtx = self.db_no_decoders.begin_transaction().await;
293 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
295 .await;
296 dbtx.insert_entry(
297 &ClientPreRootSecretHashKey,
298 &pre_root_secret.derive_pre_root_secret_hash(),
299 )
300 .await;
301
302 if let Some(api_secret) = api_secret.as_ref() {
303 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
304 }
305
306 let init_state = InitState::Pending(init_mode);
307 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
308
309 let metadata = init_state
310 .does_require_recovery()
311 .flatten()
312 .map_or(Metadata::empty(), |s| s.metadata);
313
314 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
315
316 dbtx.commit_tx_result().await?;
317 }
318
319 let stopped = self.stopped;
320 self.build(pre_root_secret, config, api_secret, stopped)
321 .await
322 }
323
324 pub async fn join(
398 self,
399 pre_root_secret: DerivableSecret,
400 config: ClientConfig,
401 api_secret: Option<String>,
402 ) -> anyhow::Result<ClientHandle> {
403 self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
404 .await
405 }
406
407 pub async fn download_backup_from_federation(
409 &self,
410 root_secret: &DerivableSecret,
411 config: &ClientConfig,
412 api_secret: Option<String>,
413 ) -> anyhow::Result<Option<ClientBackup>> {
414 let api = DynGlobalApi::from_endpoints(
415 config
417 .global
418 .api_endpoints
419 .iter()
420 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
421 &api_secret,
422 )
423 .await?;
424
425 Client::download_backup_from_federation_static(
426 &api,
427 &Self::federation_root_secret(root_secret, config),
428 &self.decoders(config),
429 )
430 .await
431 }
432
433 pub async fn recover(
444 self,
445 root_secret: DerivableSecret,
446 config: ClientConfig,
447 api_secret: Option<String>,
448 backup: Option<ClientBackup>,
449 ) -> anyhow::Result<ClientHandle> {
450 let client = self
451 .init(
452 root_secret,
453 config,
454 api_secret,
455 InitMode::Recover {
456 snapshot: backup.clone(),
457 },
458 )
459 .await?;
460
461 Ok(client)
462 }
463
464 pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
465 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
466 bail!("Client database not initialized")
467 };
468
469 match self
470 .db_no_decoders()
471 .begin_transaction_nc()
472 .await
473 .get_value(&ClientPreRootSecretHashKey)
474 .await
475 {
476 Some(secret_hash) => {
477 ensure!(
478 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
479 "Secret hash does not match. Incorrect secret"
480 );
481 }
482 _ => {
483 debug!(target: LOG_CLIENT, "Backfilling secret hash");
484 let mut dbtx = self.db_no_decoders.begin_transaction().await;
486 dbtx.insert_entry(
487 &ClientPreRootSecretHashKey,
488 &pre_root_secret.derive_pre_root_secret_hash(),
489 )
490 .await;
491 dbtx.commit_tx().await;
492 }
493 }
494
495 let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
496 let stopped = self.stopped;
497 let request_hook = self.request_hook.clone();
498
499 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
500 let client = self
501 .build_stopped(
502 pre_root_secret,
503 &config,
504 api_secret,
505 log_event_added_transient_tx,
506 request_hook,
507 )
508 .await?;
509 if !stopped {
510 client.as_inner().start_executor();
511 }
512 Ok(client)
513 }
514
515 pub(crate) async fn build(
517 self,
518 pre_root_secret: DerivableSecret,
519 config: ClientConfig,
520 api_secret: Option<String>,
521 stopped: bool,
522 ) -> anyhow::Result<ClientHandle> {
523 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
524 let request_hook = self.request_hook.clone();
525 let client = self
526 .build_stopped(
527 pre_root_secret,
528 &config,
529 api_secret,
530 log_event_added_transient_tx,
531 request_hook,
532 )
533 .await?;
534 if !stopped {
535 client.as_inner().start_executor();
536 }
537
538 Ok(client)
539 }
540
541 async fn build_stopped(
544 self,
545 root_secret: DerivableSecret,
546 config: &ClientConfig,
547 api_secret: Option<String>,
548 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
549 request_hook: ApiRequestHook,
550 ) -> anyhow::Result<ClientHandle> {
551 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
552 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
553
554 let decoders = self.decoders(config);
555 let config = Self::config_decoded(config, &decoders)?;
556 let fed_id = config.calculate_federation_id();
557 let db = self.db_no_decoders.with_decoders(decoders.clone());
558 let connector = self.connector;
559 let peer_urls = get_api_urls(&db, &config).await;
560 let api = match self.admin_creds.as_ref() {
561 Some(admin_creds) => ReconnectFederationApi::new_admin(
562 admin_creds.peer_id,
563 peer_urls
564 .into_iter()
565 .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
566 .context("Admin creds should match a peer")?,
567 &api_secret,
568 )
569 .await?
570 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571 .with_request_hook(&request_hook)
572 .with_cache()
573 .into(),
574 None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
575 .await?
576 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
577 .with_request_hook(&request_hook)
578 .with_cache()
579 .into(),
580 };
581 let task_group = TaskGroup::new();
582
583 self.migrate_module_dbs(&db).await?;
586
587 let init_state = Self::load_init_state(&db).await;
588
589 let mut primary_module_instance = self.primary_module_instance.or_else(|| {
590 let primary_module_kind = self.primary_module_kind?;
591 config
592 .modules
593 .iter()
594 .find_map(|(module_instance_id, module_config)| {
595 (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
596 })
597 });
598
599 let notifier = Notifier::new();
600
601 let common_api_versions = Client::load_and_refresh_common_api_version_static(
602 &config,
603 &self.module_inits,
604 &api,
605 &db,
606 &task_group,
607 )
608 .await
609 .inspect_err(|err| {
610 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
611 })
612 .unwrap_or(ApiVersionSet {
613 core: ApiVersion::new(0, 0),
614 modules: BTreeMap::new(),
616 });
617
618 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
619
620 let mut module_recoveries: BTreeMap<
621 ModuleInstanceId,
622 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
623 > = BTreeMap::new();
624 let mut module_recovery_progress_receivers: BTreeMap<
625 ModuleInstanceId,
626 watch::Receiver<RecoveryProgress>,
627 > = BTreeMap::new();
628
629 let final_client = FinalClientIface::default();
630
631 let root_secret = Self::federation_root_secret(&root_secret, &config);
632
633 let modules = {
634 let mut modules = ClientModuleRegistry::default();
635 for (module_instance_id, module_config) in config.modules.clone() {
636 let kind = module_config.kind().clone();
637 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
638 debug!(
639 target: LOG_CLIENT,
640 kind=%kind,
641 instance_id=%module_instance_id,
642 "Module kind of instance not found in module gens, skipping");
643 continue;
644 };
645
646 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
647 else {
648 warn!(
649 target: LOG_CLIENT,
650 kind=%kind,
651 instance_id=%module_instance_id,
652 "Module kind of instance has incompatible api version, skipping"
653 );
654 continue;
655 };
656
657 let start_module_recover_fn =
660 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
661 let module_config = module_config.clone();
662 let num_peers = NumPeers::from(config.global.api_endpoints.len());
663 let db = db.clone();
664 let kind = kind.clone();
665 let notifier = notifier.clone();
666 let api = api.clone();
667 let root_secret = root_secret.clone();
668 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
669 let final_client = final_client.clone();
670 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
671 let task_group = task_group.clone();
672 let module_init = module_init.clone();
673 (
674 Box::pin(async move {
675 module_init
676 .recover(
677 final_client.clone(),
678 fed_id,
679 num_peers,
680 module_config.clone(),
681 db.clone(),
682 module_instance_id,
683 common_api_versions.core,
684 api_version,
685 root_secret.derive_module_secret(module_instance_id),
686 notifier.clone(),
687 api.clone(),
688 admin_auth,
689 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
690 progress_tx,
691 task_group,
692 )
693 .await
694 .inspect_err(|err| {
695 warn!(
696 target: LOG_CLIENT,
697 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
698 );
699 })
700 }),
701 progress_rx,
702 )
703 };
704
705 let recovery = match init_state.does_require_recovery() {
706 Some(snapshot) => {
707 match db
708 .begin_transaction_nc()
709 .await
710 .get_value(&ClientModuleRecovery { module_instance_id })
711 .await
712 {
713 Some(module_recovery_state) => {
714 if module_recovery_state.is_done() {
715 debug!(
716 id = %module_instance_id,
717 %kind, "Module recovery already complete"
718 );
719 None
720 } else {
721 debug!(
722 id = %module_instance_id,
723 %kind,
724 progress = %module_recovery_state.progress,
725 "Starting module recovery with an existing progress"
726 );
727 Some(start_module_recover_fn(
728 snapshot,
729 module_recovery_state.progress,
730 ))
731 }
732 }
733 _ => {
734 let progress = RecoveryProgress::none();
735 let mut dbtx = db.begin_transaction().await;
736 dbtx.log_event(
737 log_ordering_wakeup_tx.clone(),
738 None,
739 ModuleRecoveryStarted::new(module_instance_id),
740 )
741 .await;
742 dbtx.insert_entry(
743 &ClientModuleRecovery { module_instance_id },
744 &ClientModuleRecoveryState { progress },
745 )
746 .await;
747
748 dbtx.commit_tx().await;
749
750 debug!(
751 id = %module_instance_id,
752 %kind, "Starting new module recovery"
753 );
754 Some(start_module_recover_fn(snapshot, progress))
755 }
756 }
757 }
758 _ => None,
759 };
760
761 match recovery {
762 Some((recovery, recovery_progress_rx)) => {
763 module_recoveries.insert(module_instance_id, recovery);
764 module_recovery_progress_receivers
765 .insert(module_instance_id, recovery_progress_rx);
766 }
767 _ => {
768 let module = module_init
769 .init(
770 final_client.clone(),
771 fed_id,
772 config.global.api_endpoints.len(),
773 module_config,
774 db.clone(),
775 module_instance_id,
776 common_api_versions.core,
777 api_version,
778 root_secret.derive_module_secret(module_instance_id),
785 notifier.clone(),
786 api.clone(),
787 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
788 task_group.clone(),
789 )
790 .await?;
791
792 if primary_module_instance.is_none() && module.supports_being_primary() {
793 primary_module_instance = Some(module_instance_id);
794 } else if primary_module_instance == Some(module_instance_id)
795 && !module.supports_being_primary()
796 {
797 bail!(
798 "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
799 );
800 }
801
802 modules.register_module(module_instance_id, kind, module);
803 }
804 }
805 }
806 modules
807 };
808
809 if init_state.is_pending() && module_recoveries.is_empty() {
810 let mut dbtx = db.begin_transaction().await;
811 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
812 .await;
813 dbtx.commit_tx().await;
814 }
815
816 let executor = {
817 let mut executor_builder = Executor::builder();
818 executor_builder
819 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
820
821 for (module_instance_id, _, module) in modules.iter_modules() {
822 executor_builder.with_module_dyn(module.context(module_instance_id));
823 }
824
825 for module_instance_id in module_recoveries.keys() {
826 executor_builder.with_valid_module_id(*module_instance_id);
827 }
828
829 executor_builder.build(db.clone(), notifier, task_group.clone())
830 };
831
832 let recovery_receiver_init_val = module_recovery_progress_receivers
833 .iter()
834 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
835 .collect::<BTreeMap<_, _>>();
836 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
837 watch::channel(recovery_receiver_init_val);
838
839 let client_inner = Arc::new(Client {
840 final_client: final_client.clone(),
841 config: tokio::sync::RwLock::new(config.clone()),
842 api_secret,
843 decoders,
844 db: db.clone(),
845 federation_id: fed_id,
846 federation_config_meta: config.global.meta,
847 primary_module_instance: primary_module_instance
848 .ok_or(anyhow!("No primary module set or found"))?,
849 modules,
850 module_inits: self.module_inits.clone(),
851 log_ordering_wakeup_tx,
852 log_event_added_rx,
853 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
854 request_hook,
855 executor,
856 api,
857 secp_ctx: Secp256k1::new(),
858 root_secret,
859 task_group,
860 operation_log: OperationLog::new(db.clone()),
861 client_recovery_progress_receiver,
862 meta_service: self.meta_service,
863 connector,
864 });
865 client_inner
866 .task_group
867 .spawn_cancellable("MetaService::update_continuously", {
868 let client_inner = client_inner.clone();
869 async move {
870 client_inner
871 .meta_service
872 .update_continuously(&client_inner)
873 .await;
874 }
875 });
876
877 client_inner.task_group.spawn_cancellable(
878 "update-api-announcements",
879 run_api_announcement_sync(client_inner.clone()),
880 );
881
882 client_inner.task_group.spawn_cancellable(
883 "event log ordering task",
884 run_event_log_ordering_task(
885 db.clone(),
886 log_ordering_wakeup_rx,
887 log_event_added_tx,
888 log_event_added_transient_tx,
889 ),
890 );
891 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
892
893 let client_arc = ClientHandle::new(client_inner);
894
895 for (_, _, module) in client_arc.modules.iter_modules() {
896 module.start().await;
897 }
898
899 final_client.set(client_iface.clone());
900
901 if !module_recoveries.is_empty() {
902 client_arc.spawn_module_recoveries_task(
903 client_recovery_progress_sender,
904 module_recoveries,
905 module_recovery_progress_receivers,
906 );
907 }
908
909 Ok(client_arc)
910 }
911
912 async fn load_init_state(db: &Database) -> InitState {
913 let mut dbtx = db.begin_transaction_nc().await;
914 dbtx.get_value(&ClientInitStateKey)
915 .await
916 .unwrap_or_else(|| {
917 warn!(
920 target: LOG_CLIENT,
921 "Client missing ClientRequiresRecovery: assuming complete"
922 );
923 db::InitState::Complete(db::InitModeComplete::Fresh)
924 })
925 }
926
927 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
928 let mut decoders = client_decoders(
929 &self.module_inits,
930 config
931 .modules
932 .iter()
933 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
934 );
935
936 decoders.register_module(
937 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
938 ModuleKind::from_static_str("tx_submission"),
939 tx_submission_sm_decoder(),
940 );
941
942 decoders
943 }
944
945 fn config_decoded(
946 config: &ClientConfig,
947 decoders: &ModuleDecoderRegistry,
948 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
949 config.clone().redecode_raw(decoders)
950 }
951
952 fn federation_root_secret(
956 root_secret: &DerivableSecret,
957 config: &ClientConfig,
958 ) -> DerivableSecret {
959 root_secret.federation_key(&config.global.calculate_federation_id())
960 }
961
962 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
964 self.log_event_added_transient_tx.subscribe()
965 }
966}