fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, ReconnectFederationApi};
14use fedimint_client_module::api::ClientRawFederationApiExt as _;
15use fedimint_client_module::meta::LegacyMetaSource;
16use fedimint_client_module::module::init::ClientModuleInit;
17use fedimint_client_module::module::recovery::RecoveryProgress;
18use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
19use fedimint_client_module::secret::DeriveableSecretClientExt as _;
20use fedimint_client_module::transaction::{
21    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
22};
23use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
24use fedimint_core::config::{ClientConfig, ModuleInitRegistry};
25use fedimint_core::core::{ModuleInstanceId, ModuleKind};
26use fedimint_core::db::{
27    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
28};
29use fedimint_core::envs::is_running_in_test_env;
30use fedimint_core::module::ApiVersion;
31use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
32use fedimint_core::task::TaskGroup;
33use fedimint_core::util::FmtCompactAnyhow as _;
34use fedimint_core::{NumPeers, maybe_add_send};
35use fedimint_derive_secret::DerivableSecret;
36use fedimint_eventlog::{
37    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
38};
39use fedimint_logging::LOG_CLIENT;
40use tokio::sync::{broadcast, watch};
41use tracing::{debug, warn};
42
43use super::handle::ClientHandle;
44use super::{Client, client_decoders};
45use crate::api_announcements::{get_api_urls, run_api_announcement_sync};
46use crate::backup::{ClientBackup, Metadata};
47use crate::db::{
48    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
49    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
50    apply_migrations_client_module_dbtx,
51};
52use crate::meta::MetaService;
53use crate::module_init::ClientModuleInitRegistry;
54use crate::oplog::OperationLog;
55use crate::sm::executor::Executor;
56use crate::sm::notifier::Notifier;
57
58/// Used to configure, assemble and build [`Client`]
59pub struct ClientBuilder {
60    module_inits: ClientModuleInitRegistry,
61    primary_module_instance: Option<ModuleInstanceId>,
62    primary_module_kind: Option<ModuleKind>,
63    admin_creds: Option<AdminCreds>,
64    db_no_decoders: Database,
65    meta_service: Arc<crate::meta::MetaService>,
66    connector: Connector,
67    stopped: bool,
68    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
69    request_hook: ApiRequestHook,
70}
71
72impl ClientBuilder {
73    pub(crate) fn new(db: Database) -> Self {
74        let meta_service = MetaService::new(LegacyMetaSource::default());
75        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
76            broadcast::channel(1024);
77        ClientBuilder {
78            module_inits: ModuleInitRegistry::new(),
79            primary_module_instance: None,
80            primary_module_kind: None,
81            connector: Connector::default(),
82            admin_creds: None,
83            db_no_decoders: db,
84            stopped: false,
85            meta_service,
86            log_event_added_transient_tx,
87            request_hook: Arc::new(|api| api),
88        }
89    }
90
91    pub(crate) fn from_existing(client: &Client) -> Self {
92        ClientBuilder {
93            module_inits: client.module_inits.clone(),
94            primary_module_instance: Some(client.primary_module_instance),
95            primary_module_kind: None,
96            admin_creds: None,
97            db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
98            stopped: false,
99            // non unique
100            meta_service: client.meta_service.clone(),
101            connector: client.connector,
102            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
103            request_hook: client.request_hook.clone(),
104        }
105    }
106
107    /// Replace module generator registry entirely
108    ///
109    /// There has to be at least one module supporting being primary among the
110    /// registered modules. The client won't start without the federation and
111    /// the client having at least one overlapping primary module. In case there
112    /// are multiple, the one to use can be selected with
113    /// [`ClientBuilder::with_primary_module_kind`] or
114    /// [`ClientBuilder::with_primary_module_instance_id`].
115    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
116        self.module_inits = module_inits;
117    }
118
119    /// Make module generator available when reading the config
120    ///
121    /// There has to be at least one module supporting being primary among the
122    /// registered modules. The client won't start without the federation and
123    /// the client having at least one overlapping primary module. In case there
124    /// are multiple, the one to use can be selected with
125    /// [`ClientBuilder::with_primary_module_kind`] or
126    /// [`ClientBuilder::with_primary_module_instance_id`].
127    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
128        self.module_inits.attach(module_init);
129    }
130
131    pub fn stopped(&mut self) {
132        self.stopped = true;
133    }
134
135    /// Build the [`Client`] with a custom wrapper around its api request logic
136    ///
137    /// This is intended to be used by downstream applications, e.g. to:
138    ///
139    /// * simulate offline mode,
140    /// * save battery when the OS indicates lack of connectivity,
141    /// * inject faults and delays for testing purposes,
142    /// * collect statistics and emit notifications.
143    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
144        self.request_hook = hook;
145        self
146    }
147
148    /// Uses this module with the given instance id as the primary module. See
149    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
150    /// more information.
151    ///
152    /// ## Panics
153    /// If there was a primary module specified previously
154    #[deprecated(
155        since = "0.6.0",
156        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
157    )]
158    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
159        self.with_primary_module_instance_id(primary_module_instance);
160    }
161
162    /// **You are likely looking for
163    /// [`ClientBuilder::with_primary_module_kind`]. This function is rarely
164    /// useful and often dangerous, handle with care.**
165    ///
166    /// Uses this module with the given instance id as the primary module. See
167    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
168    /// more information. Since the module instance id of modules of a
169    /// specific kind may differ between different federations it is
170    /// generally not recommended to specify it, but rather to specify the
171    /// module kind that should be used as primary. See
172    /// [`ClientBuilder::with_primary_module_kind`].
173    ///
174    /// ## Panics
175    /// If there was a primary module specified previously
176    pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
177        let was_replaced = self
178            .primary_module_instance
179            .replace(primary_module_instance)
180            .is_some();
181        assert!(
182            !was_replaced,
183            "Only one primary module can be given to the builder."
184        );
185    }
186
187    /// Uses this module kind as the primary module if present in the config.
188    /// See [`fedimint_client_module::ClientModule::supports_being_primary`] for
189    /// more information.
190    ///
191    /// ## Panics
192    /// If there was a primary module kind specified previously
193    pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
194        let was_replaced = self
195            .primary_module_kind
196            .replace(primary_module_kind)
197            .is_some();
198        assert!(
199            !was_replaced,
200            "Only one primary module kind can be given to the builder."
201        );
202    }
203
204    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
205        self.meta_service = meta_service;
206    }
207
208    /// Migrate client module databases
209    ///
210    /// Note: Client core db migration are done immediately in
211    /// [`Client::builder`], to ensure db matches the code at all times,
212    /// while migrating modules requires figuring out what modules actually
213    /// are first.
214    async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
215        // Only apply the client database migrations if the database has been
216        // initialized.
217        // This only works as long as you don't change the client config
218        if let Ok(client_config) = self.load_existing_config().await {
219            for (module_id, module_cfg) in client_config.modules {
220                let kind = module_cfg.kind.clone();
221                let Some(init) = self.module_inits.get(&kind) else {
222                    // normal, expected and already logged about when building the client
223                    continue;
224                };
225
226                let mut dbtx = db.begin_transaction().await;
227                apply_migrations_client_module_dbtx(
228                    &mut dbtx.to_ref_nc(),
229                    kind.to_string(),
230                    init.get_database_migrations(),
231                    module_id,
232                )
233                .await?;
234                if let Some(used_db_prefixes) = init.used_db_prefixes() {
235                    if is_running_in_test_env() {
236                        verify_module_db_integrity_dbtx(
237                            &mut dbtx.to_ref_nc(),
238                            module_id,
239                            kind,
240                            &used_db_prefixes,
241                        )
242                        .await;
243                    }
244                }
245                dbtx.commit_tx_result().await?;
246            }
247        }
248
249        Ok(())
250    }
251
252    pub fn db_no_decoders(&self) -> &Database {
253        &self.db_no_decoders
254    }
255
256    pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
257        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
258            bail!("Client database not initialized")
259        };
260
261        Ok(config)
262    }
263
264    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
265        self.admin_creds = Some(creds);
266    }
267
268    pub fn with_connector(&mut self, connector: Connector) {
269        self.connector = connector;
270    }
271
272    #[cfg(feature = "tor")]
273    pub fn with_tor_connector(&mut self) {
274        self.with_connector(Connector::tor());
275    }
276
277    async fn init(
278        self,
279        pre_root_secret: DerivableSecret,
280        config: ClientConfig,
281        api_secret: Option<String>,
282        init_mode: InitMode,
283    ) -> anyhow::Result<ClientHandle> {
284        if Client::is_initialized(&self.db_no_decoders).await {
285            bail!("Client database already initialized")
286        }
287
288        // Note: It's important all client initialization is performed as one big
289        // transaction to avoid half-initialized client state.
290        {
291            debug!(target: LOG_CLIENT, "Initializing client database");
292            let mut dbtx = self.db_no_decoders.begin_transaction().await;
293            // Save config to DB
294            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
295                .await;
296            dbtx.insert_entry(
297                &ClientPreRootSecretHashKey,
298                &pre_root_secret.derive_pre_root_secret_hash(),
299            )
300            .await;
301
302            if let Some(api_secret) = api_secret.as_ref() {
303                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
304            }
305
306            let init_state = InitState::Pending(init_mode);
307            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
308
309            let metadata = init_state
310                .does_require_recovery()
311                .flatten()
312                .map_or(Metadata::empty(), |s| s.metadata);
313
314            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
315
316            dbtx.commit_tx_result().await?;
317        }
318
319        let stopped = self.stopped;
320        self.build(pre_root_secret, config, api_secret, stopped)
321            .await
322    }
323
324    /// Join a new Federation
325    ///
326    /// When a user wants to connect to a new federation this function fetches
327    /// the federation config and initializes the client database. If a user
328    /// already joined the federation in the past and has a preexisting database
329    /// use [`ClientBuilder::open`] instead.
330    ///
331    /// **Warning**: Calling `join` with a `root_secret` key that was used
332    /// previous to `join` a Federation will lead to all sorts of malfunctions
333    /// including likely loss of funds.
334    ///
335    /// This should be generally called only if the `root_secret` key is known
336    /// not to have been used before (e.g. just randomly generated). For keys
337    /// that might have been previous used (e.g. provided by the user),
338    /// it's safer to call [`Self::recover`] which will attempt to recover
339    /// client module states for the Federation.
340    ///
341    /// A typical "join federation" flow would look as follows:
342    /// ```no_run
343    /// # use std::str::FromStr;
344    /// # use fedimint_core::invite_code::InviteCode;
345    /// # use fedimint_core::config::ClientConfig;
346    /// # use fedimint_derive_secret::DerivableSecret;
347    /// # use fedimint_client::{Client, ClientBuilder};
348    /// # use fedimint_core::db::Database;
349    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
350    /// #
351    /// # #[tokio::main]
352    /// # async fn main() {
353    /// # let root_secret: DerivableSecret = unimplemented!();
354    /// // Create a root secret, e.g. via fedimint-bip39, see also:
355    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
356    /// // let root_secret = …;
357    ///
358    /// // Get invite code from user
359    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
360    ///     .expect("Invalid invite code");
361    /// let config = fedimint_api_client::api::net::Connector::default().download_from_invite_code(&invite_code).await
362    ///     .expect("Error downloading config");
363    ///
364    /// // Tell the user the federation name, bitcoin network
365    /// // (e.g. from wallet module config), and other details
366    /// // that are typically contained in the federation's
367    /// // meta fields.
368    ///
369    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
370    /// //     .expect("Module not found")
371    /// //     .network;
372    ///
373    /// println!(
374    ///     "The federation name is: {}",
375    ///     config.meta::<String>(META_FEDERATION_NAME_KEY)
376    ///         .expect("Could not decode name field")
377    ///         .expect("Name isn't set")
378    /// );
379    ///
380    /// // Open the client's database, using the federation ID
381    /// // as the DB name is a common pattern:
382    ///
383    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
384    /// // let db = RocksDb::open(db_path).expect("error opening DB");
385    /// # let db: Database = unimplemented!();
386    ///
387    /// let client = Client::builder(db).await.expect("Error building client")
388    ///     // Mount the modules the client should support:
389    ///     // .with_module(LightningClientInit)
390    ///     // .with_module(MintClientInit)
391    ///     // .with_module(WalletClientInit::default())
392    ///     .join(root_secret, config, None)
393    ///     .await
394    ///     .expect("Error joining federation");
395    /// # }
396    /// ```
397    pub async fn join(
398        self,
399        pre_root_secret: DerivableSecret,
400        config: ClientConfig,
401        api_secret: Option<String>,
402    ) -> anyhow::Result<ClientHandle> {
403        self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
404            .await
405    }
406
407    /// Download most recent valid backup found from the Federation
408    pub async fn download_backup_from_federation(
409        &self,
410        root_secret: &DerivableSecret,
411        config: &ClientConfig,
412        api_secret: Option<String>,
413    ) -> anyhow::Result<Option<ClientBackup>> {
414        let api = DynGlobalApi::from_endpoints(
415            // TODO: change join logic to use FederationId v2
416            config
417                .global
418                .api_endpoints
419                .iter()
420                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
421            &api_secret,
422        )
423        .await?;
424
425        Client::download_backup_from_federation_static(
426            &api,
427            &Self::federation_root_secret(root_secret, config),
428            &self.decoders(config),
429        )
430        .await
431    }
432
433    /// Join a (possibly) previous joined Federation
434    ///
435    /// Unlike [`Self::join`], `recover` will run client module recovery for
436    /// each client module attempting to recover any previous module state.
437    ///
438    /// Recovery process takes time during which each recovering client module
439    /// will not be available for use.
440    ///
441    /// Calling `recovery` with a `root_secret` that was not actually previous
442    /// used in a given Federation is safe.
443    pub async fn recover(
444        self,
445        root_secret: DerivableSecret,
446        config: ClientConfig,
447        api_secret: Option<String>,
448        backup: Option<ClientBackup>,
449    ) -> anyhow::Result<ClientHandle> {
450        let client = self
451            .init(
452                root_secret,
453                config,
454                api_secret,
455                InitMode::Recover {
456                    snapshot: backup.clone(),
457                },
458            )
459            .await?;
460
461        Ok(client)
462    }
463
464    pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
465        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
466            bail!("Client database not initialized")
467        };
468
469        match self
470            .db_no_decoders()
471            .begin_transaction_nc()
472            .await
473            .get_value(&ClientPreRootSecretHashKey)
474            .await
475        {
476            Some(secret_hash) => {
477                ensure!(
478                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
479                    "Secret hash does not match. Incorrect secret"
480                );
481            }
482            _ => {
483                debug!(target: LOG_CLIENT, "Backfilling secret hash");
484                // Note: no need for dbtx autocommit, we are the only writer ATM
485                let mut dbtx = self.db_no_decoders.begin_transaction().await;
486                dbtx.insert_entry(
487                    &ClientPreRootSecretHashKey,
488                    &pre_root_secret.derive_pre_root_secret_hash(),
489                )
490                .await;
491                dbtx.commit_tx().await;
492            }
493        }
494
495        let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
496        let stopped = self.stopped;
497        let request_hook = self.request_hook.clone();
498
499        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
500        let client = self
501            .build_stopped(
502                pre_root_secret,
503                &config,
504                api_secret,
505                log_event_added_transient_tx,
506                request_hook,
507            )
508            .await?;
509        if !stopped {
510            client.as_inner().start_executor();
511        }
512        Ok(client)
513    }
514
515    /// Build a [`Client`] and start the executor
516    pub(crate) async fn build(
517        self,
518        pre_root_secret: DerivableSecret,
519        config: ClientConfig,
520        api_secret: Option<String>,
521        stopped: bool,
522    ) -> anyhow::Result<ClientHandle> {
523        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
524        let request_hook = self.request_hook.clone();
525        let client = self
526            .build_stopped(
527                pre_root_secret,
528                &config,
529                api_secret,
530                log_event_added_transient_tx,
531                request_hook,
532            )
533            .await?;
534        if !stopped {
535            client.as_inner().start_executor();
536        }
537
538        Ok(client)
539    }
540
541    // TODO: remove config argument
542    /// Build a [`Client`] but do not start the executor
543    async fn build_stopped(
544        self,
545        root_secret: DerivableSecret,
546        config: &ClientConfig,
547        api_secret: Option<String>,
548        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
549        request_hook: ApiRequestHook,
550    ) -> anyhow::Result<ClientHandle> {
551        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
552        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
553
554        let decoders = self.decoders(config);
555        let config = Self::config_decoded(config, &decoders)?;
556        let fed_id = config.calculate_federation_id();
557        let db = self.db_no_decoders.with_decoders(decoders.clone());
558        let connector = self.connector;
559        let peer_urls = get_api_urls(&db, &config).await;
560        let api = match self.admin_creds.as_ref() {
561            Some(admin_creds) => ReconnectFederationApi::new_admin(
562                admin_creds.peer_id,
563                peer_urls
564                    .into_iter()
565                    .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
566                    .context("Admin creds should match a peer")?,
567                &api_secret,
568            )
569            .await?
570            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571            .with_request_hook(&request_hook)
572            .with_cache()
573            .into(),
574            None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
575                .await?
576                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
577                .with_request_hook(&request_hook)
578                .with_cache()
579                .into(),
580        };
581        let task_group = TaskGroup::new();
582
583        // Migrate the database before interacting with it in case any on-disk data
584        // structures have changed.
585        self.migrate_module_dbs(&db).await?;
586
587        let init_state = Self::load_init_state(&db).await;
588
589        let mut primary_module_instance = self.primary_module_instance.or_else(|| {
590            let primary_module_kind = self.primary_module_kind?;
591            config
592                .modules
593                .iter()
594                .find_map(|(module_instance_id, module_config)| {
595                    (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
596                })
597        });
598
599        let notifier = Notifier::new();
600
601        let common_api_versions = Client::load_and_refresh_common_api_version_static(
602            &config,
603            &self.module_inits,
604            &api,
605            &db,
606            &task_group,
607        )
608        .await
609        .inspect_err(|err| {
610            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
611        })
612        .unwrap_or(ApiVersionSet {
613            core: ApiVersion::new(0, 0),
614            // This will cause all modules to skip initialization
615            modules: BTreeMap::new(),
616        });
617
618        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
619
620        let mut module_recoveries: BTreeMap<
621            ModuleInstanceId,
622            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
623        > = BTreeMap::new();
624        let mut module_recovery_progress_receivers: BTreeMap<
625            ModuleInstanceId,
626            watch::Receiver<RecoveryProgress>,
627        > = BTreeMap::new();
628
629        let final_client = FinalClientIface::default();
630
631        let root_secret = Self::federation_root_secret(&root_secret, &config);
632
633        let modules = {
634            let mut modules = ClientModuleRegistry::default();
635            for (module_instance_id, module_config) in config.modules.clone() {
636                let kind = module_config.kind().clone();
637                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
638                    debug!(
639                        target: LOG_CLIENT,
640                        kind=%kind,
641                        instance_id=%module_instance_id,
642                        "Module kind of instance not found in module gens, skipping");
643                    continue;
644                };
645
646                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
647                else {
648                    warn!(
649                        target: LOG_CLIENT,
650                        kind=%kind,
651                        instance_id=%module_instance_id,
652                        "Module kind of instance has incompatible api version, skipping"
653                    );
654                    continue;
655                };
656
657                // since the exact logic of when to start recovery is a bit gnarly,
658                // the recovery call is extracted here.
659                let start_module_recover_fn =
660                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
661                        let module_config = module_config.clone();
662                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
663                        let db = db.clone();
664                        let kind = kind.clone();
665                        let notifier = notifier.clone();
666                        let api = api.clone();
667                        let root_secret = root_secret.clone();
668                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
669                        let final_client = final_client.clone();
670                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
671                        let task_group = task_group.clone();
672                        let module_init = module_init.clone();
673                        (
674                            Box::pin(async move {
675                                module_init
676                                    .recover(
677                                        final_client.clone(),
678                                        fed_id,
679                                        num_peers,
680                                        module_config.clone(),
681                                        db.clone(),
682                                        module_instance_id,
683                                        common_api_versions.core,
684                                        api_version,
685                                        root_secret.derive_module_secret(module_instance_id),
686                                        notifier.clone(),
687                                        api.clone(),
688                                        admin_auth,
689                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
690                                        progress_tx,
691                                        task_group,
692                                    )
693                                    .await
694                                    .inspect_err(|err| {
695                                        warn!(
696                                            target: LOG_CLIENT,
697                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
698                                        );
699                                    })
700                            }),
701                            progress_rx,
702                        )
703                    };
704
705                let recovery = match init_state.does_require_recovery() {
706                    Some(snapshot) => {
707                        match db
708                            .begin_transaction_nc()
709                            .await
710                            .get_value(&ClientModuleRecovery { module_instance_id })
711                            .await
712                        {
713                            Some(module_recovery_state) => {
714                                if module_recovery_state.is_done() {
715                                    debug!(
716                                        id = %module_instance_id,
717                                        %kind, "Module recovery already complete"
718                                    );
719                                    None
720                                } else {
721                                    debug!(
722                                        id = %module_instance_id,
723                                        %kind,
724                                        progress = %module_recovery_state.progress,
725                                        "Starting module recovery with an existing progress"
726                                    );
727                                    Some(start_module_recover_fn(
728                                        snapshot,
729                                        module_recovery_state.progress,
730                                    ))
731                                }
732                            }
733                            _ => {
734                                let progress = RecoveryProgress::none();
735                                let mut dbtx = db.begin_transaction().await;
736                                dbtx.log_event(
737                                    log_ordering_wakeup_tx.clone(),
738                                    None,
739                                    ModuleRecoveryStarted::new(module_instance_id),
740                                )
741                                .await;
742                                dbtx.insert_entry(
743                                    &ClientModuleRecovery { module_instance_id },
744                                    &ClientModuleRecoveryState { progress },
745                                )
746                                .await;
747
748                                dbtx.commit_tx().await;
749
750                                debug!(
751                                    id = %module_instance_id,
752                                    %kind, "Starting new module recovery"
753                                );
754                                Some(start_module_recover_fn(snapshot, progress))
755                            }
756                        }
757                    }
758                    _ => None,
759                };
760
761                match recovery {
762                    Some((recovery, recovery_progress_rx)) => {
763                        module_recoveries.insert(module_instance_id, recovery);
764                        module_recovery_progress_receivers
765                            .insert(module_instance_id, recovery_progress_rx);
766                    }
767                    _ => {
768                        let module = module_init
769                            .init(
770                                final_client.clone(),
771                                fed_id,
772                                config.global.api_endpoints.len(),
773                                module_config,
774                                db.clone(),
775                                module_instance_id,
776                                common_api_versions.core,
777                                api_version,
778                                // This is a divergence from the legacy client, where the child
779                                // secret keys were derived using
780                                // *module kind*-specific derivation paths.
781                                // Since the new client has to support multiple, segregated modules
782                                // of the same kind we have to use
783                                // the instance id instead.
784                                root_secret.derive_module_secret(module_instance_id),
785                                notifier.clone(),
786                                api.clone(),
787                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
788                                task_group.clone(),
789                            )
790                            .await?;
791
792                        if primary_module_instance.is_none() && module.supports_being_primary() {
793                            primary_module_instance = Some(module_instance_id);
794                        } else if primary_module_instance == Some(module_instance_id)
795                            && !module.supports_being_primary()
796                        {
797                            bail!(
798                                "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
799                            );
800                        }
801
802                        modules.register_module(module_instance_id, kind, module);
803                    }
804                }
805            }
806            modules
807        };
808
809        if init_state.is_pending() && module_recoveries.is_empty() {
810            let mut dbtx = db.begin_transaction().await;
811            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
812                .await;
813            dbtx.commit_tx().await;
814        }
815
816        let executor = {
817            let mut executor_builder = Executor::builder();
818            executor_builder
819                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
820
821            for (module_instance_id, _, module) in modules.iter_modules() {
822                executor_builder.with_module_dyn(module.context(module_instance_id));
823            }
824
825            for module_instance_id in module_recoveries.keys() {
826                executor_builder.with_valid_module_id(*module_instance_id);
827            }
828
829            executor_builder.build(db.clone(), notifier, task_group.clone())
830        };
831
832        let recovery_receiver_init_val = module_recovery_progress_receivers
833            .iter()
834            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
835            .collect::<BTreeMap<_, _>>();
836        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
837            watch::channel(recovery_receiver_init_val);
838
839        let client_inner = Arc::new(Client {
840            final_client: final_client.clone(),
841            config: tokio::sync::RwLock::new(config.clone()),
842            api_secret,
843            decoders,
844            db: db.clone(),
845            federation_id: fed_id,
846            federation_config_meta: config.global.meta,
847            primary_module_instance: primary_module_instance
848                .ok_or(anyhow!("No primary module set or found"))?,
849            modules,
850            module_inits: self.module_inits.clone(),
851            log_ordering_wakeup_tx,
852            log_event_added_rx,
853            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
854            request_hook,
855            executor,
856            api,
857            secp_ctx: Secp256k1::new(),
858            root_secret,
859            task_group,
860            operation_log: OperationLog::new(db.clone()),
861            client_recovery_progress_receiver,
862            meta_service: self.meta_service,
863            connector,
864        });
865        client_inner
866            .task_group
867            .spawn_cancellable("MetaService::update_continuously", {
868                let client_inner = client_inner.clone();
869                async move {
870                    client_inner
871                        .meta_service
872                        .update_continuously(&client_inner)
873                        .await;
874                }
875            });
876
877        client_inner.task_group.spawn_cancellable(
878            "update-api-announcements",
879            run_api_announcement_sync(client_inner.clone()),
880        );
881
882        client_inner.task_group.spawn_cancellable(
883            "event log ordering task",
884            run_event_log_ordering_task(
885                db.clone(),
886                log_ordering_wakeup_rx,
887                log_event_added_tx,
888                log_event_added_transient_tx,
889            ),
890        );
891        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
892
893        let client_arc = ClientHandle::new(client_inner);
894
895        for (_, _, module) in client_arc.modules.iter_modules() {
896            module.start().await;
897        }
898
899        final_client.set(client_iface.clone());
900
901        if !module_recoveries.is_empty() {
902            client_arc.spawn_module_recoveries_task(
903                client_recovery_progress_sender,
904                module_recoveries,
905                module_recovery_progress_receivers,
906            );
907        }
908
909        Ok(client_arc)
910    }
911
912    async fn load_init_state(db: &Database) -> InitState {
913        let mut dbtx = db.begin_transaction_nc().await;
914        dbtx.get_value(&ClientInitStateKey)
915            .await
916            .unwrap_or_else(|| {
917                // could be turned in a hard error in the future, but for now
918                // no need to break backward compat.
919                warn!(
920                    target: LOG_CLIENT,
921                    "Client missing ClientRequiresRecovery: assuming complete"
922                );
923                db::InitState::Complete(db::InitModeComplete::Fresh)
924            })
925    }
926
927    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
928        let mut decoders = client_decoders(
929            &self.module_inits,
930            config
931                .modules
932                .iter()
933                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
934        );
935
936        decoders.register_module(
937            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
938            ModuleKind::from_static_str("tx_submission"),
939            tx_submission_sm_decoder(),
940        );
941
942        decoders
943    }
944
945    fn config_decoded(
946        config: &ClientConfig,
947        decoders: &ModuleDecoderRegistry,
948    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
949        config.clone().redecode_raw(decoders)
950    }
951
952    /// Re-derive client's `root_secret` using the federation ID. This
953    /// eliminates the possibility of having the same client `root_secret`
954    /// across multiple federations.
955    fn federation_root_secret(
956        root_secret: &DerivableSecret,
957        config: &ClientConfig,
958    ) -> DerivableSecret {
959        root_secret.federation_key(&config.global.calculate_federation_id())
960    }
961
962    /// Register to receiver all new transient (unpersisted) events
963    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
964        self.log_event_added_transient_tx.subscribe()
965    }
966}