fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, ReconnectFederationApi};
14use fedimint_client_module::api::ClientRawFederationApiExt as _;
15use fedimint_client_module::meta::LegacyMetaSource;
16use fedimint_client_module::module::init::ClientModuleInit;
17use fedimint_client_module::module::recovery::RecoveryProgress;
18use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
19use fedimint_client_module::secret::DeriveableSecretClientExt as _;
20use fedimint_client_module::transaction::{
21    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
22};
23use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
24use fedimint_core::config::{ClientConfig, ModuleInitRegistry};
25use fedimint_core::core::{ModuleInstanceId, ModuleKind};
26use fedimint_core::db::{
27    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
28};
29use fedimint_core::envs::is_running_in_test_env;
30use fedimint_core::module::ApiVersion;
31use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
32use fedimint_core::task::TaskGroup;
33use fedimint_core::util::FmtCompactAnyhow as _;
34use fedimint_core::{NumPeers, maybe_add_send};
35use fedimint_derive_secret::DerivableSecret;
36use fedimint_eventlog::{
37    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
38};
39use fedimint_logging::LOG_CLIENT;
40use tokio::sync::{broadcast, watch};
41use tracing::{debug, warn};
42
43use super::handle::ClientHandle;
44use super::{Client, client_decoders};
45use crate::api_announcements::{get_api_urls, run_api_announcement_sync};
46use crate::backup::{ClientBackup, Metadata};
47use crate::db::{
48    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
49    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
50    apply_migrations_client_module_dbtx,
51};
52use crate::meta::MetaService;
53use crate::module_init::ClientModuleInitRegistry;
54use crate::oplog::OperationLog;
55use crate::sm::executor::Executor;
56use crate::sm::notifier::Notifier;
57
58/// Used to configure, assemble and build [`Client`]
59pub struct ClientBuilder {
60    module_inits: ClientModuleInitRegistry,
61    primary_module_instance: Option<ModuleInstanceId>,
62    primary_module_kind: Option<ModuleKind>,
63    admin_creds: Option<AdminCreds>,
64    db_no_decoders: Database,
65    meta_service: Arc<crate::meta::MetaService>,
66    connector: Connector,
67    stopped: bool,
68    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
69    request_hook: ApiRequestHook,
70}
71
72impl ClientBuilder {
73    pub(crate) fn new(db: Database) -> Self {
74        let meta_service = MetaService::new(LegacyMetaSource::default());
75        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
76            broadcast::channel(1024);
77        ClientBuilder {
78            module_inits: ModuleInitRegistry::new(),
79            primary_module_instance: None,
80            primary_module_kind: None,
81            connector: Connector::default(),
82            admin_creds: None,
83            db_no_decoders: db,
84            stopped: false,
85            meta_service,
86            log_event_added_transient_tx,
87            request_hook: Arc::new(|api| api),
88        }
89    }
90
91    pub(crate) fn from_existing(client: &Client) -> Self {
92        ClientBuilder {
93            module_inits: client.module_inits.clone(),
94            primary_module_instance: Some(client.primary_module_instance),
95            primary_module_kind: None,
96            admin_creds: None,
97            db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
98            stopped: false,
99            // non unique
100            meta_service: client.meta_service.clone(),
101            connector: client.connector,
102            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
103            request_hook: client.request_hook.clone(),
104        }
105    }
106
107    /// Replace module generator registry entirely
108    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
109        self.module_inits = module_inits;
110    }
111
112    /// Make module generator available when reading the config
113    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
114        self.module_inits.attach(module_init);
115    }
116
117    pub fn stopped(&mut self) {
118        self.stopped = true;
119    }
120
121    /// Build the [`Client`] with a custom wrapper around its api request logic
122    ///
123    /// This is intended to be used by downstream applications, e.g. to:
124    ///
125    /// * simulate offline mode,
126    /// * save battery when the OS indicates lack of connectivity,
127    /// * inject faults and delays for testing purposes,
128    /// * collect statistics and emit notifications.
129    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
130        self.request_hook = hook;
131        self
132    }
133
134    /// Uses this module with the given instance id as the primary module. See
135    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
136    /// more information.
137    ///
138    /// ## Panics
139    /// If there was a primary module specified previously
140    #[deprecated(
141        since = "0.6.0",
142        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
143    )]
144    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
145        self.with_primary_module_instance_id(primary_module_instance);
146    }
147
148    /// **You are likely looking for
149    /// [`ClientBuilder::with_primary_module_kind`]. This function is rarely
150    /// useful and often dangerous, handle with care.**
151    ///
152    /// Uses this module with the given instance id as the primary module. See
153    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
154    /// more information. Since the module instance id of modules of a
155    /// specific kind may differ between different federations it is
156    /// generally not recommended to specify it, but rather to specify the
157    /// module kind that should be used as primary. See
158    /// [`ClientBuilder::with_primary_module_kind`].
159    ///
160    /// ## Panics
161    /// If there was a primary module specified previously
162    pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
163        let was_replaced = self
164            .primary_module_instance
165            .replace(primary_module_instance)
166            .is_some();
167        assert!(
168            !was_replaced,
169            "Only one primary module can be given to the builder."
170        );
171    }
172
173    /// Uses this module kind as the primary module if present in the config.
174    /// See [`fedimint_client_module::ClientModule::supports_being_primary`] for
175    /// more information.
176    ///
177    /// ## Panics
178    /// If there was a primary module kind specified previously
179    pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
180        let was_replaced = self
181            .primary_module_kind
182            .replace(primary_module_kind)
183            .is_some();
184        assert!(
185            !was_replaced,
186            "Only one primary module kind can be given to the builder."
187        );
188    }
189
190    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
191        self.meta_service = meta_service;
192    }
193
194    async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
195        // Only apply the client database migrations if the database has been
196        // initialized.
197        // This only works as long as you don't change the client config
198        if let Ok(client_config) = self.load_existing_config().await {
199            for (module_id, module_cfg) in client_config.modules {
200                let kind = module_cfg.kind.clone();
201                let Some(init) = self.module_inits.get(&kind) else {
202                    // normal, expected and already logged about when building the client
203                    continue;
204                };
205
206                let mut dbtx = db.begin_transaction().await;
207                apply_migrations_client_module_dbtx(
208                    &mut dbtx.to_ref_nc(),
209                    kind.to_string(),
210                    init.get_database_migrations(),
211                    module_id,
212                )
213                .await?;
214                if let Some(used_db_prefixes) = init.used_db_prefixes() {
215                    if is_running_in_test_env() {
216                        verify_module_db_integrity_dbtx(
217                            &mut dbtx.to_ref_nc(),
218                            module_id,
219                            kind,
220                            &used_db_prefixes,
221                        )
222                        .await;
223                    }
224                }
225                dbtx.commit_tx_result().await?;
226            }
227        }
228
229        Ok(())
230    }
231
232    pub fn db_no_decoders(&self) -> &Database {
233        &self.db_no_decoders
234    }
235
236    pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
237        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
238            bail!("Client database not initialized")
239        };
240
241        Ok(config)
242    }
243
244    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
245        self.admin_creds = Some(creds);
246    }
247
248    pub fn with_connector(&mut self, connector: Connector) {
249        self.connector = connector;
250    }
251
252    #[cfg(feature = "tor")]
253    pub fn with_tor_connector(&mut self) {
254        self.with_connector(Connector::tor());
255    }
256
257    async fn init(
258        self,
259        pre_root_secret: DerivableSecret,
260        config: ClientConfig,
261        api_secret: Option<String>,
262        init_mode: InitMode,
263    ) -> anyhow::Result<ClientHandle> {
264        if Client::is_initialized(&self.db_no_decoders).await {
265            bail!("Client database already initialized")
266        }
267
268        // Note: It's important all client initialization is performed as one big
269        // transaction to avoid half-initialized client state.
270        {
271            debug!(target: LOG_CLIENT, "Initializing client database");
272            let mut dbtx = self.db_no_decoders.begin_transaction().await;
273            // Save config to DB
274            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
275                .await;
276            dbtx.insert_entry(
277                &ClientPreRootSecretHashKey,
278                &pre_root_secret.derive_pre_root_secret_hash(),
279            )
280            .await;
281
282            if let Some(api_secret) = api_secret.as_ref() {
283                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
284            }
285
286            let init_state = InitState::Pending(init_mode);
287            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
288
289            let metadata = init_state
290                .does_require_recovery()
291                .flatten()
292                .map_or(Metadata::empty(), |s| s.metadata);
293
294            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
295
296            dbtx.commit_tx_result().await?;
297        }
298
299        let stopped = self.stopped;
300        self.build(pre_root_secret, config, api_secret, stopped)
301            .await
302    }
303
304    /// Join a new Federation
305    ///
306    /// When a user wants to connect to a new federation this function fetches
307    /// the federation config and initializes the client database. If a user
308    /// already joined the federation in the past and has a preexisting database
309    /// use [`ClientBuilder::open`] instead.
310    ///
311    /// **Warning**: Calling `join` with a `root_secret` key that was used
312    /// previous to `join` a Federation will lead to all sorts of malfunctions
313    /// including likely loss of funds.
314    ///
315    /// This should be generally called only if the `root_secret` key is known
316    /// not to have been used before (e.g. just randomly generated). For keys
317    /// that might have been previous used (e.g. provided by the user),
318    /// it's safer to call [`Self::recover`] which will attempt to recover
319    /// client module states for the Federation.
320    ///
321    /// A typical "join federation" flow would look as follows:
322    /// ```no_run
323    /// # use std::str::FromStr;
324    /// # use fedimint_core::invite_code::InviteCode;
325    /// # use fedimint_core::config::ClientConfig;
326    /// # use fedimint_derive_secret::DerivableSecret;
327    /// # use fedimint_client::{Client, ClientBuilder};
328    /// # use fedimint_core::db::Database;
329    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
330    /// #
331    /// # #[tokio::main]
332    /// # async fn main() {
333    /// # let root_secret: DerivableSecret = unimplemented!();
334    /// // Create a root secret, e.g. via fedimint-bip39, see also:
335    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
336    /// // let root_secret = …;
337    ///
338    /// // Get invite code from user
339    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
340    ///     .expect("Invalid invite code");
341    /// let config = fedimint_api_client::api::net::Connector::default().download_from_invite_code(&invite_code).await
342    ///     .expect("Error downloading config");
343    ///
344    /// // Tell the user the federation name, bitcoin network
345    /// // (e.g. from wallet module config), and other details
346    /// // that are typically contained in the federation's
347    /// // meta fields.
348    ///
349    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
350    /// //     .expect("Module not found")
351    /// //     .network;
352    ///
353    /// println!(
354    ///     "The federation name is: {}",
355    ///     config.meta::<String>(META_FEDERATION_NAME_KEY)
356    ///         .expect("Could not decode name field")
357    ///         .expect("Name isn't set")
358    /// );
359    ///
360    /// // Open the client's database, using the federation ID
361    /// // as the DB name is a common pattern:
362    ///
363    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
364    /// // let db = RocksDb::open(db_path).expect("error opening DB");
365    /// # let db: Database = unimplemented!();
366    ///
367    /// let client = Client::builder(db).await.expect("Error building client")
368    ///     // Mount the modules the client should support:
369    ///     // .with_module(LightningClientInit)
370    ///     // .with_module(MintClientInit)
371    ///     // .with_module(WalletClientInit::default())
372    ///     .join(root_secret, config, None)
373    ///     .await
374    ///     .expect("Error joining federation");
375    /// # }
376    /// ```
377    pub async fn join(
378        self,
379        pre_root_secret: DerivableSecret,
380        config: ClientConfig,
381        api_secret: Option<String>,
382    ) -> anyhow::Result<ClientHandle> {
383        self.init(pre_root_secret, config, api_secret, InitMode::Fresh)
384            .await
385    }
386
387    /// Download most recent valid backup found from the Federation
388    pub async fn download_backup_from_federation(
389        &self,
390        root_secret: &DerivableSecret,
391        config: &ClientConfig,
392        api_secret: Option<String>,
393    ) -> anyhow::Result<Option<ClientBackup>> {
394        let api = DynGlobalApi::from_endpoints(
395            // TODO: change join logic to use FederationId v2
396            config
397                .global
398                .api_endpoints
399                .iter()
400                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
401            &api_secret,
402        )
403        .await?;
404
405        Client::download_backup_from_federation_static(
406            &api,
407            &Self::federation_root_secret(root_secret, config),
408            &self.decoders(config),
409        )
410        .await
411    }
412
413    /// Join a (possibly) previous joined Federation
414    ///
415    /// Unlike [`Self::join`], `recover` will run client module recovery for
416    /// each client module attempting to recover any previous module state.
417    ///
418    /// Recovery process takes time during which each recovering client module
419    /// will not be available for use.
420    ///
421    /// Calling `recovery` with a `root_secret` that was not actually previous
422    /// used in a given Federation is safe.
423    pub async fn recover(
424        self,
425        root_secret: DerivableSecret,
426        config: ClientConfig,
427        api_secret: Option<String>,
428        backup: Option<ClientBackup>,
429    ) -> anyhow::Result<ClientHandle> {
430        let client = self
431            .init(
432                root_secret,
433                config,
434                api_secret,
435                InitMode::Recover {
436                    snapshot: backup.clone(),
437                },
438            )
439            .await?;
440
441        Ok(client)
442    }
443
444    pub async fn open(self, pre_root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
445        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
446            bail!("Client database not initialized")
447        };
448
449        match self
450            .db_no_decoders()
451            .begin_transaction_nc()
452            .await
453            .get_value(&ClientPreRootSecretHashKey)
454            .await
455        {
456            Some(secret_hash) => {
457                ensure!(
458                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
459                    "Secret hash does not match. Incorrect secret"
460                );
461            }
462            _ => {
463                debug!(target: LOG_CLIENT, "Backfilling secret hash");
464                // Note: no need for dbtx autocommit, we are the only writer ATM
465                let mut dbtx = self.db_no_decoders.begin_transaction().await;
466                dbtx.insert_entry(
467                    &ClientPreRootSecretHashKey,
468                    &pre_root_secret.derive_pre_root_secret_hash(),
469                )
470                .await;
471                dbtx.commit_tx().await;
472            }
473        }
474
475        let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
476        let stopped = self.stopped;
477        let request_hook = self.request_hook.clone();
478
479        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
480        let client = self
481            .build_stopped(
482                pre_root_secret,
483                &config,
484                api_secret,
485                log_event_added_transient_tx,
486                request_hook,
487            )
488            .await?;
489        if !stopped {
490            client.as_inner().start_executor();
491        }
492        Ok(client)
493    }
494
495    /// Build a [`Client`] and start the executor
496    pub(crate) async fn build(
497        self,
498        pre_root_secret: DerivableSecret,
499        config: ClientConfig,
500        api_secret: Option<String>,
501        stopped: bool,
502    ) -> anyhow::Result<ClientHandle> {
503        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
504        let request_hook = self.request_hook.clone();
505        let client = self
506            .build_stopped(
507                pre_root_secret,
508                &config,
509                api_secret,
510                log_event_added_transient_tx,
511                request_hook,
512            )
513            .await?;
514        if !stopped {
515            client.as_inner().start_executor();
516        }
517
518        Ok(client)
519    }
520
521    // TODO: remove config argument
522    /// Build a [`Client`] but do not start the executor
523    async fn build_stopped(
524        self,
525        root_secret: DerivableSecret,
526        config: &ClientConfig,
527        api_secret: Option<String>,
528        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
529        request_hook: ApiRequestHook,
530    ) -> anyhow::Result<ClientHandle> {
531        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
532        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
533
534        let decoders = self.decoders(config);
535        let config = Self::config_decoded(config, &decoders)?;
536        let fed_id = config.calculate_federation_id();
537        let db = self.db_no_decoders.with_decoders(decoders.clone());
538        let connector = self.connector;
539        let peer_urls = get_api_urls(&db, &config).await;
540        let api = match self.admin_creds.as_ref() {
541            Some(admin_creds) => ReconnectFederationApi::new_admin(
542                admin_creds.peer_id,
543                peer_urls
544                    .into_iter()
545                    .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
546                    .context("Admin creds should match a peer")?,
547                &api_secret,
548            )
549            .await?
550            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
551            .with_request_hook(&request_hook)
552            .with_cache()
553            .into(),
554            None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
555                .await?
556                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
557                .with_request_hook(&request_hook)
558                .with_cache()
559                .into(),
560        };
561        let task_group = TaskGroup::new();
562
563        // Migrate the database before interacting with it in case any on-disk data
564        // structures have changed.
565        self.migrate_database(&db).await?;
566
567        let init_state = Self::load_init_state(&db).await;
568
569        let primary_module_instance = self
570            .primary_module_instance
571            .or_else(|| {
572                let primary_module_kind = self.primary_module_kind?;
573                config
574                    .modules
575                    .iter()
576                    .find_map(|(module_instance_id, module_config)| {
577                        (module_config.kind() == &primary_module_kind)
578                            .then_some(*module_instance_id)
579                    })
580            })
581            .ok_or(anyhow!("No primary module set or found"))?;
582
583        let notifier = Notifier::new();
584
585        let common_api_versions = Client::load_and_refresh_common_api_version_static(
586            &config,
587            &self.module_inits,
588            &api,
589            &db,
590            &task_group,
591        )
592        .await
593        .inspect_err(|err| {
594            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
595        })
596        .unwrap_or(ApiVersionSet {
597            core: ApiVersion::new(0, 0),
598            // This will cause all modules to skip initialization
599            modules: BTreeMap::new(),
600        });
601
602        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
603
604        let mut module_recoveries: BTreeMap<
605            ModuleInstanceId,
606            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
607        > = BTreeMap::new();
608        let mut module_recovery_progress_receivers: BTreeMap<
609            ModuleInstanceId,
610            watch::Receiver<RecoveryProgress>,
611        > = BTreeMap::new();
612
613        let final_client = FinalClientIface::default();
614
615        let root_secret = Self::federation_root_secret(&root_secret, &config);
616
617        let modules = {
618            let mut modules = ClientModuleRegistry::default();
619            for (module_instance_id, module_config) in config.modules.clone() {
620                let kind = module_config.kind().clone();
621                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
622                    debug!(
623                        target: LOG_CLIENT,
624                        kind=%kind,
625                        instance_id=%module_instance_id,
626                        "Module kind of instance not found in module gens, skipping");
627                    continue;
628                };
629
630                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
631                else {
632                    warn!(
633                        target: LOG_CLIENT,
634                        kind=%kind,
635                        instance_id=%module_instance_id,
636                        "Module kind of instance has incompatible api version, skipping"
637                    );
638                    continue;
639                };
640
641                // since the exact logic of when to start recovery is a bit gnarly,
642                // the recovery call is extracted here.
643                let start_module_recover_fn =
644                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
645                        let module_config = module_config.clone();
646                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
647                        let db = db.clone();
648                        let kind = kind.clone();
649                        let notifier = notifier.clone();
650                        let api = api.clone();
651                        let root_secret = root_secret.clone();
652                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
653                        let final_client = final_client.clone();
654                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
655                        let task_group = task_group.clone();
656                        let module_init = module_init.clone();
657                        (
658                            Box::pin(async move {
659                                module_init
660                                    .recover(
661                                        final_client.clone(),
662                                        fed_id,
663                                        num_peers,
664                                        module_config.clone(),
665                                        db.clone(),
666                                        module_instance_id,
667                                        common_api_versions.core,
668                                        api_version,
669                                        root_secret.derive_module_secret(module_instance_id),
670                                        notifier.clone(),
671                                        api.clone(),
672                                        admin_auth,
673                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
674                                        progress_tx,
675                                        task_group,
676                                    )
677                                    .await
678                                    .inspect_err(|err| {
679                                        warn!(
680                                            target: LOG_CLIENT,
681                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
682                                        );
683                                    })
684                            }),
685                            progress_rx,
686                        )
687                    };
688
689                let recovery = match init_state.does_require_recovery() {
690                    Some(snapshot) => {
691                        match db
692                            .begin_transaction_nc()
693                            .await
694                            .get_value(&ClientModuleRecovery { module_instance_id })
695                            .await
696                        {
697                            Some(module_recovery_state) => {
698                                if module_recovery_state.is_done() {
699                                    debug!(
700                                        id = %module_instance_id,
701                                        %kind, "Module recovery already complete"
702                                    );
703                                    None
704                                } else {
705                                    debug!(
706                                        id = %module_instance_id,
707                                        %kind,
708                                        progress = %module_recovery_state.progress,
709                                        "Starting module recovery with an existing progress"
710                                    );
711                                    Some(start_module_recover_fn(
712                                        snapshot,
713                                        module_recovery_state.progress,
714                                    ))
715                                }
716                            }
717                            _ => {
718                                let progress = RecoveryProgress::none();
719                                let mut dbtx = db.begin_transaction().await;
720                                dbtx.log_event(
721                                    log_ordering_wakeup_tx.clone(),
722                                    None,
723                                    ModuleRecoveryStarted::new(module_instance_id),
724                                )
725                                .await;
726                                dbtx.insert_entry(
727                                    &ClientModuleRecovery { module_instance_id },
728                                    &ClientModuleRecoveryState { progress },
729                                )
730                                .await;
731
732                                dbtx.commit_tx().await;
733
734                                debug!(
735                                    id = %module_instance_id,
736                                    %kind, "Starting new module recovery"
737                                );
738                                Some(start_module_recover_fn(snapshot, progress))
739                            }
740                        }
741                    }
742                    _ => None,
743                };
744
745                match recovery {
746                    Some((recovery, recovery_progress_rx)) => {
747                        module_recoveries.insert(module_instance_id, recovery);
748                        module_recovery_progress_receivers
749                            .insert(module_instance_id, recovery_progress_rx);
750                    }
751                    _ => {
752                        let module = module_init
753                            .init(
754                                final_client.clone(),
755                                fed_id,
756                                config.global.api_endpoints.len(),
757                                module_config,
758                                db.clone(),
759                                module_instance_id,
760                                common_api_versions.core,
761                                api_version,
762                                // This is a divergence from the legacy client, where the child
763                                // secret keys were derived using
764                                // *module kind*-specific derivation paths.
765                                // Since the new client has to support multiple, segregated modules
766                                // of the same kind we have to use
767                                // the instance id instead.
768                                root_secret.derive_module_secret(module_instance_id),
769                                notifier.clone(),
770                                api.clone(),
771                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
772                                task_group.clone(),
773                            )
774                            .await?;
775
776                        if primary_module_instance == module_instance_id
777                            && !module.supports_being_primary()
778                        {
779                            bail!(
780                                "Module instance {primary_module_instance} of kind {kind} does not support being a primary module"
781                            );
782                        }
783
784                        modules.register_module(module_instance_id, kind, module);
785                    }
786                }
787            }
788            modules
789        };
790
791        if init_state.is_pending() && module_recoveries.is_empty() {
792            let mut dbtx = db.begin_transaction().await;
793            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
794                .await;
795            dbtx.commit_tx().await;
796        }
797
798        let executor = {
799            let mut executor_builder = Executor::builder();
800            executor_builder
801                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
802
803            for (module_instance_id, _, module) in modules.iter_modules() {
804                executor_builder.with_module_dyn(module.context(module_instance_id));
805            }
806
807            for module_instance_id in module_recoveries.keys() {
808                executor_builder.with_valid_module_id(*module_instance_id);
809            }
810
811            executor_builder.build(db.clone(), notifier, task_group.clone())
812        };
813
814        let recovery_receiver_init_val = module_recovery_progress_receivers
815            .iter()
816            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
817            .collect::<BTreeMap<_, _>>();
818        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
819            watch::channel(recovery_receiver_init_val);
820
821        let client_inner = Arc::new(Client {
822            final_client: final_client.clone(),
823            config: tokio::sync::RwLock::new(config.clone()),
824            api_secret,
825            decoders,
826            db: db.clone(),
827            federation_id: fed_id,
828            federation_config_meta: config.global.meta,
829            primary_module_instance,
830            modules,
831            module_inits: self.module_inits.clone(),
832            log_ordering_wakeup_tx,
833            log_event_added_rx,
834            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
835            request_hook,
836            executor,
837            api,
838            secp_ctx: Secp256k1::new(),
839            root_secret,
840            task_group,
841            operation_log: OperationLog::new(db.clone()),
842            client_recovery_progress_receiver,
843            meta_service: self.meta_service,
844            connector,
845        });
846        client_inner
847            .task_group
848            .spawn_cancellable("MetaService::update_continuously", {
849                let client_inner = client_inner.clone();
850                async move {
851                    client_inner
852                        .meta_service
853                        .update_continuously(&client_inner)
854                        .await;
855                }
856            });
857
858        client_inner.task_group.spawn_cancellable(
859            "update-api-announcements",
860            run_api_announcement_sync(client_inner.clone()),
861        );
862
863        client_inner.task_group.spawn_cancellable(
864            "event log ordering task",
865            run_event_log_ordering_task(
866                db.clone(),
867                log_ordering_wakeup_rx,
868                log_event_added_tx,
869                log_event_added_transient_tx,
870            ),
871        );
872        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
873
874        let client_arc = ClientHandle::new(client_inner);
875
876        for (_, _, module) in client_arc.modules.iter_modules() {
877            module.start().await;
878        }
879
880        final_client.set(client_iface.clone());
881
882        if !module_recoveries.is_empty() {
883            client_arc.spawn_module_recoveries_task(
884                client_recovery_progress_sender,
885                module_recoveries,
886                module_recovery_progress_receivers,
887            );
888        }
889
890        Ok(client_arc)
891    }
892
893    async fn load_init_state(db: &Database) -> InitState {
894        let mut dbtx = db.begin_transaction_nc().await;
895        dbtx.get_value(&ClientInitStateKey)
896            .await
897            .unwrap_or_else(|| {
898                // could be turned in a hard error in the future, but for now
899                // no need to break backward compat.
900                warn!(
901                    target: LOG_CLIENT,
902                    "Client missing ClientRequiresRecovery: assuming complete"
903                );
904                db::InitState::Complete(db::InitModeComplete::Fresh)
905            })
906    }
907
908    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
909        let mut decoders = client_decoders(
910            &self.module_inits,
911            config
912                .modules
913                .iter()
914                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
915        );
916
917        decoders.register_module(
918            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
919            ModuleKind::from_static_str("tx_submission"),
920            tx_submission_sm_decoder(),
921        );
922
923        decoders
924    }
925
926    fn config_decoded(
927        config: &ClientConfig,
928        decoders: &ModuleDecoderRegistry,
929    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
930        config.clone().redecode_raw(decoders)
931    }
932
933    /// Re-derive client's `root_secret` using the federation ID. This
934    /// eliminates the possibility of having the same client `root_secret`
935    /// across multiple federations.
936    fn federation_root_secret(
937        root_secret: &DerivableSecret,
938        config: &ClientConfig,
939    ) -> DerivableSecret {
940        root_secret.federation_key(&config.global.calculate_federation_id())
941    }
942
943    /// Register to receiver all new transient (unpersisted) events
944    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
945        self.log_event_added_transient_tx.subscribe()
946    }
947}