fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20    ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53    PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54    run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61    PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
69/// The type of root secret hashing
70///
71/// *Please read this documentation carefully if, especially if you're upgrading
72/// downstream Fedimint client application.*
73///
74/// Internally, client will always hash-in federation id
75/// to the root secret provided to the [`ClientBuilder`],
76/// to ensure a different actual root secret is used for ever federation.
77/// This makes reusing a single root secret for different federations
78/// in a multi-federation client, perfectly fine, and frees the client
79/// from worrying about `FederationId`.
80///
81/// However, in the past Fedimint applications (including `fedimint-cli`)
82/// were doing the hashing-in of `FederationId` outside of `fedimint-client` as
83/// well, which lead to effectively doing it twice, and pushed downloading of
84/// the client config on join to application code, a sub-optimal API, especially
85/// after joining federation needed to handle even more functionality.
86///
87/// To keep the interoperability of the seed phrases this double-derivation
88/// is preserved, due to other architectural reason, `fedimint-client`
89/// will now do the outer-derivation internally as well.
90#[derive(Clone)]
91pub enum RootSecret {
92    /// Derive an extra round of federation-id to the secret, like
93    /// Fedimint applications were doing manually in the past.
94    ///
95    /// **Note**: Applications MUST NOT do the derivation themselves anymore.
96    StandardDoubleDerive(DerivableSecret),
97    /// No double derivation
98    ///
99    /// This is useful for applications that for whatever reason do the
100    /// double-derivation externally, or use a custom scheme.
101    Custom(DerivableSecret),
102}
103
104impl RootSecret {
105    fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106        match self {
107            RootSecret::StandardDoubleDerive(derivable_secret) => {
108                get_default_client_secret(derivable_secret, &federation_id)
109            }
110            RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111        }
112    }
113}
114
115/// Used to configure, assemble and build [`Client`]
116pub struct ClientBuilder {
117    module_inits: ClientModuleInitRegistry,
118    admin_creds: Option<AdminCreds>,
119    meta_service: Arc<crate::meta::MetaService>,
120    connector: ConnectorType,
121    stopped: bool,
122    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
123    request_hook: ApiRequestHook,
124    iroh_enable_dht: bool,
125    iroh_enable_next: bool,
126}
127
128impl ClientBuilder {
129    pub(crate) fn new() -> Self {
130        trace!(
131            target: LOG_CLIENT,
132            version = %fedimint_build_code_version_env!(),
133            "Initializing fedimint client",
134        );
135        let meta_service = MetaService::new(LegacyMetaSource::default());
136        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
137            broadcast::channel(1024);
138
139        ClientBuilder {
140            module_inits: ModuleInitRegistry::new(),
141            connector: ConnectorType::default(),
142            admin_creds: None,
143            stopped: false,
144            meta_service,
145            log_event_added_transient_tx,
146            request_hook: Arc::new(|api| api),
147            iroh_enable_dht: true,
148            iroh_enable_next: true,
149        }
150    }
151
152    pub(crate) fn from_existing(client: &Client) -> Self {
153        ClientBuilder {
154            module_inits: client.module_inits.clone(),
155            admin_creds: None,
156            stopped: false,
157            // non unique
158            meta_service: client.meta_service.clone(),
159            connector: client.connector,
160            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
161            request_hook: client.request_hook.clone(),
162            iroh_enable_dht: client.iroh_enable_dht,
163            iroh_enable_next: client.iroh_enable_next,
164        }
165    }
166
167    /// Replace module generator registry entirely
168    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
169        self.module_inits = module_inits;
170    }
171
172    /// Make module generator available when reading the config
173    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
174        self.module_inits.attach(module_init);
175    }
176
177    pub fn stopped(&mut self) {
178        self.stopped = true;
179    }
180    /// Build the [`Client`] with a custom wrapper around its api request logic
181    ///
182    /// This is intended to be used by downstream applications, e.g. to:
183    ///
184    /// * simulate offline mode,
185    /// * save battery when the OS indicates lack of connectivity,
186    /// * inject faults and delays for testing purposes,
187    /// * collect statistics and emit notifications.
188    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
189        self.request_hook = hook;
190        self
191    }
192
193    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
194        self.meta_service = meta_service;
195    }
196
197    /// Override if the DHT should be enabled when using Iroh to connect to
198    /// the federation
199    pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
200        self.iroh_enable_dht = iroh_enable_dht;
201        self
202    }
203
204    /// Override if the parallel unstable/next Iroh stack should be enabled when
205    /// using Iroh to connect to the federation
206    pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
207        self.iroh_enable_next = iroh_enable_next;
208        self
209    }
210
211    /// Migrate client module databases
212    ///
213    /// Note: Client core db migration are done immediately in
214    /// [`Client::builder`], to ensure db matches the code at all times,
215    /// while migrating modules requires figuring out what modules actually
216    /// are first.
217    async fn migrate_module_dbs(
218        &self,
219        db: &Database,
220        client_config: &ClientConfig,
221    ) -> anyhow::Result<()> {
222        for (module_id, module_cfg) in &client_config.modules {
223            let kind = module_cfg.kind.clone();
224            let Some(init) = self.module_inits.get(&kind) else {
225                // normal, expected and already logged about when building the client
226                continue;
227            };
228
229            let mut dbtx = db.begin_transaction().await;
230            apply_migrations_client_module_dbtx(
231                &mut dbtx.to_ref_nc(),
232                kind.to_string(),
233                init.get_database_migrations(),
234                *module_id,
235            )
236            .await?;
237            if let Some(used_db_prefixes) = init.used_db_prefixes()
238                && is_running_in_test_env()
239            {
240                verify_module_db_integrity_dbtx(
241                    &mut dbtx.to_ref_nc(),
242                    *module_id,
243                    kind,
244                    &used_db_prefixes,
245                )
246                .await;
247            }
248            dbtx.commit_tx_result().await?;
249        }
250
251        Ok(())
252    }
253
254    pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
255        let Some(config) = Client::get_config_from_db(db).await else {
256            bail!("Client database not initialized")
257        };
258
259        Ok(config)
260    }
261
262    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
263        self.admin_creds = Some(creds);
264    }
265
266    pub fn with_connector(&mut self, connector: ConnectorType) {
267        self.connector = connector;
268    }
269
270    #[cfg(feature = "tor")]
271    pub fn with_tor_connector(&mut self) {
272        self.with_connector(ConnectorType::tor());
273    }
274
275    #[allow(clippy::too_many_arguments)]
276    async fn init(
277        self,
278        connectors: ConnectorRegistry,
279        db_no_decoders: Database,
280        pre_root_secret: DerivableSecret,
281        config: ClientConfig,
282        api_secret: Option<String>,
283        init_mode: InitMode,
284        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
285        preview_prefetch_api_version_set: Option<
286            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
287        >,
288    ) -> anyhow::Result<ClientHandle> {
289        if Client::is_initialized(&db_no_decoders).await {
290            bail!("Client database already initialized")
291        }
292
293        Client::run_core_migrations(&db_no_decoders).await?;
294
295        // Note: It's important all client initialization is performed as one big
296        // transaction to avoid half-initialized client state.
297        {
298            debug!(target: LOG_CLIENT, "Initializing client database");
299            let mut dbtx = db_no_decoders.begin_transaction().await;
300            // Save config to DB
301            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
302                .await;
303            dbtx.insert_entry(
304                &ClientPreRootSecretHashKey,
305                &pre_root_secret.derive_pre_root_secret_hash(),
306            )
307            .await;
308
309            if let Some(api_secret) = api_secret.as_ref() {
310                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
311            }
312
313            let init_state = InitState::Pending(init_mode);
314            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
315
316            let metadata = init_state
317                .does_require_recovery()
318                .flatten()
319                .map_or(Metadata::empty(), |s| s.metadata);
320
321            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
322
323            dbtx.commit_tx_result().await?;
324        }
325
326        let stopped = self.stopped;
327        self.build(
328            connectors,
329            db_no_decoders,
330            pre_root_secret,
331            config,
332            api_secret,
333            stopped,
334            preview_prefetch_api_announcements,
335            preview_prefetch_api_version_set,
336        )
337        .await
338    }
339
340    pub async fn preview(
341        self,
342        connectors: ConnectorRegistry,
343        invite_code: &InviteCode,
344    ) -> anyhow::Result<ClientPreview> {
345        let (config, api) = self
346            .connector
347            .download_from_invite_code(&connectors, invite_code)
348            .await?;
349
350        let prefetch_api_announcements =
351            config
352                .global
353                .broadcast_public_keys
354                .clone()
355                .map(|guardian_pub_keys| {
356                    Jit::new({
357                        let api = api.clone();
358                        move || async move {
359                            // Fetching api announcements using invite urls before joining.
360                            // This ensures the client can communicated with
361                            // the Federation even if all the peers moved write them to database.
362                            fetch_api_announcements_from_at_least_num_of_peers(
363                                1,
364                                &api,
365                                &guardian_pub_keys,
366                                // If we can, we would love to get more than just one response,
367                                // but we need to wrap it up fast for good UX.
368                                Duration::from_millis(20),
369                            )
370                            .await
371                        }
372                    })
373                });
374
375        self.preview_inner(
376            connectors,
377            config,
378            invite_code.api_secret(),
379            Some(api),
380            prefetch_api_announcements,
381        )
382        .await
383    }
384
385    /// Use [`Self::preview`] instead
386    ///
387    /// If `reuse_api` is set, it will allow the preview to prefetch some data
388    /// to speed up the final join.
389    pub async fn preview_with_existing_config(
390        self,
391        connectors: ConnectorRegistry,
392        config: ClientConfig,
393        api_secret: Option<String>,
394    ) -> anyhow::Result<ClientPreview> {
395        self.preview_inner(connectors, config, api_secret, None, None)
396            .await
397    }
398
399    async fn preview_inner(
400        self,
401        connectors: ConnectorRegistry,
402        config: ClientConfig,
403        api_secret: Option<String>,
404        prefetch_api: Option<DynGlobalApi>,
405        prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
406    ) -> anyhow::Result<ClientPreview> {
407        let preview_prefetch_api_version_set = prefetch_api.map(|api| {
408            JitTry::new_try({
409                let config = config.clone();
410                || async move { Client::fetch_common_api_versions(&config, &api).await }
411            })
412        });
413
414        Ok(ClientPreview {
415            connectors,
416            inner: self,
417            config,
418            api_secret,
419            prefetch_api_announcements,
420            preview_prefetch_api_version_set,
421        })
422    }
423
424    pub async fn open(
425        self,
426        connectors: ConnectorRegistry,
427        db_no_decoders: Database,
428        pre_root_secret: RootSecret,
429    ) -> anyhow::Result<ClientHandle> {
430        Client::run_core_migrations(&db_no_decoders).await?;
431
432        // Check for pending config and migrate if present
433        Self::migrate_pending_config_if_present(&db_no_decoders).await;
434
435        let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
436            bail!("Client database not initialized")
437        };
438
439        let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
440
441        match db_no_decoders
442            .begin_transaction_nc()
443            .await
444            .get_value(&ClientPreRootSecretHashKey)
445            .await
446        {
447            Some(secret_hash) => {
448                ensure!(
449                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
450                    "Secret hash does not match. Incorrect secret"
451                );
452            }
453            _ => {
454                debug!(target: LOG_CLIENT, "Backfilling secret hash");
455                // Note: no need for dbtx autocommit, we are the only writer ATM
456                let mut dbtx = db_no_decoders.begin_transaction().await;
457                dbtx.insert_entry(
458                    &ClientPreRootSecretHashKey,
459                    &pre_root_secret.derive_pre_root_secret_hash(),
460                )
461                .await;
462                dbtx.commit_tx().await;
463            }
464        }
465
466        let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
467        let stopped = self.stopped;
468        let request_hook = self.request_hook.clone();
469
470        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
471        let client = self
472            .build_stopped(
473                connectors,
474                db_no_decoders,
475                pre_root_secret,
476                &config,
477                api_secret,
478                log_event_added_transient_tx,
479                request_hook,
480                None,
481                None,
482            )
483            .await?;
484        if !stopped {
485            client.as_inner().start_executor();
486        }
487        Ok(client)
488    }
489
490    /// Build a [`Client`] and start the executor
491    #[allow(clippy::too_many_arguments)]
492    pub(crate) async fn build(
493        self,
494        connectors: ConnectorRegistry,
495        db_no_decoders: Database,
496        pre_root_secret: DerivableSecret,
497        config: ClientConfig,
498        api_secret: Option<String>,
499        stopped: bool,
500        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
501        preview_prefetch_api_version_set: Option<
502            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
503        >,
504    ) -> anyhow::Result<ClientHandle> {
505        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
506        let request_hook = self.request_hook.clone();
507        let client = self
508            .build_stopped(
509                connectors,
510                db_no_decoders,
511                pre_root_secret,
512                &config,
513                api_secret,
514                log_event_added_transient_tx,
515                request_hook,
516                preview_prefetch_api_announcements,
517                preview_prefetch_api_version_set,
518            )
519            .await?;
520        if !stopped {
521            client.as_inner().start_executor();
522        }
523
524        Ok(client)
525    }
526
527    // TODO: remove config argument
528    /// Build a [`Client`] but do not start the executor
529    #[allow(clippy::too_many_arguments)]
530    async fn build_stopped(
531        self,
532        connectors: ConnectorRegistry,
533        db_no_decoders: Database,
534        pre_root_secret: DerivableSecret,
535        config: &ClientConfig,
536        api_secret: Option<String>,
537        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
538        request_hook: ApiRequestHook,
539        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
540        preview_prefetch_api_version_set: Option<
541            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
542        >,
543    ) -> anyhow::Result<ClientHandle> {
544        debug!(
545            target: LOG_CLIENT,
546            version = %fedimint_build_code_version_env!(),
547            "Building fedimint client",
548        );
549        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
550        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
551
552        let decoders = self.decoders(config);
553        let config = Self::config_decoded(config, &decoders)?;
554        let fed_id = config.calculate_federation_id();
555        let db = db_no_decoders.with_decoders(decoders.clone());
556        let connector = self.connector;
557        let peer_urls = get_api_urls(&db, &config).await;
558        let api = match self.admin_creds.as_ref() {
559            Some(admin_creds) => FederationApi::new(
560                connectors.clone(),
561                peer_urls,
562                Some(admin_creds.peer_id),
563                Some(&admin_creds.auth.0),
564            )
565            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
566            .with_request_hook(&request_hook)
567            .with_cache()
568            .into(),
569            None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
570                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571                .with_request_hook(&request_hook)
572                .with_cache()
573                .into(),
574        };
575
576        let task_group = TaskGroup::new();
577
578        // Migrate the database before interacting with it in case any on-disk data
579        // structures have changed.
580        self.migrate_module_dbs(&db, &config).await?;
581
582        let init_state = Self::load_init_state(&db).await;
583
584        let notifier = Notifier::new();
585
586        if let Some(p) = preview_prefetch_api_announcements {
587            // We want to fail if we were unable to figure out
588            // current addresses of peers in the federation, as it will potentially never
589            // fix itself, so it's better to fail the join explicitly.
590            let announcements = p.get().await;
591
592            store_api_announcements_updates_from_peers(&db, announcements).await?
593        }
594
595        if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
596            match preview_prefetch_api_version_set.get_try().await {
597                Ok(peer_api_versions) => {
598                    Client::store_prefetched_api_versions(
599                        &db,
600                        &config,
601                        &self.module_inits,
602                        peer_api_versions,
603                    )
604                    .await;
605                }
606                Err(err) => {
607                    debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
608                }
609            }
610        }
611
612        let common_api_versions = Client::load_and_refresh_common_api_version_static(
613            &config,
614            &self.module_inits,
615            &api,
616            &db,
617            &task_group,
618        )
619        .await
620        .inspect_err(|err| {
621            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
622        })
623        .unwrap_or(ApiVersionSet {
624            core: ApiVersion::new(0, 0),
625            // This will cause all modules to skip initialization
626            modules: BTreeMap::new(),
627        });
628
629        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
630
631        // Asynchronously refetch client config and compare with existing
632        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
633
634        let mut module_recoveries: BTreeMap<
635            ModuleInstanceId,
636            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
637        > = BTreeMap::new();
638        let mut module_recovery_progress_receivers: BTreeMap<
639            ModuleInstanceId,
640            watch::Receiver<RecoveryProgress>,
641        > = BTreeMap::new();
642
643        let final_client = FinalClientIface::default();
644
645        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
646
647        let modules = {
648            let mut modules = ClientModuleRegistry::default();
649            for (module_instance_id, module_config) in config.modules.clone() {
650                let kind = module_config.kind().clone();
651                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
652                    debug!(
653                        target: LOG_CLIENT,
654                        kind=%kind,
655                        instance_id=%module_instance_id,
656                        "Module kind of instance not found in module gens, skipping");
657                    continue;
658                };
659
660                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
661                else {
662                    warn!(
663                        target: LOG_CLIENT,
664                        kind=%kind,
665                        instance_id=%module_instance_id,
666                        "Module kind of instance has incompatible api version, skipping"
667                    );
668                    continue;
669                };
670
671                // since the exact logic of when to start recovery is a bit gnarly,
672                // the recovery call is extracted here.
673                let start_module_recover_fn =
674                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
675                        let module_config = module_config.clone();
676                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
677                        let db = db.clone();
678                        let kind = kind.clone();
679                        let notifier = notifier.clone();
680                        let api = api.clone();
681                        let root_secret = root_secret.clone();
682                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
683                        let final_client = final_client.clone();
684                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
685                        let task_group = task_group.clone();
686                        let module_init = module_init.clone();
687                        (
688                            Box::pin(async move {
689                                module_init
690                                    .recover(
691                                        final_client.clone(),
692                                        fed_id,
693                                        num_peers,
694                                        module_config.clone(),
695                                        db.clone(),
696                                        module_instance_id,
697                                        common_api_versions.core,
698                                        api_version,
699                                        root_secret.derive_module_secret(module_instance_id),
700                                        notifier.clone(),
701                                        api.clone(),
702                                        admin_auth,
703                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
704                                        progress_tx,
705                                        task_group,
706                                    )
707                                    .await
708                                    .inspect_err(|err| {
709                                        warn!(
710                                            target: LOG_CLIENT,
711                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
712                                        );
713                                    })
714                            }),
715                            progress_rx,
716                        )
717                    };
718
719                let recovery = match init_state.does_require_recovery() {
720                    Some(snapshot) => {
721                        match db
722                            .begin_transaction_nc()
723                            .await
724                            .get_value(&ClientModuleRecovery { module_instance_id })
725                            .await
726                        {
727                            Some(module_recovery_state) => {
728                                if module_recovery_state.is_done() {
729                                    debug!(
730                                        id = %module_instance_id,
731                                        %kind, "Module recovery already complete"
732                                    );
733                                    None
734                                } else {
735                                    debug!(
736                                        id = %module_instance_id,
737                                        %kind,
738                                        progress = %module_recovery_state.progress,
739                                        "Starting module recovery with an existing progress"
740                                    );
741                                    Some(start_module_recover_fn(
742                                        snapshot,
743                                        module_recovery_state.progress,
744                                    ))
745                                }
746                            }
747                            _ => {
748                                let progress = RecoveryProgress::none();
749                                let mut dbtx = db.begin_transaction().await;
750                                dbtx.log_event(
751                                    log_ordering_wakeup_tx.clone(),
752                                    None,
753                                    ModuleRecoveryStarted::new(module_instance_id),
754                                )
755                                .await;
756                                dbtx.insert_entry(
757                                    &ClientModuleRecovery { module_instance_id },
758                                    &ClientModuleRecoveryState { progress },
759                                )
760                                .await;
761
762                                dbtx.commit_tx().await;
763
764                                debug!(
765                                    id = %module_instance_id,
766                                    %kind, "Starting new module recovery"
767                                );
768                                Some(start_module_recover_fn(snapshot, progress))
769                            }
770                        }
771                    }
772                    _ => None,
773                };
774
775                match recovery {
776                    Some((recovery, recovery_progress_rx)) => {
777                        module_recoveries.insert(module_instance_id, recovery);
778                        module_recovery_progress_receivers
779                            .insert(module_instance_id, recovery_progress_rx);
780                    }
781                    _ => {
782                        let module = module_init
783                            .init(
784                                final_client.clone(),
785                                fed_id,
786                                config.global.api_endpoints.len(),
787                                module_config,
788                                db.clone(),
789                                module_instance_id,
790                                common_api_versions.core,
791                                api_version,
792                                // This is a divergence from the legacy client, where the child
793                                // secret keys were derived using
794                                // *module kind*-specific derivation paths.
795                                // Since the new client has to support multiple, segregated modules
796                                // of the same kind we have to use
797                                // the instance id instead.
798                                root_secret.derive_module_secret(module_instance_id),
799                                notifier.clone(),
800                                api.clone(),
801                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
802                                task_group.clone(),
803                                connectors.clone(),
804                            )
805                            .await?;
806
807                        modules.register_module(module_instance_id, kind, module);
808                    }
809                }
810            }
811            modules
812        };
813
814        if init_state.is_pending() && module_recoveries.is_empty() {
815            let mut dbtx = db.begin_transaction().await;
816            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
817                .await;
818            dbtx.commit_tx().await;
819        }
820
821        let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
822            BTreeMap::new();
823
824        for (module_id, _kind, module) in modules.iter_modules() {
825            match module.supports_being_primary() {
826                PrimaryModuleSupport::Any { priority } => {
827                    primary_modules
828                        .entry(priority)
829                        .or_default()
830                        .wildcard
831                        .push(module_id);
832                }
833                PrimaryModuleSupport::Selected { priority, units } => {
834                    for unit in units {
835                        primary_modules
836                            .entry(priority)
837                            .or_default()
838                            .specific
839                            .entry(unit)
840                            .or_default()
841                            .push(module_id);
842                    }
843                }
844                PrimaryModuleSupport::None => {}
845            }
846        }
847
848        let executor = {
849            let mut executor_builder = Executor::builder();
850            executor_builder
851                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
852
853            for (module_instance_id, _, module) in modules.iter_modules() {
854                executor_builder.with_module_dyn(module.context(module_instance_id));
855            }
856
857            for module_instance_id in module_recoveries.keys() {
858                executor_builder.with_valid_module_id(*module_instance_id);
859            }
860
861            executor_builder.build(
862                db.clone(),
863                notifier,
864                task_group.clone(),
865                log_ordering_wakeup_tx.clone(),
866            )
867        };
868
869        let recovery_receiver_init_val = module_recovery_progress_receivers
870            .iter()
871            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
872            .collect::<BTreeMap<_, _>>();
873        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
874            watch::channel(recovery_receiver_init_val);
875
876        let client_inner = Arc::new(Client {
877            final_client: final_client.clone(),
878            config: tokio::sync::RwLock::new(config.clone()),
879            api_secret,
880            decoders,
881            db: db.clone(),
882            connectors,
883            federation_id: fed_id,
884            federation_config_meta: config.global.meta,
885            primary_modules,
886            modules,
887            module_inits: self.module_inits.clone(),
888            log_ordering_wakeup_tx,
889            log_event_added_rx,
890            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
891            request_hook,
892            executor,
893            api,
894            secp_ctx: Secp256k1::new(),
895            root_secret,
896            task_group,
897            operation_log: OperationLog::new(db.clone()),
898            client_recovery_progress_receiver,
899            meta_service: self.meta_service,
900            connector,
901            iroh_enable_dht: self.iroh_enable_dht,
902            iroh_enable_next: self.iroh_enable_next,
903        });
904        client_inner
905            .task_group
906            .spawn_cancellable("MetaService::update_continuously", {
907                let client_inner = client_inner.clone();
908                async move {
909                    client_inner
910                        .meta_service
911                        .update_continuously(&client_inner)
912                        .await;
913                }
914            });
915
916        client_inner.task_group.spawn_cancellable(
917            "update-api-announcements",
918            run_api_announcement_refresh_task(client_inner.clone()),
919        );
920
921        client_inner.task_group.spawn_cancellable(
922            "event log ordering task",
923            run_event_log_ordering_task(
924                db.clone(),
925                log_ordering_wakeup_rx,
926                log_event_added_tx,
927                log_event_added_transient_tx,
928            ),
929        );
930        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
931
932        let client_arc = ClientHandle::new(client_inner);
933
934        for (_, _, module) in client_arc.modules.iter_modules() {
935            module.start().await;
936        }
937
938        final_client.set(client_iface.clone());
939
940        if !module_recoveries.is_empty() {
941            client_arc.spawn_module_recoveries_task(
942                client_recovery_progress_sender,
943                module_recoveries,
944                module_recovery_progress_receivers,
945            );
946        }
947
948        Ok(client_arc)
949    }
950
951    async fn load_init_state(db: &Database) -> InitState {
952        let mut dbtx = db.begin_transaction_nc().await;
953        dbtx.get_value(&ClientInitStateKey)
954            .await
955            .unwrap_or_else(|| {
956                // could be turned in a hard error in the future, but for now
957                // no need to break backward compat.
958                warn!(
959                    target: LOG_CLIENT,
960                    "Client missing ClientRequiresRecovery: assuming complete"
961                );
962                db::InitState::Complete(db::InitModeComplete::Fresh)
963            })
964    }
965
966    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
967        let mut decoders = client_decoders(
968            &self.module_inits,
969            config
970                .modules
971                .iter()
972                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
973        );
974
975        decoders.register_module(
976            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
977            ModuleKind::from_static_str("tx_submission"),
978            tx_submission_sm_decoder(),
979        );
980
981        decoders
982    }
983
984    fn config_decoded(
985        config: &ClientConfig,
986        decoders: &ModuleDecoderRegistry,
987    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
988        config.clone().redecode_raw(decoders)
989    }
990
991    /// Re-derive client's `root_secret` using the federation ID. This
992    /// eliminates the possibility of having the same client `root_secret`
993    /// across multiple federations.
994    fn federation_root_secret(
995        pre_root_secret: &DerivableSecret,
996        config: &ClientConfig,
997    ) -> DerivableSecret {
998        pre_root_secret.federation_key(&config.global.calculate_federation_id())
999    }
1000
1001    /// Register to receiver all new transient (unpersisted) events
1002    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1003        self.log_event_added_transient_tx.subscribe()
1004    }
1005
1006    /// Check for pending config and migrate it if present.
1007    /// Returns the config to use (either the original or the migrated pending
1008    /// config).
1009    async fn migrate_pending_config_if_present(db: &Database) {
1010        if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1011            debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1012
1013            let mut dbtx = db.begin_transaction().await;
1014            // Update the main config with the pending config
1015            dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1016                .await;
1017            // Remove the pending config
1018            dbtx.remove_entry(&PendingClientConfigKey).await;
1019            dbtx.commit_tx().await;
1020
1021            debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1022        }
1023    }
1024
1025    /// Asynchronously refetch client config from federation and compare with
1026    /// existing. If different, save to pending config in database.
1027    fn load_and_refresh_client_config_static(
1028        config: &ClientConfig,
1029        api: &DynGlobalApi,
1030        db: &Database,
1031        task_group: &TaskGroup,
1032    ) {
1033        let config = config.clone();
1034        let api = api.clone();
1035        let db = db.clone();
1036        let task_group = task_group.clone();
1037
1038        // Spawn background task to refetch config
1039        task_group.spawn_cancellable("refresh_client_config_static", async move {
1040            Self::refresh_client_config_static(&config, &api, &db).await;
1041        });
1042    }
1043
1044    /// Wrapper that handles errors from config refresh with proper logging
1045    async fn refresh_client_config_static(
1046        config: &ClientConfig,
1047        api: &DynGlobalApi,
1048        db: &Database,
1049    ) {
1050        if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1051            warn!(
1052                target: LOG_CLIENT,
1053                err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1054            );
1055        }
1056    }
1057
1058    /// Validate that a config update is valid
1059    fn validate_config_update(
1060        current_config: &ClientConfig,
1061        new_config: &ClientConfig,
1062    ) -> anyhow::Result<()> {
1063        // Global config must not change
1064        if current_config.global != new_config.global {
1065            bail!("Global configuration changes are not allowed in config updates");
1066        }
1067
1068        // Modules can only be added, existing ones must stay the same
1069        for (module_id, current_module_config) in &current_config.modules {
1070            match new_config.modules.get(module_id) {
1071                Some(new_module_config) => {
1072                    if current_module_config != new_module_config {
1073                        bail!(
1074                            "Module {} configuration changes are not allowed, only additions are permitted",
1075                            module_id
1076                        );
1077                    }
1078                }
1079                None => {
1080                    bail!(
1081                        "Module {} was removed in new config, only additions are allowed",
1082                        module_id
1083                    );
1084                }
1085            }
1086        }
1087
1088        Ok(())
1089    }
1090
1091    /// Refetch client config from federation and save as pending if different
1092    async fn refresh_client_config_static_try(
1093        current_config: &ClientConfig,
1094        api: &DynGlobalApi,
1095        db: &Database,
1096    ) -> anyhow::Result<()> {
1097        debug!(target: LOG_CLIENT, "Refreshing client config");
1098
1099        // Fetch latest config from federation
1100        let fetched_config = api
1101            .request_current_consensus::<ClientConfig>(
1102                CLIENT_CONFIG_ENDPOINT.to_owned(),
1103                ApiRequestErased::default(),
1104            )
1105            .await?;
1106
1107        // Validate the new config before proceeding
1108        Self::validate_config_update(current_config, &fetched_config)?;
1109
1110        // Compare with current config
1111        if current_config != &fetched_config {
1112            debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1113
1114            let mut dbtx = db.begin_transaction().await;
1115            dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1116                .await;
1117            dbtx.commit_tx().await;
1118        } else {
1119            debug!(target: LOG_CLIENT, "No federation config changes detected");
1120        }
1121
1122        Ok(())
1123    }
1124}
1125
1126/// An intermediate step before Client joining or recovering
1127///
1128/// Meant to support showing user some initial information about the Federation
1129/// before actually joining.
1130pub struct ClientPreview {
1131    inner: ClientBuilder,
1132    config: ClientConfig,
1133    connectors: ConnectorRegistry,
1134    api_secret: Option<String>,
1135    prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1136    preview_prefetch_api_version_set:
1137        Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1138}
1139
1140impl ClientPreview {
1141    /// Get the config
1142    pub fn config(&self) -> &ClientConfig {
1143        &self.config
1144    }
1145
1146    /// Join a new Federation
1147    ///
1148    /// When a user wants to connect to a new federation this function fetches
1149    /// the federation config and initializes the client database. If a user
1150    /// already joined the federation in the past and has a preexisting database
1151    /// use [`ClientBuilder::open`] instead.
1152    ///
1153    /// **Warning**: Calling `join` with a `root_secret` key that was used
1154    /// previous to `join` a Federation will lead to all sorts of malfunctions
1155    /// including likely loss of funds.
1156    ///
1157    /// This should be generally called only if the `root_secret` key is known
1158    /// not to have been used before (e.g. just randomly generated). For keys
1159    /// that might have been previous used (e.g. provided by the user),
1160    /// it's safer to call [`Self::recover`] which will attempt to recover
1161    /// client module states for the Federation.
1162    ///
1163    /// A typical "join federation" flow would look as follows:
1164    /// ```no_run
1165    /// # use std::str::FromStr;
1166    /// # use fedimint_core::invite_code::InviteCode;
1167    /// # use fedimint_core::config::ClientConfig;
1168    /// # use fedimint_derive_secret::DerivableSecret;
1169    /// # use fedimint_client::{Client, ClientBuilder, RootSecret};
1170    /// # use fedimint_connectors::ConnectorRegistry;
1171    /// # use fedimint_core::db::Database;
1172    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1173    /// #
1174    /// # #[tokio::main]
1175    /// # async fn main() -> anyhow::Result<()> {
1176    /// # let root_secret: DerivableSecret = unimplemented!();
1177    /// // Create a root secret, e.g. via fedimint-bip39, see also:
1178    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1179    /// // let root_secret = …;
1180    ///
1181    /// // Get invite code from user
1182    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1183    ///     .expect("Invalid invite code");
1184    ///
1185    /// // Tell the user the federation name, bitcoin network
1186    /// // (e.g. from wallet module config), and other details
1187    /// // that are typically contained in the federation's
1188    /// // meta fields.
1189    ///
1190    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1191    /// //     .expect("Module not found")
1192    /// //     .network;
1193    ///
1194    /// // Open the client's database, using the federation ID
1195    /// // as the DB name is a common pattern:
1196    ///
1197    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1198    /// // let db = RocksDb::open(db_path).expect("error opening DB");
1199    /// # let db: Database = unimplemented!();
1200    /// # let connectors: ConnectorRegistry = unimplemented!();
1201    ///
1202    /// let preview = Client::builder().await
1203    ///     // Mount the modules the client should support:
1204    ///     // .with_module(LightningClientInit)
1205    ///     // .with_module(MintClientInit)
1206    ///     // .with_module(WalletClientInit::default())
1207    ///      .expect("Error building client")
1208    ///      .preview(connectors, &invite_code).await?;
1209    ///
1210    /// println!(
1211    ///     "The federation name is: {}",
1212    ///     preview.config().meta::<String>(META_FEDERATION_NAME_KEY)
1213    ///         .expect("Could not decode name field")
1214    ///         .expect("Name isn't set")
1215    /// );
1216    ///
1217    /// let client = preview
1218    ///     .join(db, RootSecret::StandardDoubleDerive(root_secret))
1219    ///     .await
1220    ///     .expect("Error joining federation");
1221    /// # Ok(())
1222    /// # }
1223    /// ```
1224    pub async fn join(
1225        self,
1226        db_no_decoders: Database,
1227        pre_root_secret: RootSecret,
1228    ) -> anyhow::Result<ClientHandle> {
1229        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1230
1231        let client = self
1232            .inner
1233            .init(
1234                self.connectors,
1235                db_no_decoders,
1236                pre_root_secret,
1237                self.config,
1238                self.api_secret,
1239                InitMode::Fresh,
1240                self.prefetch_api_announcements,
1241                self.preview_prefetch_api_version_set,
1242            )
1243            .await?;
1244
1245        Ok(client)
1246    }
1247
1248    /// Join a (possibly) previous joined Federation
1249    ///
1250    /// Unlike [`Self::join`], `recover` will run client module
1251    /// recovery for each client module attempting to recover any previous
1252    /// module state.
1253    ///
1254    /// Recovery process takes time during which each recovering client module
1255    /// will not be available for use.
1256    ///
1257    /// Calling `recovery` with a `root_secret` that was not actually previous
1258    /// used in a given Federation is safe.
1259    pub async fn recover(
1260        self,
1261        db_no_decoders: Database,
1262        pre_root_secret: RootSecret,
1263        backup: Option<ClientBackup>,
1264    ) -> anyhow::Result<ClientHandle> {
1265        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1266
1267        let client = self
1268            .inner
1269            .init(
1270                self.connectors,
1271                db_no_decoders,
1272                pre_root_secret,
1273                self.config,
1274                self.api_secret,
1275                InitMode::Recover {
1276                    snapshot: backup.clone(),
1277                },
1278                self.prefetch_api_announcements,
1279                self.preview_prefetch_api_version_set,
1280            )
1281            .await?;
1282
1283        Ok(client)
1284    }
1285
1286    /// Download most recent valid backup found from the Federation
1287    pub async fn download_backup_from_federation(
1288        &self,
1289        pre_root_secret: RootSecret,
1290    ) -> anyhow::Result<Option<ClientBackup>> {
1291        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1292        let api = DynGlobalApi::new(
1293            self.connectors.clone(),
1294            // TODO: change join logic to use FederationId v2
1295            self.config
1296                .global
1297                .api_endpoints
1298                .iter()
1299                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1300                .collect(),
1301            self.api_secret.as_deref(),
1302        )?;
1303
1304        Client::download_backup_from_federation_static(
1305            &api,
1306            &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1307            &self.inner.decoders(&self.config),
1308        )
1309        .await
1310    }
1311}