1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, ReconnectFederationApi};
14use fedimint_client_module::api::ClientRawFederationApiExt as _;
15use fedimint_client_module::meta::LegacyMetaSource;
16use fedimint_client_module::module::init::ClientModuleInit;
17use fedimint_client_module::module::recovery::RecoveryProgress;
18use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
19use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
20use fedimint_client_module::transaction::{
21 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
22};
23use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
24use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
25use fedimint_core::core::{ModuleInstanceId, ModuleKind};
26use fedimint_core::db::{
27 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
28};
29use fedimint_core::envs::is_running_in_test_env;
30use fedimint_core::invite_code::InviteCode;
31use fedimint_core::module::ApiVersion;
32use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
33use fedimint_core::task::TaskGroup;
34use fedimint_core::util::FmtCompactAnyhow as _;
35use fedimint_core::{NumPeers, maybe_add_send};
36use fedimint_derive_secret::DerivableSecret;
37use fedimint_eventlog::{
38 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
39};
40use fedimint_logging::LOG_CLIENT;
41use tokio::sync::{broadcast, watch};
42use tracing::{debug, warn};
43
44use super::handle::ClientHandle;
45use super::{Client, client_decoders};
46use crate::api_announcements::{
47 get_api_urls, refresh_api_announcement_sync, run_api_announcement_sync,
48};
49use crate::backup::{ClientBackup, Metadata};
50use crate::db::{
51 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
52 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
53 apply_migrations_client_module_dbtx,
54};
55use crate::meta::MetaService;
56use crate::module_init::ClientModuleInitRegistry;
57use crate::oplog::OperationLog;
58use crate::sm::executor::Executor;
59use crate::sm::notifier::Notifier;
60
61#[derive(Clone)]
83pub enum RootSecret {
84 StandardDoubleDerive(DerivableSecret),
89 Custom(DerivableSecret),
94}
95
96impl RootSecret {
97 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
98 match self {
99 RootSecret::StandardDoubleDerive(derivable_secret) => {
100 get_default_client_secret(derivable_secret, &federation_id)
101 }
102 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
103 }
104 }
105}
106
107pub struct ClientBuilder {
109 module_inits: ClientModuleInitRegistry,
110 primary_module_instance: Option<ModuleInstanceId>,
111 primary_module_kind: Option<ModuleKind>,
112 admin_creds: Option<AdminCreds>,
113 db_no_decoders: Database,
114 meta_service: Arc<crate::meta::MetaService>,
115 connector: Connector,
116 stopped: bool,
117 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
118 request_hook: ApiRequestHook,
119}
120
121impl ClientBuilder {
122 pub(crate) fn new(db: Database) -> Self {
123 let meta_service = MetaService::new(LegacyMetaSource::default());
124 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
125 broadcast::channel(1024);
126 ClientBuilder {
127 module_inits: ModuleInitRegistry::new(),
128 primary_module_instance: None,
129 primary_module_kind: None,
130 connector: Connector::default(),
131 admin_creds: None,
132 db_no_decoders: db,
133 stopped: false,
134 meta_service,
135 log_event_added_transient_tx,
136 request_hook: Arc::new(|api| api),
137 }
138 }
139
140 pub(crate) fn from_existing(client: &Client) -> Self {
141 ClientBuilder {
142 module_inits: client.module_inits.clone(),
143 primary_module_instance: Some(client.primary_module_instance),
144 primary_module_kind: None,
145 admin_creds: None,
146 db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
147 stopped: false,
148 meta_service: client.meta_service.clone(),
150 connector: client.connector,
151 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
152 request_hook: client.request_hook.clone(),
153 }
154 }
155
156 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
165 self.module_inits = module_inits;
166 }
167
168 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
177 self.module_inits.attach(module_init);
178 }
179
180 pub fn stopped(&mut self) {
181 self.stopped = true;
182 }
183
184 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
193 self.request_hook = hook;
194 self
195 }
196
197 #[deprecated(
204 since = "0.6.0",
205 note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
206 )]
207 pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
208 self.with_primary_module_instance_id(primary_module_instance);
209 }
210
211 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
226 let was_replaced = self
227 .primary_module_instance
228 .replace(primary_module_instance)
229 .is_some();
230 assert!(
231 !was_replaced,
232 "Only one primary module can be given to the builder."
233 );
234 }
235
236 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
243 let was_replaced = self
244 .primary_module_kind
245 .replace(primary_module_kind)
246 .is_some();
247 assert!(
248 !was_replaced,
249 "Only one primary module kind can be given to the builder."
250 );
251 }
252
253 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
254 self.meta_service = meta_service;
255 }
256
257 async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
264 if let Ok(client_config) = self.load_existing_config().await {
268 for (module_id, module_cfg) in client_config.modules {
269 let kind = module_cfg.kind.clone();
270 let Some(init) = self.module_inits.get(&kind) else {
271 continue;
273 };
274
275 let mut dbtx = db.begin_transaction().await;
276 apply_migrations_client_module_dbtx(
277 &mut dbtx.to_ref_nc(),
278 kind.to_string(),
279 init.get_database_migrations(),
280 module_id,
281 )
282 .await?;
283 if let Some(used_db_prefixes) = init.used_db_prefixes() {
284 if is_running_in_test_env() {
285 verify_module_db_integrity_dbtx(
286 &mut dbtx.to_ref_nc(),
287 module_id,
288 kind,
289 &used_db_prefixes,
290 )
291 .await;
292 }
293 }
294 dbtx.commit_tx_result().await?;
295 }
296 }
297
298 Ok(())
299 }
300
301 pub fn db_no_decoders(&self) -> &Database {
302 &self.db_no_decoders
303 }
304
305 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
306 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
307 bail!("Client database not initialized")
308 };
309
310 Ok(config)
311 }
312
313 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
314 self.admin_creds = Some(creds);
315 }
316
317 pub fn with_connector(&mut self, connector: Connector) {
318 self.connector = connector;
319 }
320
321 #[cfg(feature = "tor")]
322 pub fn with_tor_connector(&mut self) {
323 self.with_connector(Connector::tor());
324 }
325
326 async fn init(
327 self,
328 pre_root_secret: DerivableSecret,
329 config: ClientConfig,
330 api_secret: Option<String>,
331 init_mode: InitMode,
332 ) -> anyhow::Result<ClientHandle> {
333 if Client::is_initialized(&self.db_no_decoders).await {
334 bail!("Client database already initialized")
335 }
336
337 {
340 debug!(target: LOG_CLIENT, "Initializing client database");
341 let mut dbtx = self.db_no_decoders.begin_transaction().await;
342 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
344 .await;
345 dbtx.insert_entry(
346 &ClientPreRootSecretHashKey,
347 &pre_root_secret.derive_pre_root_secret_hash(),
348 )
349 .await;
350
351 if let Some(api_secret) = api_secret.as_ref() {
352 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
353 }
354
355 let init_state = InitState::Pending(init_mode);
356 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
357
358 let metadata = init_state
359 .does_require_recovery()
360 .flatten()
361 .map_or(Metadata::empty(), |s| s.metadata);
362
363 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
364
365 dbtx.commit_tx_result().await?;
366 }
367
368 let stopped = self.stopped;
369 self.build(pre_root_secret, config, api_secret, stopped)
370 .await
371 }
372
373 pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
374 let config = self
375 .connector
376 .download_from_invite_code(invite_code)
377 .await?;
378
379 if let Some(guardian_pub_keys) = config.global.broadcast_public_keys.clone() {
380 let api = DynGlobalApi::from_endpoints(invite_code.peers(), &invite_code.api_secret())
384 .await?;
385 refresh_api_announcement_sync(&api, self.db_no_decoders(), &guardian_pub_keys).await?;
386 }
387
388 Ok(ClientPreview {
389 inner: self,
390 config,
391 api_secret: invite_code.api_secret(),
392 })
393 }
394
395 pub async fn preview_with_existing_config(
397 self,
398 config: ClientConfig,
399 api_secret: Option<String>,
400 ) -> anyhow::Result<ClientPreview> {
401 Ok(ClientPreview {
402 inner: self,
403 config,
404 api_secret,
405 })
406 }
407
408 async fn download_backup_from_federation(
410 &self,
411 pre_root_secret: DerivableSecret,
412 config: &ClientConfig,
413 api_secret: Option<String>,
414 ) -> anyhow::Result<Option<ClientBackup>> {
415 let api = DynGlobalApi::from_endpoints(
416 config
418 .global
419 .api_endpoints
420 .iter()
421 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
422 &api_secret,
423 )
424 .await?;
425
426 Client::download_backup_from_federation_static(
427 &api,
428 &Self::federation_root_secret(&pre_root_secret, config),
429 &self.decoders(config),
430 )
431 .await
432 }
433 pub async fn open(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
434 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
435 bail!("Client database not initialized")
436 };
437
438 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
439
440 match self
441 .db_no_decoders()
442 .begin_transaction_nc()
443 .await
444 .get_value(&ClientPreRootSecretHashKey)
445 .await
446 {
447 Some(secret_hash) => {
448 ensure!(
449 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
450 "Secret hash does not match. Incorrect secret"
451 );
452 }
453 _ => {
454 debug!(target: LOG_CLIENT, "Backfilling secret hash");
455 let mut dbtx = self.db_no_decoders.begin_transaction().await;
457 dbtx.insert_entry(
458 &ClientPreRootSecretHashKey,
459 &pre_root_secret.derive_pre_root_secret_hash(),
460 )
461 .await;
462 dbtx.commit_tx().await;
463 }
464 }
465
466 let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
467 let stopped = self.stopped;
468 let request_hook = self.request_hook.clone();
469
470 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
471 let client = self
472 .build_stopped(
473 pre_root_secret,
474 &config,
475 api_secret,
476 log_event_added_transient_tx,
477 request_hook,
478 )
479 .await?;
480 if !stopped {
481 client.as_inner().start_executor();
482 }
483 Ok(client)
484 }
485
486 pub(crate) async fn build(
488 self,
489 pre_root_secret: DerivableSecret,
490 config: ClientConfig,
491 api_secret: Option<String>,
492 stopped: bool,
493 ) -> anyhow::Result<ClientHandle> {
494 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
495 let request_hook = self.request_hook.clone();
496 let client = self
497 .build_stopped(
498 pre_root_secret,
499 &config,
500 api_secret,
501 log_event_added_transient_tx,
502 request_hook,
503 )
504 .await?;
505 if !stopped {
506 client.as_inner().start_executor();
507 }
508
509 Ok(client)
510 }
511
512 async fn build_stopped(
515 self,
516 pre_root_secret: DerivableSecret,
517 config: &ClientConfig,
518 api_secret: Option<String>,
519 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
520 request_hook: ApiRequestHook,
521 ) -> anyhow::Result<ClientHandle> {
522 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
523 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
524
525 let decoders = self.decoders(config);
526 let config = Self::config_decoded(config, &decoders)?;
527 let fed_id = config.calculate_federation_id();
528 let db = self.db_no_decoders.with_decoders(decoders.clone());
529 let connector = self.connector;
530 let peer_urls = get_api_urls(&db, &config).await;
531 let api = match self.admin_creds.as_ref() {
532 Some(admin_creds) => ReconnectFederationApi::new_admin(
533 admin_creds.peer_id,
534 peer_urls
535 .into_iter()
536 .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
537 .context("Admin creds should match a peer")?,
538 &api_secret,
539 )
540 .await?
541 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
542 .with_request_hook(&request_hook)
543 .with_cache()
544 .into(),
545 None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
546 .await?
547 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
548 .with_request_hook(&request_hook)
549 .with_cache()
550 .into(),
551 };
552 let task_group = TaskGroup::new();
553
554 self.migrate_module_dbs(&db).await?;
557
558 let init_state = Self::load_init_state(&db).await;
559
560 let mut primary_module_instance = self.primary_module_instance.or_else(|| {
561 let primary_module_kind = self.primary_module_kind?;
562 config
563 .modules
564 .iter()
565 .find_map(|(module_instance_id, module_config)| {
566 (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
567 })
568 });
569
570 let notifier = Notifier::new();
571
572 let common_api_versions = Client::load_and_refresh_common_api_version_static(
573 &config,
574 &self.module_inits,
575 &api,
576 &db,
577 &task_group,
578 )
579 .await
580 .inspect_err(|err| {
581 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
582 })
583 .unwrap_or(ApiVersionSet {
584 core: ApiVersion::new(0, 0),
585 modules: BTreeMap::new(),
587 });
588
589 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
590
591 let mut module_recoveries: BTreeMap<
592 ModuleInstanceId,
593 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
594 > = BTreeMap::new();
595 let mut module_recovery_progress_receivers: BTreeMap<
596 ModuleInstanceId,
597 watch::Receiver<RecoveryProgress>,
598 > = BTreeMap::new();
599
600 let final_client = FinalClientIface::default();
601
602 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
603
604 let modules = {
605 let mut modules = ClientModuleRegistry::default();
606 for (module_instance_id, module_config) in config.modules.clone() {
607 let kind = module_config.kind().clone();
608 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
609 debug!(
610 target: LOG_CLIENT,
611 kind=%kind,
612 instance_id=%module_instance_id,
613 "Module kind of instance not found in module gens, skipping");
614 continue;
615 };
616
617 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
618 else {
619 warn!(
620 target: LOG_CLIENT,
621 kind=%kind,
622 instance_id=%module_instance_id,
623 "Module kind of instance has incompatible api version, skipping"
624 );
625 continue;
626 };
627
628 let start_module_recover_fn =
631 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
632 let module_config = module_config.clone();
633 let num_peers = NumPeers::from(config.global.api_endpoints.len());
634 let db = db.clone();
635 let kind = kind.clone();
636 let notifier = notifier.clone();
637 let api = api.clone();
638 let root_secret = root_secret.clone();
639 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
640 let final_client = final_client.clone();
641 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
642 let task_group = task_group.clone();
643 let module_init = module_init.clone();
644 (
645 Box::pin(async move {
646 module_init
647 .recover(
648 final_client.clone(),
649 fed_id,
650 num_peers,
651 module_config.clone(),
652 db.clone(),
653 module_instance_id,
654 common_api_versions.core,
655 api_version,
656 root_secret.derive_module_secret(module_instance_id),
657 notifier.clone(),
658 api.clone(),
659 admin_auth,
660 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
661 progress_tx,
662 task_group,
663 )
664 .await
665 .inspect_err(|err| {
666 warn!(
667 target: LOG_CLIENT,
668 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
669 );
670 })
671 }),
672 progress_rx,
673 )
674 };
675
676 let recovery = match init_state.does_require_recovery() {
677 Some(snapshot) => {
678 match db
679 .begin_transaction_nc()
680 .await
681 .get_value(&ClientModuleRecovery { module_instance_id })
682 .await
683 {
684 Some(module_recovery_state) => {
685 if module_recovery_state.is_done() {
686 debug!(
687 id = %module_instance_id,
688 %kind, "Module recovery already complete"
689 );
690 None
691 } else {
692 debug!(
693 id = %module_instance_id,
694 %kind,
695 progress = %module_recovery_state.progress,
696 "Starting module recovery with an existing progress"
697 );
698 Some(start_module_recover_fn(
699 snapshot,
700 module_recovery_state.progress,
701 ))
702 }
703 }
704 _ => {
705 let progress = RecoveryProgress::none();
706 let mut dbtx = db.begin_transaction().await;
707 dbtx.log_event(
708 log_ordering_wakeup_tx.clone(),
709 None,
710 ModuleRecoveryStarted::new(module_instance_id),
711 )
712 .await;
713 dbtx.insert_entry(
714 &ClientModuleRecovery { module_instance_id },
715 &ClientModuleRecoveryState { progress },
716 )
717 .await;
718
719 dbtx.commit_tx().await;
720
721 debug!(
722 id = %module_instance_id,
723 %kind, "Starting new module recovery"
724 );
725 Some(start_module_recover_fn(snapshot, progress))
726 }
727 }
728 }
729 _ => None,
730 };
731
732 match recovery {
733 Some((recovery, recovery_progress_rx)) => {
734 module_recoveries.insert(module_instance_id, recovery);
735 module_recovery_progress_receivers
736 .insert(module_instance_id, recovery_progress_rx);
737 }
738 _ => {
739 let module = module_init
740 .init(
741 final_client.clone(),
742 fed_id,
743 config.global.api_endpoints.len(),
744 module_config,
745 db.clone(),
746 module_instance_id,
747 common_api_versions.core,
748 api_version,
749 root_secret.derive_module_secret(module_instance_id),
756 notifier.clone(),
757 api.clone(),
758 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
759 task_group.clone(),
760 )
761 .await?;
762
763 if primary_module_instance.is_none() && module.supports_being_primary() {
764 primary_module_instance = Some(module_instance_id);
765 } else if primary_module_instance == Some(module_instance_id)
766 && !module.supports_being_primary()
767 {
768 bail!(
769 "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
770 );
771 }
772
773 modules.register_module(module_instance_id, kind, module);
774 }
775 }
776 }
777 modules
778 };
779
780 if init_state.is_pending() && module_recoveries.is_empty() {
781 let mut dbtx = db.begin_transaction().await;
782 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
783 .await;
784 dbtx.commit_tx().await;
785 }
786
787 let executor = {
788 let mut executor_builder = Executor::builder();
789 executor_builder
790 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
791
792 for (module_instance_id, _, module) in modules.iter_modules() {
793 executor_builder.with_module_dyn(module.context(module_instance_id));
794 }
795
796 for module_instance_id in module_recoveries.keys() {
797 executor_builder.with_valid_module_id(*module_instance_id);
798 }
799
800 executor_builder.build(db.clone(), notifier, task_group.clone())
801 };
802
803 let recovery_receiver_init_val = module_recovery_progress_receivers
804 .iter()
805 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
806 .collect::<BTreeMap<_, _>>();
807 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
808 watch::channel(recovery_receiver_init_val);
809
810 let client_inner = Arc::new(Client {
811 final_client: final_client.clone(),
812 config: tokio::sync::RwLock::new(config.clone()),
813 api_secret,
814 decoders,
815 db: db.clone(),
816 federation_id: fed_id,
817 federation_config_meta: config.global.meta,
818 primary_module_instance: primary_module_instance
819 .ok_or(anyhow!("No primary module set or found"))?,
820 modules,
821 module_inits: self.module_inits.clone(),
822 log_ordering_wakeup_tx,
823 log_event_added_rx,
824 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
825 request_hook,
826 executor,
827 api,
828 secp_ctx: Secp256k1::new(),
829 root_secret,
830 task_group,
831 operation_log: OperationLog::new(db.clone()),
832 client_recovery_progress_receiver,
833 meta_service: self.meta_service,
834 connector,
835 });
836 client_inner
837 .task_group
838 .spawn_cancellable("MetaService::update_continuously", {
839 let client_inner = client_inner.clone();
840 async move {
841 client_inner
842 .meta_service
843 .update_continuously(&client_inner)
844 .await;
845 }
846 });
847
848 client_inner.task_group.spawn_cancellable(
849 "update-api-announcements",
850 run_api_announcement_sync(client_inner.clone()),
851 );
852
853 client_inner.task_group.spawn_cancellable(
854 "event log ordering task",
855 run_event_log_ordering_task(
856 db.clone(),
857 log_ordering_wakeup_rx,
858 log_event_added_tx,
859 log_event_added_transient_tx,
860 ),
861 );
862 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
863
864 let client_arc = ClientHandle::new(client_inner);
865
866 for (_, _, module) in client_arc.modules.iter_modules() {
867 module.start().await;
868 }
869
870 final_client.set(client_iface.clone());
871
872 if !module_recoveries.is_empty() {
873 client_arc.spawn_module_recoveries_task(
874 client_recovery_progress_sender,
875 module_recoveries,
876 module_recovery_progress_receivers,
877 );
878 }
879
880 Ok(client_arc)
881 }
882
883 async fn load_init_state(db: &Database) -> InitState {
884 let mut dbtx = db.begin_transaction_nc().await;
885 dbtx.get_value(&ClientInitStateKey)
886 .await
887 .unwrap_or_else(|| {
888 warn!(
891 target: LOG_CLIENT,
892 "Client missing ClientRequiresRecovery: assuming complete"
893 );
894 db::InitState::Complete(db::InitModeComplete::Fresh)
895 })
896 }
897
898 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
899 let mut decoders = client_decoders(
900 &self.module_inits,
901 config
902 .modules
903 .iter()
904 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
905 );
906
907 decoders.register_module(
908 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
909 ModuleKind::from_static_str("tx_submission"),
910 tx_submission_sm_decoder(),
911 );
912
913 decoders
914 }
915
916 fn config_decoded(
917 config: &ClientConfig,
918 decoders: &ModuleDecoderRegistry,
919 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
920 config.clone().redecode_raw(decoders)
921 }
922
923 fn federation_root_secret(
927 pre_root_secret: &DerivableSecret,
928 config: &ClientConfig,
929 ) -> DerivableSecret {
930 pre_root_secret.federation_key(&config.global.calculate_federation_id())
931 }
932
933 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
935 self.log_event_added_transient_tx.subscribe()
936 }
937}
938
939pub struct ClientPreview {
940 inner: ClientBuilder,
941 config: ClientConfig,
942 api_secret: Option<String>,
943}
944
945impl ClientPreview {
946 pub fn config(&self) -> &ClientConfig {
948 &self.config
949 }
950
951 pub async fn join(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
1028 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1029
1030 let client = self
1031 .inner
1032 .init(
1033 pre_root_secret,
1034 self.config,
1035 self.api_secret,
1036 InitMode::Fresh,
1037 )
1038 .await?;
1039
1040 Ok(client)
1041 }
1042
1043 pub async fn recover(
1055 self,
1056 pre_root_secret: RootSecret,
1057 custom_backup: Option<ClientBackup>,
1058 ) -> anyhow::Result<ClientHandle> {
1059 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1060
1061 let backup = if let Some(backup) = custom_backup {
1062 Some(backup)
1063 } else {
1064 self.inner
1065 .download_backup_from_federation(
1066 pre_root_secret.clone(),
1067 &self.config,
1068 self.api_secret.clone(),
1069 )
1070 .await?
1071 };
1072
1073 let client = self
1074 .inner
1075 .init(
1076 pre_root_secret,
1077 self.config,
1078 self.api_secret,
1079 InitMode::Recover {
1080 snapshot: backup.clone(),
1081 },
1082 )
1083 .await?;
1084
1085 Ok(client)
1086 }
1087}