1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{
14 ApiVersionSet, DynGlobalApi, FederationApiExt as _, ReconnectFederationApi,
15};
16use fedimint_client_module::api::ClientRawFederationApiExt as _;
17use fedimint_client_module::meta::LegacyMetaSource;
18use fedimint_client_module::module::init::ClientModuleInit;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
21use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
22use fedimint_client_module::transaction::{
23 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
24};
25use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
26use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
27use fedimint_core::core::{ModuleInstanceId, ModuleKind};
28use fedimint_core::db::{
29 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
30};
31use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
32use fedimint_core::envs::is_running_in_test_env;
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
35use fedimint_core::module::{ApiRequestErased, ApiVersion};
36use fedimint_core::task::TaskGroup;
37use fedimint_core::util::FmtCompactAnyhow as _;
38use fedimint_core::{NumPeers, maybe_add_send};
39use fedimint_derive_secret::DerivableSecret;
40use fedimint_eventlog::{
41 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
42};
43use fedimint_logging::LOG_CLIENT;
44use tokio::sync::{broadcast, watch};
45use tracing::{debug, warn};
46
47use super::handle::ClientHandle;
48use super::{Client, client_decoders};
49use crate::api_announcements::{
50 get_api_urls, refresh_api_announcement_sync, run_api_announcement_sync,
51};
52use crate::backup::{ClientBackup, Metadata};
53use crate::db::{
54 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
55 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
56 PendingClientConfigKey, apply_migrations_client_module_dbtx,
57};
58use crate::meta::MetaService;
59use crate::module_init::ClientModuleInitRegistry;
60use crate::oplog::OperationLog;
61use crate::sm::executor::Executor;
62use crate::sm::notifier::Notifier;
63
/// The secret a client is opened or created with, together with the rule for
/// turning it into the secret actually used for a given federation.
#[derive(Clone)]
pub enum RootSecret {
    /// The wrapped secret is passed through `get_default_client_secret`
    /// together with the federation id before it is used.
    StandardDoubleDerive(DerivableSecret),
    /// The wrapped secret is used as is, without any federation-specific
    /// derivation step.
    Custom(DerivableSecret),
}
98
99impl RootSecret {
100 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
101 match self {
102 RootSecret::StandardDoubleDerive(derivable_secret) => {
103 get_default_client_secret(derivable_secret, &federation_id)
104 }
105 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
106 }
107 }
108}
109
/// Collects everything needed to construct a [`Client`] and exposes the
/// entry points (`preview`, `open`, ...) that actually build one.
pub struct ClientBuilder {
    // Module initializers, looked up by module kind when the client is built.
    module_inits: ClientModuleInitRegistry,
    // Explicitly requested primary module instance, if any.
    primary_module_instance: Option<ModuleInstanceId>,
    // Requested primary module kind; resolved against the config at build
    // time when no explicit instance id was given.
    primary_module_kind: Option<ModuleKind>,
    // When set, the federation API is created for this single admin peer.
    admin_creds: Option<AdminCreds>,
    // Database handle without module decoders attached.
    db_no_decoders: Database,
    meta_service: Arc<crate::meta::MetaService>,
    connector: Connector,
    // When `true`, the executor is not started after the client is built.
    stopped: bool,
    // Sender side of the transient event log broadcast channel.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    // Hook applied to the federation API when it is constructed.
    request_hook: ApiRequestHook,
}
123
124impl ClientBuilder {
125 pub(crate) fn new(db: Database) -> Self {
126 let meta_service = MetaService::new(LegacyMetaSource::default());
127 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
128 broadcast::channel(1024);
129 ClientBuilder {
130 module_inits: ModuleInitRegistry::new(),
131 primary_module_instance: None,
132 primary_module_kind: None,
133 connector: Connector::default(),
134 admin_creds: None,
135 db_no_decoders: db,
136 stopped: false,
137 meta_service,
138 log_event_added_transient_tx,
139 request_hook: Arc::new(|api| api),
140 }
141 }
142
143 pub(crate) fn from_existing(client: &Client) -> Self {
144 ClientBuilder {
145 module_inits: client.module_inits.clone(),
146 primary_module_instance: Some(client.primary_module_instance),
147 primary_module_kind: None,
148 admin_creds: None,
149 db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
150 stopped: false,
151 meta_service: client.meta_service.clone(),
153 connector: client.connector,
154 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
155 request_hook: client.request_hook.clone(),
156 }
157 }
158
159 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
168 self.module_inits = module_inits;
169 }
170
171 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
180 self.module_inits.attach(module_init);
181 }
182
183 pub fn stopped(&mut self) {
184 self.stopped = true;
185 }
186
187 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
196 self.request_hook = hook;
197 self
198 }
199
200 #[deprecated(
207 since = "0.6.0",
208 note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
209 )]
210 pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
211 self.with_primary_module_instance_id(primary_module_instance);
212 }
213
214 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
229 let was_replaced = self
230 .primary_module_instance
231 .replace(primary_module_instance)
232 .is_some();
233 assert!(
234 !was_replaced,
235 "Only one primary module can be given to the builder."
236 );
237 }
238
239 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
246 let was_replaced = self
247 .primary_module_kind
248 .replace(primary_module_kind)
249 .is_some();
250 assert!(
251 !was_replaced,
252 "Only one primary module kind can be given to the builder."
253 );
254 }
255
256 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
257 self.meta_service = meta_service;
258 }
259
260 async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
267 if let Ok(client_config) = self.load_existing_config().await {
271 for (module_id, module_cfg) in client_config.modules {
272 let kind = module_cfg.kind.clone();
273 let Some(init) = self.module_inits.get(&kind) else {
274 continue;
276 };
277
278 let mut dbtx = db.begin_transaction().await;
279 apply_migrations_client_module_dbtx(
280 &mut dbtx.to_ref_nc(),
281 kind.to_string(),
282 init.get_database_migrations(),
283 module_id,
284 )
285 .await?;
286 if let Some(used_db_prefixes) = init.used_db_prefixes()
287 && is_running_in_test_env()
288 {
289 verify_module_db_integrity_dbtx(
290 &mut dbtx.to_ref_nc(),
291 module_id,
292 kind,
293 &used_db_prefixes,
294 )
295 .await;
296 }
297 dbtx.commit_tx_result().await?;
298 }
299 }
300
301 Ok(())
302 }
303
304 pub fn db_no_decoders(&self) -> &Database {
305 &self.db_no_decoders
306 }
307
308 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
309 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
310 bail!("Client database not initialized")
311 };
312
313 Ok(config)
314 }
315
316 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
317 self.admin_creds = Some(creds);
318 }
319
320 pub fn with_connector(&mut self, connector: Connector) {
321 self.connector = connector;
322 }
323
324 #[cfg(feature = "tor")]
325 pub fn with_tor_connector(&mut self) {
326 self.with_connector(Connector::tor());
327 }
328
329 async fn init(
330 self,
331 pre_root_secret: DerivableSecret,
332 config: ClientConfig,
333 api_secret: Option<String>,
334 init_mode: InitMode,
335 ) -> anyhow::Result<ClientHandle> {
336 if Client::is_initialized(&self.db_no_decoders).await {
337 bail!("Client database already initialized")
338 }
339
340 {
343 debug!(target: LOG_CLIENT, "Initializing client database");
344 let mut dbtx = self.db_no_decoders.begin_transaction().await;
345 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
347 .await;
348 dbtx.insert_entry(
349 &ClientPreRootSecretHashKey,
350 &pre_root_secret.derive_pre_root_secret_hash(),
351 )
352 .await;
353
354 if let Some(api_secret) = api_secret.as_ref() {
355 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
356 }
357
358 let init_state = InitState::Pending(init_mode);
359 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
360
361 let metadata = init_state
362 .does_require_recovery()
363 .flatten()
364 .map_or(Metadata::empty(), |s| s.metadata);
365
366 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
367
368 dbtx.commit_tx_result().await?;
369 }
370
371 let stopped = self.stopped;
372 self.build(pre_root_secret, config, api_secret, stopped)
373 .await
374 }
375
376 pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
377 let config = self
378 .connector
379 .download_from_invite_code(invite_code)
380 .await?;
381
382 if let Some(guardian_pub_keys) = config.global.broadcast_public_keys.clone() {
383 let api = DynGlobalApi::from_endpoints(invite_code.peers(), &invite_code.api_secret())
387 .await?;
388 refresh_api_announcement_sync(&api, self.db_no_decoders(), &guardian_pub_keys).await?;
389 }
390
391 Ok(ClientPreview {
392 inner: self,
393 config,
394 api_secret: invite_code.api_secret(),
395 })
396 }
397
398 pub async fn preview_with_existing_config(
400 self,
401 config: ClientConfig,
402 api_secret: Option<String>,
403 ) -> anyhow::Result<ClientPreview> {
404 Ok(ClientPreview {
405 inner: self,
406 config,
407 api_secret,
408 })
409 }
410
411 pub async fn open(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
412 Self::migrate_pending_config_if_present(&self.db_no_decoders).await;
414
415 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
416 bail!("Client database not initialized")
417 };
418
419 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
420
421 match self
422 .db_no_decoders()
423 .begin_transaction_nc()
424 .await
425 .get_value(&ClientPreRootSecretHashKey)
426 .await
427 {
428 Some(secret_hash) => {
429 ensure!(
430 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
431 "Secret hash does not match. Incorrect secret"
432 );
433 }
434 _ => {
435 debug!(target: LOG_CLIENT, "Backfilling secret hash");
436 let mut dbtx = self.db_no_decoders.begin_transaction().await;
438 dbtx.insert_entry(
439 &ClientPreRootSecretHashKey,
440 &pre_root_secret.derive_pre_root_secret_hash(),
441 )
442 .await;
443 dbtx.commit_tx().await;
444 }
445 }
446
447 let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
448 let stopped = self.stopped;
449 let request_hook = self.request_hook.clone();
450
451 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
452 let client = self
453 .build_stopped(
454 pre_root_secret,
455 &config,
456 api_secret,
457 log_event_added_transient_tx,
458 request_hook,
459 )
460 .await?;
461 if !stopped {
462 client.as_inner().start_executor();
463 }
464 Ok(client)
465 }
466
467 pub(crate) async fn build(
469 self,
470 pre_root_secret: DerivableSecret,
471 config: ClientConfig,
472 api_secret: Option<String>,
473 stopped: bool,
474 ) -> anyhow::Result<ClientHandle> {
475 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
476 let request_hook = self.request_hook.clone();
477 let client = self
478 .build_stopped(
479 pre_root_secret,
480 &config,
481 api_secret,
482 log_event_added_transient_tx,
483 request_hook,
484 )
485 .await?;
486 if !stopped {
487 client.as_inner().start_executor();
488 }
489
490 Ok(client)
491 }
492
493 async fn build_stopped(
496 self,
497 pre_root_secret: DerivableSecret,
498 config: &ClientConfig,
499 api_secret: Option<String>,
500 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
501 request_hook: ApiRequestHook,
502 ) -> anyhow::Result<ClientHandle> {
503 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
504 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
505
506 let decoders = self.decoders(config);
507 let config = Self::config_decoded(config, &decoders)?;
508 let fed_id = config.calculate_federation_id();
509 let db = self.db_no_decoders.with_decoders(decoders.clone());
510 let connector = self.connector;
511 let peer_urls = get_api_urls(&db, &config).await;
512 let api = match self.admin_creds.as_ref() {
513 Some(admin_creds) => ReconnectFederationApi::new_admin(
514 admin_creds.peer_id,
515 peer_urls
516 .into_iter()
517 .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
518 .context("Admin creds should match a peer")?,
519 &api_secret,
520 )
521 .await?
522 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
523 .with_request_hook(&request_hook)
524 .with_cache()
525 .into(),
526 None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
527 .await?
528 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
529 .with_request_hook(&request_hook)
530 .with_cache()
531 .into(),
532 };
533 let task_group = TaskGroup::new();
534
535 self.migrate_module_dbs(&db).await?;
538
539 let init_state = Self::load_init_state(&db).await;
540
541 let mut primary_module_instance = self.primary_module_instance.or_else(|| {
542 let primary_module_kind = self.primary_module_kind?;
543 config
544 .modules
545 .iter()
546 .find_map(|(module_instance_id, module_config)| {
547 (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
548 })
549 });
550
551 let notifier = Notifier::new();
552
553 let common_api_versions = Client::load_and_refresh_common_api_version_static(
554 &config,
555 &self.module_inits,
556 &api,
557 &db,
558 &task_group,
559 )
560 .await
561 .inspect_err(|err| {
562 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
563 })
564 .unwrap_or(ApiVersionSet {
565 core: ApiVersion::new(0, 0),
566 modules: BTreeMap::new(),
568 });
569
570 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
571
572 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
574
575 let mut module_recoveries: BTreeMap<
576 ModuleInstanceId,
577 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
578 > = BTreeMap::new();
579 let mut module_recovery_progress_receivers: BTreeMap<
580 ModuleInstanceId,
581 watch::Receiver<RecoveryProgress>,
582 > = BTreeMap::new();
583
584 let final_client = FinalClientIface::default();
585
586 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
587
588 let modules = {
589 let mut modules = ClientModuleRegistry::default();
590 for (module_instance_id, module_config) in config.modules.clone() {
591 let kind = module_config.kind().clone();
592 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
593 debug!(
594 target: LOG_CLIENT,
595 kind=%kind,
596 instance_id=%module_instance_id,
597 "Module kind of instance not found in module gens, skipping");
598 continue;
599 };
600
601 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
602 else {
603 warn!(
604 target: LOG_CLIENT,
605 kind=%kind,
606 instance_id=%module_instance_id,
607 "Module kind of instance has incompatible api version, skipping"
608 );
609 continue;
610 };
611
612 let start_module_recover_fn =
615 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
616 let module_config = module_config.clone();
617 let num_peers = NumPeers::from(config.global.api_endpoints.len());
618 let db = db.clone();
619 let kind = kind.clone();
620 let notifier = notifier.clone();
621 let api = api.clone();
622 let root_secret = root_secret.clone();
623 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
624 let final_client = final_client.clone();
625 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
626 let task_group = task_group.clone();
627 let module_init = module_init.clone();
628 (
629 Box::pin(async move {
630 module_init
631 .recover(
632 final_client.clone(),
633 fed_id,
634 num_peers,
635 module_config.clone(),
636 db.clone(),
637 module_instance_id,
638 common_api_versions.core,
639 api_version,
640 root_secret.derive_module_secret(module_instance_id),
641 notifier.clone(),
642 api.clone(),
643 admin_auth,
644 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
645 progress_tx,
646 task_group,
647 )
648 .await
649 .inspect_err(|err| {
650 warn!(
651 target: LOG_CLIENT,
652 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
653 );
654 })
655 }),
656 progress_rx,
657 )
658 };
659
660 let recovery = match init_state.does_require_recovery() {
661 Some(snapshot) => {
662 match db
663 .begin_transaction_nc()
664 .await
665 .get_value(&ClientModuleRecovery { module_instance_id })
666 .await
667 {
668 Some(module_recovery_state) => {
669 if module_recovery_state.is_done() {
670 debug!(
671 id = %module_instance_id,
672 %kind, "Module recovery already complete"
673 );
674 None
675 } else {
676 debug!(
677 id = %module_instance_id,
678 %kind,
679 progress = %module_recovery_state.progress,
680 "Starting module recovery with an existing progress"
681 );
682 Some(start_module_recover_fn(
683 snapshot,
684 module_recovery_state.progress,
685 ))
686 }
687 }
688 _ => {
689 let progress = RecoveryProgress::none();
690 let mut dbtx = db.begin_transaction().await;
691 dbtx.log_event(
692 log_ordering_wakeup_tx.clone(),
693 None,
694 ModuleRecoveryStarted::new(module_instance_id),
695 )
696 .await;
697 dbtx.insert_entry(
698 &ClientModuleRecovery { module_instance_id },
699 &ClientModuleRecoveryState { progress },
700 )
701 .await;
702
703 dbtx.commit_tx().await;
704
705 debug!(
706 id = %module_instance_id,
707 %kind, "Starting new module recovery"
708 );
709 Some(start_module_recover_fn(snapshot, progress))
710 }
711 }
712 }
713 _ => None,
714 };
715
716 match recovery {
717 Some((recovery, recovery_progress_rx)) => {
718 module_recoveries.insert(module_instance_id, recovery);
719 module_recovery_progress_receivers
720 .insert(module_instance_id, recovery_progress_rx);
721 }
722 _ => {
723 let module = module_init
724 .init(
725 final_client.clone(),
726 fed_id,
727 config.global.api_endpoints.len(),
728 module_config,
729 db.clone(),
730 module_instance_id,
731 common_api_versions.core,
732 api_version,
733 root_secret.derive_module_secret(module_instance_id),
740 notifier.clone(),
741 api.clone(),
742 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
743 task_group.clone(),
744 )
745 .await?;
746
747 if primary_module_instance.is_none() && module.supports_being_primary() {
748 primary_module_instance = Some(module_instance_id);
749 } else if primary_module_instance == Some(module_instance_id)
750 && !module.supports_being_primary()
751 {
752 bail!(
753 "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
754 );
755 }
756
757 modules.register_module(module_instance_id, kind, module);
758 }
759 }
760 }
761 modules
762 };
763
764 if init_state.is_pending() && module_recoveries.is_empty() {
765 let mut dbtx = db.begin_transaction().await;
766 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
767 .await;
768 dbtx.commit_tx().await;
769 }
770
771 let executor = {
772 let mut executor_builder = Executor::builder();
773 executor_builder
774 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
775
776 for (module_instance_id, _, module) in modules.iter_modules() {
777 executor_builder.with_module_dyn(module.context(module_instance_id));
778 }
779
780 for module_instance_id in module_recoveries.keys() {
781 executor_builder.with_valid_module_id(*module_instance_id);
782 }
783
784 executor_builder.build(
785 db.clone(),
786 notifier,
787 task_group.clone(),
788 log_ordering_wakeup_tx.clone(),
789 )
790 };
791
792 let recovery_receiver_init_val = module_recovery_progress_receivers
793 .iter()
794 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
795 .collect::<BTreeMap<_, _>>();
796 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
797 watch::channel(recovery_receiver_init_val);
798
799 let client_inner = Arc::new(Client {
800 final_client: final_client.clone(),
801 config: tokio::sync::RwLock::new(config.clone()),
802 api_secret,
803 decoders,
804 db: db.clone(),
805 federation_id: fed_id,
806 federation_config_meta: config.global.meta,
807 primary_module_instance: primary_module_instance
808 .ok_or(anyhow!("No primary module set or found"))?,
809 modules,
810 module_inits: self.module_inits.clone(),
811 log_ordering_wakeup_tx,
812 log_event_added_rx,
813 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
814 request_hook,
815 executor,
816 api,
817 secp_ctx: Secp256k1::new(),
818 root_secret,
819 task_group,
820 operation_log: OperationLog::new(db.clone()),
821 client_recovery_progress_receiver,
822 meta_service: self.meta_service,
823 connector,
824 });
825 client_inner
826 .task_group
827 .spawn_cancellable("MetaService::update_continuously", {
828 let client_inner = client_inner.clone();
829 async move {
830 client_inner
831 .meta_service
832 .update_continuously(&client_inner)
833 .await;
834 }
835 });
836
837 client_inner.task_group.spawn_cancellable(
838 "update-api-announcements",
839 run_api_announcement_sync(client_inner.clone()),
840 );
841
842 client_inner.task_group.spawn_cancellable(
843 "event log ordering task",
844 run_event_log_ordering_task(
845 db.clone(),
846 log_ordering_wakeup_rx,
847 log_event_added_tx,
848 log_event_added_transient_tx,
849 ),
850 );
851 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
852
853 let client_arc = ClientHandle::new(client_inner);
854
855 for (_, _, module) in client_arc.modules.iter_modules() {
856 module.start().await;
857 }
858
859 final_client.set(client_iface.clone());
860
861 if !module_recoveries.is_empty() {
862 client_arc.spawn_module_recoveries_task(
863 client_recovery_progress_sender,
864 module_recoveries,
865 module_recovery_progress_receivers,
866 );
867 }
868
869 Ok(client_arc)
870 }
871
872 async fn load_init_state(db: &Database) -> InitState {
873 let mut dbtx = db.begin_transaction_nc().await;
874 dbtx.get_value(&ClientInitStateKey)
875 .await
876 .unwrap_or_else(|| {
877 warn!(
880 target: LOG_CLIENT,
881 "Client missing ClientRequiresRecovery: assuming complete"
882 );
883 db::InitState::Complete(db::InitModeComplete::Fresh)
884 })
885 }
886
887 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
888 let mut decoders = client_decoders(
889 &self.module_inits,
890 config
891 .modules
892 .iter()
893 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
894 );
895
896 decoders.register_module(
897 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
898 ModuleKind::from_static_str("tx_submission"),
899 tx_submission_sm_decoder(),
900 );
901
902 decoders
903 }
904
905 fn config_decoded(
906 config: &ClientConfig,
907 decoders: &ModuleDecoderRegistry,
908 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
909 config.clone().redecode_raw(decoders)
910 }
911
912 fn federation_root_secret(
916 pre_root_secret: &DerivableSecret,
917 config: &ClientConfig,
918 ) -> DerivableSecret {
919 pre_root_secret.federation_key(&config.global.calculate_federation_id())
920 }
921
922 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
924 self.log_event_added_transient_tx.subscribe()
925 }
926
927 async fn migrate_pending_config_if_present(db: &Database) {
931 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
932 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
933
934 let mut dbtx = db.begin_transaction().await;
935 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
937 .await;
938 dbtx.remove_entry(&PendingClientConfigKey).await;
940 dbtx.commit_tx().await;
941
942 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
943 }
944 }
945
946 fn load_and_refresh_client_config_static(
949 config: &ClientConfig,
950 api: &DynGlobalApi,
951 db: &Database,
952 task_group: &TaskGroup,
953 ) {
954 let config = config.clone();
955 let api = api.clone();
956 let db = db.clone();
957 let task_group = task_group.clone();
958
959 task_group.spawn_cancellable("refresh_client_config_static", async move {
961 Self::refresh_client_config_static(&config, &api, &db).await;
962 });
963 }
964
965 async fn refresh_client_config_static(
967 config: &ClientConfig,
968 api: &DynGlobalApi,
969 db: &Database,
970 ) {
971 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
972 warn!(
973 target: LOG_CLIENT,
974 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
975 );
976 }
977 }
978
979 fn validate_config_update(
981 current_config: &ClientConfig,
982 new_config: &ClientConfig,
983 ) -> anyhow::Result<()> {
984 if current_config.global != new_config.global {
986 bail!("Global configuration changes are not allowed in config updates");
987 }
988
989 for (module_id, current_module_config) in ¤t_config.modules {
991 match new_config.modules.get(module_id) {
992 Some(new_module_config) => {
993 if current_module_config != new_module_config {
994 bail!(
995 "Module {} configuration changes are not allowed, only additions are permitted",
996 module_id
997 );
998 }
999 }
1000 None => {
1001 bail!(
1002 "Module {} was removed in new config, only additions are allowed",
1003 module_id
1004 );
1005 }
1006 }
1007 }
1008
1009 Ok(())
1010 }
1011
1012 async fn refresh_client_config_static_try(
1014 current_config: &ClientConfig,
1015 api: &DynGlobalApi,
1016 db: &Database,
1017 ) -> anyhow::Result<()> {
1018 debug!(target: LOG_CLIENT, "Refreshing client config");
1019
1020 let fetched_config = api
1022 .request_current_consensus::<ClientConfig>(
1023 CLIENT_CONFIG_ENDPOINT.to_owned(),
1024 ApiRequestErased::default(),
1025 )
1026 .await?;
1027
1028 Self::validate_config_update(current_config, &fetched_config)?;
1030
1031 if current_config != &fetched_config {
1033 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1034
1035 let mut dbtx = db.begin_transaction().await;
1036 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1037 .await;
1038 dbtx.commit_tx().await;
1039 } else {
1040 debug!(target: LOG_CLIENT, "No federation config changes detected");
1041 }
1042
1043 Ok(())
1044 }
1045}
1046
/// A [`ClientBuilder`] paired with the federation config and optional API
/// secret it is about to join or recover with.
pub struct ClientPreview {
    // Builder that performs the actual initialization.
    inner: ClientBuilder,
    // Federation config the client will be initialized with.
    config: ClientConfig,
    // Optional API secret passed along when initializing the client.
    api_secret: Option<String>,
}
1052
1053impl ClientPreview {
1054 pub fn config(&self) -> &ClientConfig {
1056 &self.config
1057 }
1058
1059 pub async fn join(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
1136 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1137
1138 let client = self
1139 .inner
1140 .init(
1141 pre_root_secret,
1142 self.config,
1143 self.api_secret,
1144 InitMode::Fresh,
1145 )
1146 .await?;
1147
1148 Ok(client)
1149 }
1150
1151 pub async fn recover(
1163 self,
1164 pre_root_secret: RootSecret,
1165 backup: Option<ClientBackup>,
1166 ) -> anyhow::Result<ClientHandle> {
1167 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1168
1169 let client = self
1170 .inner
1171 .init(
1172 pre_root_secret,
1173 self.config,
1174 self.api_secret,
1175 InitMode::Recover {
1176 snapshot: backup.clone(),
1177 },
1178 )
1179 .await?;
1180
1181 Ok(client)
1182 }
1183
1184 pub async fn download_backup_from_federation(
1186 &self,
1187 pre_root_secret: RootSecret,
1188 ) -> anyhow::Result<Option<ClientBackup>> {
1189 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1190 let api = DynGlobalApi::from_endpoints(
1191 self.config
1193 .global
1194 .api_endpoints
1195 .iter()
1196 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
1197 &self.api_secret,
1198 )
1199 .await?;
1200
1201 Client::download_backup_from_federation_static(
1202 &api,
1203 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1204 &self.inner.decoders(&self.config),
1205 )
1206 .await
1207 }
1208}