fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20    ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53    PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54    run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61    PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
/// The type of root secret hashing
///
/// *Please read this documentation carefully, especially if you're upgrading
/// a downstream Fedimint client application.*
///
/// Internally, the client will always hash-in the federation id
/// to the root secret provided to the [`ClientBuilder`],
/// to ensure a different actual root secret is used for every federation.
/// This makes reusing a single root secret for different federations
/// in a multi-federation client perfectly fine, and frees the client
/// from worrying about `FederationId`.
///
/// However, in the past Fedimint applications (including `fedimint-cli`)
/// were doing the hashing-in of `FederationId` outside of `fedimint-client` as
/// well, which led to effectively doing it twice, and pushed downloading of
/// the client config on join to application code, a sub-optimal API, especially
/// after joining a federation needed to handle even more functionality.
///
/// To keep the interoperability of the seed phrases this double-derivation
/// is preserved. Due to other architectural reasons, `fedimint-client`
/// will now do the outer-derivation internally as well.
#[derive(Clone)]
pub enum RootSecret {
    /// Derive an extra round of federation-id to the secret, like
    /// Fedimint applications were doing manually in the past.
    ///
    /// **Note**: Applications MUST NOT do the derivation themselves anymore.
    StandardDoubleDerive(DerivableSecret),
    /// No double derivation
    ///
    /// This is useful for applications that for whatever reason do the
    /// double-derivation externally, or use a custom scheme.
    Custom(DerivableSecret),
}
103
104impl RootSecret {
105    fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106        match self {
107            RootSecret::StandardDoubleDerive(derivable_secret) => {
108                get_default_client_secret(derivable_secret, &federation_id)
109            }
110            RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111        }
112    }
113}
114
/// Used to configure, assemble and build [`Client`]
pub struct ClientBuilder {
    /// Module inits available to the client, looked up by module kind.
    module_inits: ClientModuleInitRegistry,
    /// Admin credentials; when set, the federation API is created with the
    /// admin's peer id and auth instead of the stored api secret.
    admin_creds: Option<AdminCreds>,
    /// Meta service; shared (not unique) with the source client when the
    /// builder is created from an existing client.
    meta_service: Arc<crate::meta::MetaService>,
    /// Connector type; used to download the client config from an invite
    /// code when previewing a federation.
    connector: ConnectorType,
    /// When `true`, the executor is not started after the client is built.
    stopped: bool,
    /// Broadcast sender for [`EventLogEntry`] values, passed on when building
    /// the client.
    // NOTE(review): presumably carries transient (non-persisted) event log
    // entries — confirm against the event log code.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Custom wrapper applied to the federation API's request logic.
    request_hook: ApiRequestHook,
    /// Whether the DHT should be enabled when using Iroh to connect to the
    /// federation.
    iroh_enable_dht: bool,
    /// Whether the parallel unstable/next Iroh stack should be enabled when
    /// using Iroh to connect to the federation.
    iroh_enable_next: bool,
}
127
128impl ClientBuilder {
129    pub(crate) fn new() -> Self {
130        trace!(
131            target: LOG_CLIENT,
132            version = %fedimint_build_code_version_env!(),
133            "Initializing fedimint client",
134        );
135        let meta_service = MetaService::new(LegacyMetaSource::default());
136        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
137            broadcast::channel(1024);
138
139        ClientBuilder {
140            module_inits: ModuleInitRegistry::new(),
141            connector: ConnectorType::default(),
142            admin_creds: None,
143            stopped: false,
144            meta_service,
145            log_event_added_transient_tx,
146            request_hook: Arc::new(|api| api),
147            iroh_enable_dht: true,
148            iroh_enable_next: true,
149        }
150    }
151
152    pub(crate) fn from_existing(client: &Client) -> Self {
153        ClientBuilder {
154            module_inits: client.module_inits.clone(),
155            admin_creds: None,
156            stopped: false,
157            // non unique
158            meta_service: client.meta_service.clone(),
159            connector: client.connector,
160            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
161            request_hook: client.request_hook.clone(),
162            iroh_enable_dht: client.iroh_enable_dht,
163            iroh_enable_next: client.iroh_enable_next,
164        }
165    }
166
    /// Replace module generator registry entirely
    ///
    /// The registry previously held by the builder is discarded in favor of
    /// `module_inits`.
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }
171
    /// Make module generator available when reading the config
    ///
    /// Attaches `module_init` to the builder's module init registry.
    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
        self.module_inits.attach(module_init);
    }
176
    /// Request that the client is built without starting its executor
    ///
    /// With this flag set, the builder skips the `start_executor` call that
    /// normally follows a successful build.
    pub fn stopped(&mut self) {
        self.stopped = true;
    }
180    /// Build the [`Client`] with a custom wrapper around its api request logic
181    ///
182    /// This is intended to be used by downstream applications, e.g. to:
183    ///
184    /// * simulate offline mode,
185    /// * save battery when the OS indicates lack of connectivity,
186    /// * inject faults and delays for testing purposes,
187    /// * collect statistics and emit notifications.
188    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
189        self.request_hook = hook;
190        self
191    }
192
    /// Replace the [`MetaService`] held by the builder with `meta_service`
    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
        self.meta_service = meta_service;
    }
196
197    /// Override if the DHT should be enabled when using Iroh to connect to
198    /// the federation
199    pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
200        self.iroh_enable_dht = iroh_enable_dht;
201        self
202    }
203
204    /// Override if the parallel unstable/next Iroh stack should be enabled when
205    /// using Iroh to connect to the federation
206    pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
207        self.iroh_enable_next = iroh_enable_next;
208        self
209    }
210
211    /// Migrate client module databases
212    ///
213    /// Note: Client core db migration are done immediately in
214    /// [`Client::builder`], to ensure db matches the code at all times,
215    /// while migrating modules requires figuring out what modules actually
216    /// are first.
217    async fn migrate_module_dbs(
218        &self,
219        db: &Database,
220        client_config: &ClientConfig,
221    ) -> anyhow::Result<()> {
222        for (module_id, module_cfg) in &client_config.modules {
223            let kind = module_cfg.kind.clone();
224            let Some(init) = self.module_inits.get(&kind) else {
225                // normal, expected and already logged about when building the client
226                continue;
227            };
228
229            let mut dbtx = db.begin_transaction().await;
230            apply_migrations_client_module_dbtx(
231                &mut dbtx.to_ref_nc(),
232                kind.to_string(),
233                init.get_database_migrations(),
234                *module_id,
235            )
236            .await?;
237            if let Some(used_db_prefixes) = init.used_db_prefixes()
238                && is_running_in_test_env()
239            {
240                verify_module_db_integrity_dbtx(
241                    &mut dbtx.to_ref_nc(),
242                    *module_id,
243                    kind,
244                    &used_db_prefixes,
245                )
246                .await;
247            }
248            dbtx.commit_tx_result().await?;
249        }
250
251        Ok(())
252    }
253
254    pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
255        let Some(config) = Client::get_config_from_db(db).await else {
256            bail!("Client database not initialized")
257        };
258
259        Ok(config)
260    }
261
    /// Set the admin credentials to build the client with
    ///
    /// When present, the federation API is created with the admin's peer id
    /// and auth.
    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
        self.admin_creds = Some(creds);
    }
265
    /// Set the [`ConnectorType`] the builder uses
    pub fn with_connector(&mut self, connector: ConnectorType) {
        self.connector = connector;
    }
269
    /// Set the connector to [`ConnectorType::tor`]
    ///
    /// Only available when the `tor` feature is enabled.
    #[cfg(feature = "tor")]
    pub fn with_tor_connector(&mut self) {
        self.with_connector(ConnectorType::tor());
    }
274
275    #[allow(clippy::too_many_arguments)]
276    async fn init(
277        self,
278        connectors: ConnectorRegistry,
279        db_no_decoders: Database,
280        pre_root_secret: DerivableSecret,
281        config: ClientConfig,
282        api_secret: Option<String>,
283        init_mode: InitMode,
284        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
285        preview_prefetch_api_version_set: Option<
286            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
287        >,
288    ) -> anyhow::Result<ClientHandle> {
289        if Client::is_initialized(&db_no_decoders).await {
290            bail!("Client database already initialized")
291        }
292
293        Client::run_core_migrations(&db_no_decoders).await?;
294
295        // Note: It's important all client initialization is performed as one big
296        // transaction to avoid half-initialized client state.
297        {
298            debug!(target: LOG_CLIENT, "Initializing client database");
299            let mut dbtx = db_no_decoders.begin_transaction().await;
300            // Save config to DB
301            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
302                .await;
303            dbtx.insert_entry(
304                &ClientPreRootSecretHashKey,
305                &pre_root_secret.derive_pre_root_secret_hash(),
306            )
307            .await;
308
309            if let Some(api_secret) = api_secret.as_ref() {
310                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
311            }
312
313            let init_state = InitState::Pending(init_mode);
314            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
315
316            let metadata = init_state
317                .does_require_recovery()
318                .flatten()
319                .map_or(Metadata::empty(), |s| s.metadata);
320
321            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
322
323            dbtx.commit_tx_result().await?;
324        }
325
326        let stopped = self.stopped;
327        self.build(
328            connectors,
329            db_no_decoders,
330            pre_root_secret,
331            config,
332            api_secret,
333            stopped,
334            preview_prefetch_api_announcements,
335            preview_prefetch_api_version_set,
336        )
337        .await
338    }
339
340    pub async fn preview(
341        self,
342        connectors: ConnectorRegistry,
343        invite_code: &InviteCode,
344    ) -> anyhow::Result<ClientPreview> {
345        let (config, api) = self
346            .connector
347            .download_from_invite_code(&connectors, invite_code)
348            .await?;
349
350        let prefetch_api_announcements =
351            config
352                .global
353                .broadcast_public_keys
354                .clone()
355                .map(|guardian_pub_keys| {
356                    Jit::new({
357                        let api = api.clone();
358                        move || async move {
359                            // Fetching api announcements using invite urls before joining.
360                            // This ensures the client can communicated with
361                            // the Federation even if all the peers moved write them to database.
362                            fetch_api_announcements_from_at_least_num_of_peers(
363                                1,
364                                &api,
365                                &guardian_pub_keys,
366                                // If we can, we would love to get more than just one response,
367                                // but we need to wrap it up fast for good UX.
368                                Duration::from_millis(20),
369                            )
370                            .await
371                        }
372                    })
373                });
374
375        self.preview_inner(
376            connectors,
377            config,
378            invite_code.api_secret(),
379            Some(api),
380            prefetch_api_announcements,
381        )
382        .await
383    }
384
    /// Use [`Self::preview`] instead
    ///
    /// Creates a [`ClientPreview`] from an already available `config` and
    /// `api_secret`. No API handle and no api announcements are passed on, so
    /// this preview does not prefetch any data to speed up the final join.
    pub async fn preview_with_existing_config(
        self,
        connectors: ConnectorRegistry,
        config: ClientConfig,
        api_secret: Option<String>,
    ) -> anyhow::Result<ClientPreview> {
        self.preview_inner(connectors, config, api_secret, None, None)
            .await
    }
398
399    async fn preview_inner(
400        self,
401        connectors: ConnectorRegistry,
402        config: ClientConfig,
403        api_secret: Option<String>,
404        prefetch_api: Option<DynGlobalApi>,
405        prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
406    ) -> anyhow::Result<ClientPreview> {
407        let preview_prefetch_api_version_set = prefetch_api.map(|api| {
408            JitTry::new_try({
409                let config = config.clone();
410                || async move { Client::fetch_common_api_versions(&config, &api).await }
411            })
412        });
413
414        Ok(ClientPreview {
415            connectors,
416            inner: self,
417            config,
418            api_secret,
419            prefetch_api_announcements,
420            preview_prefetch_api_version_set,
421        })
422    }
423
424    pub async fn open(
425        self,
426        connectors: ConnectorRegistry,
427        db_no_decoders: Database,
428        pre_root_secret: RootSecret,
429    ) -> anyhow::Result<ClientHandle> {
430        Client::run_core_migrations(&db_no_decoders).await?;
431
432        // Check for pending config and migrate if present
433        Self::migrate_pending_config_if_present(&db_no_decoders).await;
434
435        let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
436            bail!("Client database not initialized")
437        };
438
439        let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
440
441        match db_no_decoders
442            .begin_transaction_nc()
443            .await
444            .get_value(&ClientPreRootSecretHashKey)
445            .await
446        {
447            Some(secret_hash) => {
448                ensure!(
449                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
450                    "Secret hash does not match. Incorrect secret"
451                );
452            }
453            _ => {
454                debug!(target: LOG_CLIENT, "Backfilling secret hash");
455                // Note: no need for dbtx autocommit, we are the only writer ATM
456                let mut dbtx = db_no_decoders.begin_transaction().await;
457                dbtx.insert_entry(
458                    &ClientPreRootSecretHashKey,
459                    &pre_root_secret.derive_pre_root_secret_hash(),
460                )
461                .await;
462                dbtx.commit_tx().await;
463            }
464        }
465
466        let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
467        let stopped = self.stopped;
468        let request_hook = self.request_hook.clone();
469
470        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
471        let client = self
472            .build_stopped(
473                connectors,
474                db_no_decoders,
475                pre_root_secret,
476                &config,
477                api_secret,
478                log_event_added_transient_tx,
479                request_hook,
480                None,
481                None,
482            )
483            .await?;
484        if !stopped {
485            client.as_inner().start_executor();
486        }
487        Ok(client)
488    }
489
490    /// Build a [`Client`] and start the executor
491    #[allow(clippy::too_many_arguments)]
492    pub(crate) async fn build(
493        self,
494        connectors: ConnectorRegistry,
495        db_no_decoders: Database,
496        pre_root_secret: DerivableSecret,
497        config: ClientConfig,
498        api_secret: Option<String>,
499        stopped: bool,
500        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
501        preview_prefetch_api_version_set: Option<
502            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
503        >,
504    ) -> anyhow::Result<ClientHandle> {
505        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
506        let request_hook = self.request_hook.clone();
507        let client = self
508            .build_stopped(
509                connectors,
510                db_no_decoders,
511                pre_root_secret,
512                &config,
513                api_secret,
514                log_event_added_transient_tx,
515                request_hook,
516                preview_prefetch_api_announcements,
517                preview_prefetch_api_version_set,
518            )
519            .await?;
520        if !stopped {
521            client.as_inner().start_executor();
522        }
523
524        Ok(client)
525    }
526
527    // TODO: remove config argument
528    /// Build a [`Client`] but do not start the executor
529    #[allow(clippy::too_many_arguments)]
530    async fn build_stopped(
531        self,
532        connectors: ConnectorRegistry,
533        db_no_decoders: Database,
534        pre_root_secret: DerivableSecret,
535        config: &ClientConfig,
536        api_secret: Option<String>,
537        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
538        request_hook: ApiRequestHook,
539        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
540        preview_prefetch_api_version_set: Option<
541            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
542        >,
543    ) -> anyhow::Result<ClientHandle> {
544        debug!(
545            target: LOG_CLIENT,
546            version = %fedimint_build_code_version_env!(),
547            "Building fedimint client",
548        );
549        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
550        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
551
552        let decoders = self.decoders(config);
553        let config = Self::config_decoded(config, &decoders)?;
554        let fed_id = config.calculate_federation_id();
555        let db = db_no_decoders.with_decoders(decoders.clone());
556        let connector = self.connector;
557        let peer_urls = get_api_urls(&db, &config).await;
558        let api = match self.admin_creds.as_ref() {
559            Some(admin_creds) => FederationApi::new(
560                connectors.clone(),
561                peer_urls,
562                Some(admin_creds.peer_id),
563                Some(&admin_creds.auth.0),
564            )
565            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
566            .with_request_hook(&request_hook)
567            .with_cache()
568            .into(),
569            None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
570                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571                .with_request_hook(&request_hook)
572                .with_cache()
573                .into(),
574        };
575
576        let task_group = TaskGroup::new();
577
578        // Migrate the database before interacting with it in case any on-disk data
579        // structures have changed.
580        self.migrate_module_dbs(&db, &config).await?;
581
582        let init_state = Self::load_init_state(&db).await;
583
584        let notifier = Notifier::new();
585
586        if let Some(p) = preview_prefetch_api_announcements {
587            // We want to fail if we were unable to figure out
588            // current addresses of peers in the federation, as it will potentially never
589            // fix itself, so it's better to fail the join explicitly.
590            let announcements = p.get().await;
591
592            store_api_announcements_updates_from_peers(&db, announcements).await?
593        }
594
595        if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
596            match preview_prefetch_api_version_set.get_try().await {
597                Ok(peer_api_versions) => {
598                    Client::store_prefetched_api_versions(
599                        &db,
600                        &config,
601                        &self.module_inits,
602                        peer_api_versions,
603                    )
604                    .await;
605                }
606                Err(err) => {
607                    debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
608                }
609            }
610        }
611
612        let common_api_versions = Client::load_and_refresh_common_api_version_static(
613            &config,
614            &self.module_inits,
615            connectors.clone(),
616            &api,
617            &db,
618            &task_group,
619        )
620        .await
621        .inspect_err(|err| {
622            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
623        })
624        .unwrap_or(ApiVersionSet {
625            core: ApiVersion::new(0, 0),
626            // This will cause all modules to skip initialization
627            modules: BTreeMap::new(),
628        });
629
630        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
631
632        // Asynchronously refetch client config and compare with existing
633        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
634
635        let mut module_recoveries: BTreeMap<
636            ModuleInstanceId,
637            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
638        > = BTreeMap::new();
639        let mut module_recovery_progress_receivers: BTreeMap<
640            ModuleInstanceId,
641            watch::Receiver<RecoveryProgress>,
642        > = BTreeMap::new();
643
644        let final_client = FinalClientIface::default();
645
646        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
647
648        let modules = {
649            let mut modules = ClientModuleRegistry::default();
650            for (module_instance_id, module_config) in config.modules.clone() {
651                let kind = module_config.kind().clone();
652                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
653                    debug!(
654                        target: LOG_CLIENT,
655                        kind=%kind,
656                        instance_id=%module_instance_id,
657                        "Module kind of instance not found in module gens, skipping");
658                    continue;
659                };
660
661                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
662                else {
663                    warn!(
664                        target: LOG_CLIENT,
665                        kind=%kind,
666                        instance_id=%module_instance_id,
667                        "Module kind of instance has incompatible api version, skipping"
668                    );
669                    continue;
670                };
671
672                // since the exact logic of when to start recovery is a bit gnarly,
673                // the recovery call is extracted here.
674                let start_module_recover_fn =
675                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
676                        let module_config = module_config.clone();
677                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
678                        let db = db.clone();
679                        let kind = kind.clone();
680                        let notifier = notifier.clone();
681                        let api = api.clone();
682                        let root_secret = root_secret.clone();
683                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
684                        let final_client = final_client.clone();
685                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
686                        let task_group = task_group.clone();
687                        let module_init = module_init.clone();
688                        (
689                            Box::pin(async move {
690                                module_init
691                                    .recover(
692                                        final_client.clone(),
693                                        fed_id,
694                                        num_peers,
695                                        module_config.clone(),
696                                        db.clone(),
697                                        module_instance_id,
698                                        common_api_versions.core,
699                                        api_version,
700                                        root_secret.derive_module_secret(module_instance_id),
701                                        notifier.clone(),
702                                        api.clone(),
703                                        admin_auth,
704                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
705                                        progress_tx,
706                                        task_group,
707                                    )
708                                    .await
709                                    .inspect_err(|err| {
710                                        warn!(
711                                            target: LOG_CLIENT,
712                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
713                                        );
714                                    })
715                            }),
716                            progress_rx,
717                        )
718                    };
719
720                let recovery = match init_state.does_require_recovery() {
721                    Some(snapshot) => {
722                        match db
723                            .begin_transaction_nc()
724                            .await
725                            .get_value(&ClientModuleRecovery { module_instance_id })
726                            .await
727                        {
728                            Some(module_recovery_state) => {
729                                if module_recovery_state.is_done() {
730                                    debug!(
731                                        id = %module_instance_id,
732                                        %kind, "Module recovery already complete"
733                                    );
734                                    None
735                                } else {
736                                    debug!(
737                                        id = %module_instance_id,
738                                        %kind,
739                                        progress = %module_recovery_state.progress,
740                                        "Starting module recovery with an existing progress"
741                                    );
742                                    Some(start_module_recover_fn(
743                                        snapshot,
744                                        module_recovery_state.progress,
745                                    ))
746                                }
747                            }
748                            _ => {
749                                let progress = RecoveryProgress::none();
750                                let mut dbtx = db.begin_transaction().await;
751                                dbtx.log_event(
752                                    log_ordering_wakeup_tx.clone(),
753                                    None,
754                                    ModuleRecoveryStarted::new(module_instance_id),
755                                )
756                                .await;
757                                dbtx.insert_entry(
758                                    &ClientModuleRecovery { module_instance_id },
759                                    &ClientModuleRecoveryState { progress },
760                                )
761                                .await;
762
763                                dbtx.commit_tx().await;
764
765                                debug!(
766                                    id = %module_instance_id,
767                                    %kind, "Starting new module recovery"
768                                );
769                                Some(start_module_recover_fn(snapshot, progress))
770                            }
771                        }
772                    }
773                    _ => None,
774                };
775
776                match recovery {
777                    Some((recovery, recovery_progress_rx)) => {
778                        module_recoveries.insert(module_instance_id, recovery);
779                        module_recovery_progress_receivers
780                            .insert(module_instance_id, recovery_progress_rx);
781                    }
782                    _ => {
783                        let module = module_init
784                            .init(
785                                final_client.clone(),
786                                fed_id,
787                                config.global.api_endpoints.len(),
788                                module_config,
789                                db.clone(),
790                                module_instance_id,
791                                common_api_versions.core,
792                                api_version,
793                                // This is a divergence from the legacy client, where the child
794                                // secret keys were derived using
795                                // *module kind*-specific derivation paths.
796                                // Since the new client has to support multiple, segregated modules
797                                // of the same kind we have to use
798                                // the instance id instead.
799                                root_secret.derive_module_secret(module_instance_id),
800                                notifier.clone(),
801                                api.clone(),
802                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
803                                task_group.clone(),
804                                connectors.clone(),
805                            )
806                            .await?;
807
808                        modules.register_module(module_instance_id, kind, module);
809                    }
810                }
811            }
812            modules
813        };
814
815        if init_state.is_pending() && module_recoveries.is_empty() {
816            let mut dbtx = db.begin_transaction().await;
817            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
818                .await;
819            dbtx.commit_tx().await;
820        }
821
822        let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
823            BTreeMap::new();
824
825        for (module_id, _kind, module) in modules.iter_modules() {
826            match module.supports_being_primary() {
827                PrimaryModuleSupport::Any { priority } => {
828                    primary_modules
829                        .entry(priority)
830                        .or_default()
831                        .wildcard
832                        .push(module_id);
833                }
834                PrimaryModuleSupport::Selected { priority, units } => {
835                    for unit in units {
836                        primary_modules
837                            .entry(priority)
838                            .or_default()
839                            .specific
840                            .entry(unit)
841                            .or_default()
842                            .push(module_id);
843                    }
844                }
845                PrimaryModuleSupport::None => {}
846            }
847        }
848
849        let executor = {
850            let mut executor_builder = Executor::builder();
851            executor_builder
852                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
853
854            for (module_instance_id, _, module) in modules.iter_modules() {
855                executor_builder.with_module_dyn(module.context(module_instance_id));
856            }
857
858            for module_instance_id in module_recoveries.keys() {
859                executor_builder.with_valid_module_id(*module_instance_id);
860            }
861
862            executor_builder.build(
863                db.clone(),
864                notifier,
865                task_group.clone(),
866                log_ordering_wakeup_tx.clone(),
867            )
868        };
869
870        let recovery_receiver_init_val = module_recovery_progress_receivers
871            .iter()
872            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
873            .collect::<BTreeMap<_, _>>();
874        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
875            watch::channel(recovery_receiver_init_val);
876
877        let client_inner = Arc::new(Client {
878            final_client: final_client.clone(),
879            config: tokio::sync::RwLock::new(config.clone()),
880            api_secret,
881            decoders,
882            db: db.clone(),
883            connectors,
884            federation_id: fed_id,
885            federation_config_meta: config.global.meta,
886            primary_modules,
887            modules,
888            module_inits: self.module_inits.clone(),
889            log_ordering_wakeup_tx,
890            log_event_added_rx,
891            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
892            request_hook,
893            executor,
894            api,
895            secp_ctx: Secp256k1::new(),
896            root_secret,
897            task_group,
898            operation_log: OperationLog::new(db.clone()),
899            client_recovery_progress_receiver,
900            meta_service: self.meta_service,
901            connector,
902            iroh_enable_dht: self.iroh_enable_dht,
903            iroh_enable_next: self.iroh_enable_next,
904        });
905        client_inner
906            .task_group
907            .spawn_cancellable("MetaService::update_continuously", {
908                let client_inner = client_inner.clone();
909                async move {
910                    client_inner
911                        .meta_service
912                        .update_continuously(&client_inner)
913                        .await;
914                }
915            });
916
917        client_inner
918            .task_group
919            .spawn_cancellable("update-api-announcements", {
920                let client_inner = client_inner.clone();
921                async move {
922                    client_inner
923                        .connectors
924                        .wait_for_initialized_connections()
925                        .await;
926                    run_api_announcement_refresh_task(client_inner.clone()).await
927                }
928            });
929
930        client_inner
931            .task_group
932            .spawn_cancellable("event log ordering task", {
933                let client_inner = client_inner.clone();
934                async move {
935                    client_inner
936                        .connectors
937                        .wait_for_initialized_connections()
938                        .await;
939
940                    run_event_log_ordering_task(
941                        db.clone(),
942                        log_ordering_wakeup_rx,
943                        log_event_added_tx,
944                        log_event_added_transient_tx,
945                    )
946                    .await
947                }
948            });
949        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
950
951        let client_arc = ClientHandle::new(client_inner);
952
953        for (_, _, module) in client_arc.modules.iter_modules() {
954            module.start().await;
955        }
956
957        final_client.set(client_iface.clone());
958
959        if !module_recoveries.is_empty() {
960            client_arc.spawn_module_recoveries_task(
961                client_recovery_progress_sender,
962                module_recoveries,
963                module_recovery_progress_receivers,
964            );
965        }
966
967        Ok(client_arc)
968    }
969
970    async fn load_init_state(db: &Database) -> InitState {
971        let mut dbtx = db.begin_transaction_nc().await;
972        dbtx.get_value(&ClientInitStateKey)
973            .await
974            .unwrap_or_else(|| {
975                // could be turned in a hard error in the future, but for now
976                // no need to break backward compat.
977                warn!(
978                    target: LOG_CLIENT,
979                    "Client missing ClientRequiresRecovery: assuming complete"
980                );
981                db::InitState::Complete(db::InitModeComplete::Fresh)
982            })
983    }
984
985    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
986        let mut decoders = client_decoders(
987            &self.module_inits,
988            config
989                .modules
990                .iter()
991                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
992        );
993
994        decoders.register_module(
995            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
996            ModuleKind::from_static_str("tx_submission"),
997            tx_submission_sm_decoder(),
998        );
999
1000        decoders
1001    }
1002
1003    fn config_decoded(
1004        config: &ClientConfig,
1005        decoders: &ModuleDecoderRegistry,
1006    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1007        config.clone().redecode_raw(decoders)
1008    }
1009
1010    /// Re-derive client's `root_secret` using the federation ID. This
1011    /// eliminates the possibility of having the same client `root_secret`
1012    /// across multiple federations.
1013    fn federation_root_secret(
1014        pre_root_secret: &DerivableSecret,
1015        config: &ClientConfig,
1016    ) -> DerivableSecret {
1017        pre_root_secret.federation_key(&config.global.calculate_federation_id())
1018    }
1019
1020    /// Register to receiver all new transient (unpersisted) events
1021    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1022        self.log_event_added_transient_tx.subscribe()
1023    }
1024
1025    /// Check for pending config and migrate it if present.
1026    /// Returns the config to use (either the original or the migrated pending
1027    /// config).
1028    async fn migrate_pending_config_if_present(db: &Database) {
1029        if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1030            debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1031
1032            let mut dbtx = db.begin_transaction().await;
1033            // Update the main config with the pending config
1034            dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1035                .await;
1036            // Remove the pending config
1037            dbtx.remove_entry(&PendingClientConfigKey).await;
1038            dbtx.commit_tx().await;
1039
1040            debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1041        }
1042    }
1043
1044    /// Asynchronously refetch client config from federation and compare with
1045    /// existing. If different, save to pending config in database.
1046    fn load_and_refresh_client_config_static(
1047        config: &ClientConfig,
1048        api: &DynGlobalApi,
1049        db: &Database,
1050        task_group: &TaskGroup,
1051    ) {
1052        let config = config.clone();
1053        let api = api.clone();
1054        let db = db.clone();
1055        let task_group = task_group.clone();
1056
1057        // Spawn background task to refetch config
1058        task_group.spawn_cancellable("refresh_client_config_static", async move {
1059            Self::refresh_client_config_static(&config, &api, &db).await;
1060        });
1061    }
1062
1063    /// Wrapper that handles errors from config refresh with proper logging
1064    async fn refresh_client_config_static(
1065        config: &ClientConfig,
1066        api: &DynGlobalApi,
1067        db: &Database,
1068    ) {
1069        if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1070            warn!(
1071                target: LOG_CLIENT,
1072                err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1073            );
1074        }
1075    }
1076
1077    /// Validate that a config update is valid
1078    fn validate_config_update(
1079        current_config: &ClientConfig,
1080        new_config: &ClientConfig,
1081    ) -> anyhow::Result<()> {
1082        // Global config must not change
1083        if current_config.global != new_config.global {
1084            bail!("Global configuration changes are not allowed in config updates");
1085        }
1086
1087        // Modules can only be added, existing ones must stay the same
1088        for (module_id, current_module_config) in &current_config.modules {
1089            match new_config.modules.get(module_id) {
1090                Some(new_module_config) => {
1091                    if current_module_config != new_module_config {
1092                        bail!(
1093                            "Module {} configuration changes are not allowed, only additions are permitted",
1094                            module_id
1095                        );
1096                    }
1097                }
1098                None => {
1099                    bail!(
1100                        "Module {} was removed in new config, only additions are allowed",
1101                        module_id
1102                    );
1103                }
1104            }
1105        }
1106
1107        Ok(())
1108    }
1109
1110    /// Refetch client config from federation and save as pending if different
1111    async fn refresh_client_config_static_try(
1112        current_config: &ClientConfig,
1113        api: &DynGlobalApi,
1114        db: &Database,
1115    ) -> anyhow::Result<()> {
1116        debug!(target: LOG_CLIENT, "Refreshing client config");
1117
1118        // Fetch latest config from federation
1119        let fetched_config = api
1120            .request_current_consensus::<ClientConfig>(
1121                CLIENT_CONFIG_ENDPOINT.to_owned(),
1122                ApiRequestErased::default(),
1123            )
1124            .await?;
1125
1126        // Validate the new config before proceeding
1127        Self::validate_config_update(current_config, &fetched_config)?;
1128
1129        // Compare with current config
1130        if current_config != &fetched_config {
1131            debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1132
1133            let mut dbtx = db.begin_transaction().await;
1134            dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1135                .await;
1136            dbtx.commit_tx().await;
1137        } else {
1138            debug!(target: LOG_CLIENT, "No federation config changes detected");
1139        }
1140
1141        Ok(())
1142    }
1143}
1144
/// An intermediate step before Client joining or recovering
///
/// Meant to support showing user some initial information about the Federation
/// before actually joining.
pub struct ClientPreview {
    /// Builder whose `init` is called by `join` and `recover`
    inner: ClientBuilder,
    /// Federation config exposed through `config()` and passed on to `init`
    config: ClientConfig,
    /// Connector registry passed on to `init` and used to build the API
    /// client when downloading a backup
    connectors: ConnectorRegistry,
    /// Optional API secret passed on to `init` and to the API client built
    /// when downloading a backup
    api_secret: Option<String>,
    /// Optional prefetched API announcements, passed on to `init` as-is
    prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
    /// Optional prefetched per-peer supported API versions, passed on to
    /// `init` as-is
    preview_prefetch_api_version_set:
        Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
}
1158
1159impl ClientPreview {
1160    /// Get the config
1161    pub fn config(&self) -> &ClientConfig {
1162        &self.config
1163    }
1164
1165    /// Join a new Federation
1166    ///
1167    /// When a user wants to connect to a new federation this function fetches
1168    /// the federation config and initializes the client database. If a user
1169    /// already joined the federation in the past and has a preexisting database
1170    /// use [`ClientBuilder::open`] instead.
1171    ///
1172    /// **Warning**: Calling `join` with a `root_secret` key that was used
1173    /// previous to `join` a Federation will lead to all sorts of malfunctions
1174    /// including likely loss of funds.
1175    ///
1176    /// This should be generally called only if the `root_secret` key is known
1177    /// not to have been used before (e.g. just randomly generated). For keys
1178    /// that might have been previous used (e.g. provided by the user),
1179    /// it's safer to call [`Self::recover`] which will attempt to recover
1180    /// client module states for the Federation.
1181    ///
1182    /// A typical "join federation" flow would look as follows:
1183    /// ```no_run
1184    /// # use std::str::FromStr;
1185    /// # use fedimint_core::invite_code::InviteCode;
1186    /// # use fedimint_core::config::ClientConfig;
1187    /// # use fedimint_derive_secret::DerivableSecret;
1188    /// # use fedimint_client::{Client, ClientBuilder, RootSecret};
1189    /// # use fedimint_connectors::ConnectorRegistry;
1190    /// # use fedimint_core::db::Database;
1191    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1192    /// #
1193    /// # #[tokio::main]
1194    /// # async fn main() -> anyhow::Result<()> {
1195    /// # let root_secret: DerivableSecret = unimplemented!();
1196    /// // Create a root secret, e.g. via fedimint-bip39, see also:
1197    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1198    /// // let root_secret = …;
1199    ///
1200    /// // Get invite code from user
1201    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1202    ///     .expect("Invalid invite code");
1203    ///
1204    /// // Tell the user the federation name, bitcoin network
1205    /// // (e.g. from wallet module config), and other details
1206    /// // that are typically contained in the federation's
1207    /// // meta fields.
1208    ///
1209    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1210    /// //     .expect("Module not found")
1211    /// //     .network;
1212    ///
1213    /// // Open the client's database, using the federation ID
1214    /// // as the DB name is a common pattern:
1215    ///
1216    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1217    /// // let db = RocksDb::open(db_path).expect("error opening DB");
1218    /// # let db: Database = unimplemented!();
1219    /// # let connectors: ConnectorRegistry = unimplemented!();
1220    ///
1221    /// let preview = Client::builder().await
1222    ///     // Mount the modules the client should support:
1223    ///     // .with_module(LightningClientInit)
1224    ///     // .with_module(MintClientInit)
1225    ///     // .with_module(WalletClientInit::default())
1226    ///      .expect("Error building client")
1227    ///      .preview(connectors, &invite_code).await?;
1228    ///
1229    /// println!(
1230    ///     "The federation name is: {}",
1231    ///     preview.config().meta::<String>(META_FEDERATION_NAME_KEY)
1232    ///         .expect("Could not decode name field")
1233    ///         .expect("Name isn't set")
1234    /// );
1235    ///
1236    /// let client = preview
1237    ///     .join(db, RootSecret::StandardDoubleDerive(root_secret))
1238    ///     .await
1239    ///     .expect("Error joining federation");
1240    /// # Ok(())
1241    /// # }
1242    /// ```
1243    pub async fn join(
1244        self,
1245        db_no_decoders: Database,
1246        pre_root_secret: RootSecret,
1247    ) -> anyhow::Result<ClientHandle> {
1248        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1249
1250        let client = self
1251            .inner
1252            .init(
1253                self.connectors,
1254                db_no_decoders,
1255                pre_root_secret,
1256                self.config,
1257                self.api_secret,
1258                InitMode::Fresh,
1259                self.prefetch_api_announcements,
1260                self.preview_prefetch_api_version_set,
1261            )
1262            .await?;
1263
1264        Ok(client)
1265    }
1266
1267    /// Join a (possibly) previous joined Federation
1268    ///
1269    /// Unlike [`Self::join`], `recover` will run client module
1270    /// recovery for each client module attempting to recover any previous
1271    /// module state.
1272    ///
1273    /// Recovery process takes time during which each recovering client module
1274    /// will not be available for use.
1275    ///
1276    /// Calling `recovery` with a `root_secret` that was not actually previous
1277    /// used in a given Federation is safe.
1278    pub async fn recover(
1279        self,
1280        db_no_decoders: Database,
1281        pre_root_secret: RootSecret,
1282        backup: Option<ClientBackup>,
1283    ) -> anyhow::Result<ClientHandle> {
1284        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1285
1286        let client = self
1287            .inner
1288            .init(
1289                self.connectors,
1290                db_no_decoders,
1291                pre_root_secret,
1292                self.config,
1293                self.api_secret,
1294                InitMode::Recover {
1295                    snapshot: backup.clone(),
1296                },
1297                self.prefetch_api_announcements,
1298                self.preview_prefetch_api_version_set,
1299            )
1300            .await?;
1301
1302        Ok(client)
1303    }
1304
1305    /// Download most recent valid backup found from the Federation
1306    pub async fn download_backup_from_federation(
1307        &self,
1308        pre_root_secret: RootSecret,
1309    ) -> anyhow::Result<Option<ClientBackup>> {
1310        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1311        let api = DynGlobalApi::new(
1312            self.connectors.clone(),
1313            // TODO: change join logic to use FederationId v2
1314            self.config
1315                .global
1316                .api_endpoints
1317                .iter()
1318                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1319                .collect(),
1320            self.api_secret.as_deref(),
1321        )?;
1322
1323        Client::download_backup_from_federation_static(
1324            &api,
1325            &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1326            &self.inner.decoders(&self.config),
1327        )
1328        .await
1329    }
1330}