fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
14use fedimint_api_client::download_from_invite_code;
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20    ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53    PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54    run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61    PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
/// The type of root secret hashing
///
/// *Please read this documentation carefully, especially if you're upgrading
/// a downstream Fedimint client application.*
///
/// Internally, client will always hash-in federation id
/// to the root secret provided to the [`ClientBuilder`],
/// to ensure a different actual root secret is used for every federation.
/// This makes reusing a single root secret for different federations
/// in a multi-federation client, perfectly fine, and frees the client
/// from worrying about `FederationId`.
///
/// However, in the past Fedimint applications (including `fedimint-cli`)
/// were doing the hashing-in of `FederationId` outside of `fedimint-client` as
/// well, which led to effectively doing it twice, and pushed downloading of
/// the client config on join to application code, a sub-optimal API, especially
/// after joining federation needed to handle even more functionality.
///
/// To keep the interoperability of the seed phrases this double-derivation
/// is preserved; due to other architectural reasons, `fedimint-client`
/// will now do the outer-derivation internally as well.
#[derive(Clone)]
pub enum RootSecret {
    /// Derive an extra round of federation-id to the secret, like
    /// Fedimint applications were doing manually in the past.
    ///
    /// **Note**: Applications MUST NOT do the derivation themselves anymore.
    StandardDoubleDerive(DerivableSecret),
    /// No double derivation
    ///
    /// This is useful for applications that for whatever reason do the
    /// double-derivation externally, or use a custom scheme.
    Custom(DerivableSecret),
}
103
104impl RootSecret {
105    fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106        match self {
107            RootSecret::StandardDoubleDerive(derivable_secret) => {
108                get_default_client_secret(derivable_secret, &federation_id)
109            }
110            RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111        }
112    }
113}
114
/// Used to configure, assemble and build [`Client`]
pub struct ClientBuilder {
    /// Registry of module initializers; config modules whose kind is not
    /// found here are skipped when migrating and building.
    module_inits: ClientModuleInitRegistry,
    /// Optional admin credentials, set via [`Self::set_admin_creds`]. When
    /// present, the federation API is created with the admin's peer id and
    /// auth.
    admin_creds: Option<AdminCreds>,
    /// Meta service; defaults to one backed by `LegacyMetaSource::default()`
    /// and can be replaced via [`Self::with_meta_service`].
    meta_service: Arc<crate::meta::MetaService>,
    /// When `true`, the client is built without starting its executor.
    stopped: bool,
    /// Sender half of the broadcast channel carrying [`EventLogEntry`] values.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Wrapper around the api request logic, see
    /// [`Self::with_api_request_hook`]. Defaults to the identity hook.
    request_hook: ApiRequestHook,
    /// Whether the DHT should be enabled when using Iroh to connect to the
    /// federation (defaults to `true`).
    iroh_enable_dht: bool,
    /// Whether the parallel unstable/next Iroh stack should be enabled when
    /// using Iroh to connect to the federation (defaults to `true`).
    iroh_enable_next: bool,
}
126
127impl ClientBuilder {
128    pub(crate) fn new() -> Self {
129        trace!(
130            target: LOG_CLIENT,
131            version = %fedimint_build_code_version_env!(),
132            "Initializing fedimint client",
133        );
134        let meta_service = MetaService::new(LegacyMetaSource::default());
135        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
136            broadcast::channel(1024);
137
138        ClientBuilder {
139            module_inits: ModuleInitRegistry::new(),
140            admin_creds: None,
141            stopped: false,
142            meta_service,
143            log_event_added_transient_tx,
144            request_hook: Arc::new(|api| api),
145            iroh_enable_dht: true,
146            iroh_enable_next: true,
147        }
148    }
149
150    pub(crate) fn from_existing(client: &Client) -> Self {
151        ClientBuilder {
152            module_inits: client.module_inits.clone(),
153            admin_creds: None,
154            stopped: false,
155            // non unique
156            meta_service: client.meta_service.clone(),
157            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
158            request_hook: client.request_hook.clone(),
159            iroh_enable_dht: client.iroh_enable_dht,
160            iroh_enable_next: client.iroh_enable_next,
161        }
162    }
163
    /// Replace module generator registry entirely
    ///
    /// Any module inits previously stored in the builder are dropped in
    /// favor of `module_inits`.
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }
168
    /// Make module generator available when reading the config
    ///
    /// Attaches `module_init` to the builder's module init registry.
    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
        self.module_inits.attach(module_init);
    }
173
    /// Request that the [`Client`] is built without starting its executor
    pub fn stopped(&mut self) {
        self.stopped = true;
    }
177    /// Build the [`Client`] with a custom wrapper around its api request logic
178    ///
179    /// This is intended to be used by downstream applications, e.g. to:
180    ///
181    /// * simulate offline mode,
182    /// * save battery when the OS indicates lack of connectivity,
183    /// * inject faults and delays for testing purposes,
184    /// * collect statistics and emit notifications.
185    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
186        self.request_hook = hook;
187        self
188    }
189
    /// Replace the meta service stored in the builder with `meta_service`
    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
        self.meta_service = meta_service;
    }
193
194    /// Override if the DHT should be enabled when using Iroh to connect to
195    /// the federation
196    pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
197        self.iroh_enable_dht = iroh_enable_dht;
198        self
199    }
200
201    /// Override if the parallel unstable/next Iroh stack should be enabled when
202    /// using Iroh to connect to the federation
203    pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
204        self.iroh_enable_next = iroh_enable_next;
205        self
206    }
207
208    /// Migrate client module databases
209    ///
210    /// Note: Client core db migration are done immediately in
211    /// [`Client::builder`], to ensure db matches the code at all times,
212    /// while migrating modules requires figuring out what modules actually
213    /// are first.
214    async fn migrate_module_dbs(
215        &self,
216        db: &Database,
217        client_config: &ClientConfig,
218    ) -> anyhow::Result<()> {
219        for (module_id, module_cfg) in &client_config.modules {
220            let kind = module_cfg.kind.clone();
221            let Some(init) = self.module_inits.get(&kind) else {
222                // normal, expected and already logged about when building the client
223                continue;
224            };
225
226            let mut dbtx = db.begin_transaction().await;
227            apply_migrations_client_module_dbtx(
228                &mut dbtx.to_ref_nc(),
229                kind.to_string(),
230                init.get_database_migrations(),
231                *module_id,
232            )
233            .await?;
234            if let Some(used_db_prefixes) = init.used_db_prefixes()
235                && is_running_in_test_env()
236            {
237                verify_module_db_integrity_dbtx(
238                    &mut dbtx.to_ref_nc(),
239                    *module_id,
240                    kind,
241                    &used_db_prefixes,
242                )
243                .await;
244            }
245            dbtx.commit_tx_result().await?;
246        }
247
248        Ok(())
249    }
250
251    pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
252        let Some(config) = Client::get_config_from_db(db).await else {
253            bail!("Client database not initialized")
254        };
255
256        Ok(config)
257    }
258
    /// Set the admin credentials to build the client with
    ///
    /// When set, the federation API is created with the admin's peer id and
    /// auth.
    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
        self.admin_creds = Some(creds);
    }
262
263    #[allow(clippy::too_many_arguments)]
264    async fn init(
265        self,
266        connectors: ConnectorRegistry,
267        db_no_decoders: Database,
268        pre_root_secret: DerivableSecret,
269        config: ClientConfig,
270        api_secret: Option<String>,
271        init_mode: InitMode,
272        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
273        preview_prefetch_api_version_set: Option<
274            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
275        >,
276    ) -> anyhow::Result<ClientHandle> {
277        if Client::is_initialized(&db_no_decoders).await {
278            bail!("Client database already initialized")
279        }
280
281        Client::run_core_migrations(&db_no_decoders).await?;
282
283        // Note: It's important all client initialization is performed as one big
284        // transaction to avoid half-initialized client state.
285        {
286            debug!(target: LOG_CLIENT, "Initializing client database");
287            let mut dbtx = db_no_decoders.begin_transaction().await;
288            // Save config to DB
289            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
290                .await;
291            dbtx.insert_entry(
292                &ClientPreRootSecretHashKey,
293                &pre_root_secret.derive_pre_root_secret_hash(),
294            )
295            .await;
296
297            if let Some(api_secret) = api_secret.as_ref() {
298                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
299            }
300
301            let init_state = InitState::Pending(init_mode);
302            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
303
304            let metadata = init_state
305                .does_require_recovery()
306                .flatten()
307                .map_or(Metadata::empty(), |s| s.metadata);
308
309            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
310
311            dbtx.commit_tx_result().await?;
312        }
313
314        let stopped = self.stopped;
315        self.build(
316            connectors,
317            db_no_decoders,
318            pre_root_secret,
319            config,
320            api_secret,
321            stopped,
322            preview_prefetch_api_announcements,
323            preview_prefetch_api_version_set,
324        )
325        .await
326    }
327
328    pub async fn preview(
329        self,
330        connectors: ConnectorRegistry,
331        invite_code: &InviteCode,
332    ) -> anyhow::Result<ClientPreview> {
333        let (config, api) = download_from_invite_code(&connectors, invite_code).await?;
334
335        let prefetch_api_announcements =
336            config
337                .global
338                .broadcast_public_keys
339                .clone()
340                .map(|guardian_pub_keys| {
341                    Jit::new({
342                        let api = api.clone();
343                        move || async move {
344                            // Fetching api announcements using invite urls before joining.
345                            // This ensures the client can communicated with
346                            // the Federation even if all the peers moved write them to database.
347                            fetch_api_announcements_from_at_least_num_of_peers(
348                                1,
349                                &api,
350                                &guardian_pub_keys,
351                                // If we can, we would love to get more than just one response,
352                                // but we need to wrap it up fast for good UX.
353                                Duration::from_millis(20),
354                            )
355                            .await
356                        }
357                    })
358                });
359
360        self.preview_inner(
361            connectors,
362            config,
363            invite_code.api_secret(),
364            Some(api),
365            prefetch_api_announcements,
366        )
367        .await
368    }
369
    /// Use [`Self::preview`] instead
    ///
    /// Creates a [`ClientPreview`] from an already available `config`. No api
    /// and no api announcements are passed on to the preview, so unlike
    /// [`Self::preview`] nothing is prefetched to speed up the final join.
    pub async fn preview_with_existing_config(
        self,
        connectors: ConnectorRegistry,
        config: ClientConfig,
        api_secret: Option<String>,
    ) -> anyhow::Result<ClientPreview> {
        // `None, None`: no api to prefetch api versions with, and no
        // prefetched api announcements.
        self.preview_inner(connectors, config, api_secret, None, None)
            .await
    }
383
384    async fn preview_inner(
385        self,
386        connectors: ConnectorRegistry,
387        config: ClientConfig,
388        api_secret: Option<String>,
389        prefetch_api: Option<DynGlobalApi>,
390        prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
391    ) -> anyhow::Result<ClientPreview> {
392        let preview_prefetch_api_version_set = prefetch_api.map(|api| {
393            JitTry::new_try({
394                let config = config.clone();
395                || async move { Client::fetch_common_api_versions(&config, &api).await }
396            })
397        });
398
399        Ok(ClientPreview {
400            connectors,
401            inner: self,
402            config,
403            api_secret,
404            prefetch_api_announcements,
405            preview_prefetch_api_version_set,
406        })
407    }
408
409    pub async fn open(
410        self,
411        connectors: ConnectorRegistry,
412        db_no_decoders: Database,
413        pre_root_secret: RootSecret,
414    ) -> anyhow::Result<ClientHandle> {
415        Client::run_core_migrations(&db_no_decoders).await?;
416
417        // Check for pending config and migrate if present
418        Self::migrate_pending_config_if_present(&db_no_decoders).await;
419
420        let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
421            bail!("Client database not initialized")
422        };
423
424        let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
425
426        match db_no_decoders
427            .begin_transaction_nc()
428            .await
429            .get_value(&ClientPreRootSecretHashKey)
430            .await
431        {
432            Some(secret_hash) => {
433                ensure!(
434                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
435                    "Secret hash does not match. Incorrect secret"
436                );
437            }
438            _ => {
439                debug!(target: LOG_CLIENT, "Backfilling secret hash");
440                // Note: no need for dbtx autocommit, we are the only writer ATM
441                let mut dbtx = db_no_decoders.begin_transaction().await;
442                dbtx.insert_entry(
443                    &ClientPreRootSecretHashKey,
444                    &pre_root_secret.derive_pre_root_secret_hash(),
445                )
446                .await;
447                dbtx.commit_tx().await;
448            }
449        }
450
451        let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
452        let stopped = self.stopped;
453        let request_hook = self.request_hook.clone();
454
455        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
456        let client = self
457            .build_stopped(
458                connectors,
459                db_no_decoders,
460                pre_root_secret,
461                &config,
462                api_secret,
463                log_event_added_transient_tx,
464                request_hook,
465                None,
466                None,
467            )
468            .await?;
469        if !stopped {
470            client.as_inner().start_executor();
471        }
472        Ok(client)
473    }
474
475    /// Build a [`Client`] and start the executor
476    #[allow(clippy::too_many_arguments)]
477    pub(crate) async fn build(
478        self,
479        connectors: ConnectorRegistry,
480        db_no_decoders: Database,
481        pre_root_secret: DerivableSecret,
482        config: ClientConfig,
483        api_secret: Option<String>,
484        stopped: bool,
485        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
486        preview_prefetch_api_version_set: Option<
487            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
488        >,
489    ) -> anyhow::Result<ClientHandle> {
490        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
491        let request_hook = self.request_hook.clone();
492        let client = self
493            .build_stopped(
494                connectors,
495                db_no_decoders,
496                pre_root_secret,
497                &config,
498                api_secret,
499                log_event_added_transient_tx,
500                request_hook,
501                preview_prefetch_api_announcements,
502                preview_prefetch_api_version_set,
503            )
504            .await?;
505        if !stopped {
506            client.as_inner().start_executor();
507        }
508
509        Ok(client)
510    }
511
512    // TODO: remove config argument
513    /// Build a [`Client`] but do not start the executor
514    #[allow(clippy::too_many_arguments)]
515    async fn build_stopped(
516        self,
517        connectors: ConnectorRegistry,
518        db_no_decoders: Database,
519        pre_root_secret: DerivableSecret,
520        config: &ClientConfig,
521        api_secret: Option<String>,
522        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
523        request_hook: ApiRequestHook,
524        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
525        preview_prefetch_api_version_set: Option<
526            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
527        >,
528    ) -> anyhow::Result<ClientHandle> {
529        debug!(
530            target: LOG_CLIENT,
531            version = %fedimint_build_code_version_env!(),
532            "Building fedimint client",
533        );
534        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
535        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
536
537        let decoders = self.decoders(config);
538        let config = Self::config_decoded(config, &decoders)?;
539        let fed_id = config.calculate_federation_id();
540        let db = db_no_decoders.with_decoders(decoders.clone());
541        let peer_urls = get_api_urls(&db, &config).await;
542        let api = match self.admin_creds.as_ref() {
543            Some(admin_creds) => FederationApi::new(
544                connectors.clone(),
545                peer_urls,
546                Some(admin_creds.peer_id),
547                Some(&admin_creds.auth.0),
548            )
549            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
550            .with_request_hook(&request_hook)
551            .with_cache()
552            .into(),
553            None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
554                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
555                .with_request_hook(&request_hook)
556                .with_cache()
557                .into(),
558        };
559
560        let task_group = TaskGroup::new();
561
562        // Migrate the database before interacting with it in case any on-disk data
563        // structures have changed.
564        self.migrate_module_dbs(&db, &config).await?;
565
566        let init_state = Self::load_init_state(&db).await;
567
568        let notifier = Notifier::new();
569
570        if let Some(p) = preview_prefetch_api_announcements {
571            // We want to fail if we were unable to figure out
572            // current addresses of peers in the federation, as it will potentially never
573            // fix itself, so it's better to fail the join explicitly.
574            let announcements = p.get().await;
575
576            store_api_announcements_updates_from_peers(&db, announcements).await?
577        }
578
579        if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
580            match preview_prefetch_api_version_set.get_try().await {
581                Ok(peer_api_versions) => {
582                    Client::store_prefetched_api_versions(
583                        &db,
584                        &config,
585                        &self.module_inits,
586                        peer_api_versions,
587                    )
588                    .await;
589                }
590                Err(err) => {
591                    debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
592                }
593            }
594        }
595
596        let common_api_versions = Client::load_and_refresh_common_api_version_static(
597            &config,
598            &self.module_inits,
599            connectors.clone(),
600            &api,
601            &db,
602            &task_group,
603        )
604        .await
605        .inspect_err(|err| {
606            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
607        })
608        .unwrap_or(ApiVersionSet {
609            core: ApiVersion::new(0, 0),
610            // This will cause all modules to skip initialization
611            modules: BTreeMap::new(),
612        });
613
614        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
615
616        // Asynchronously refetch client config and compare with existing
617        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
618
619        let mut module_recoveries: BTreeMap<
620            ModuleInstanceId,
621            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
622        > = BTreeMap::new();
623        let mut module_recovery_progress_receivers: BTreeMap<
624            ModuleInstanceId,
625            watch::Receiver<RecoveryProgress>,
626        > = BTreeMap::new();
627
628        let final_client = FinalClientIface::default();
629
630        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
631
632        let modules = {
633            let mut modules = ClientModuleRegistry::default();
634            for (module_instance_id, module_config) in config.modules.clone() {
635                let kind = module_config.kind().clone();
636                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
637                    debug!(
638                        target: LOG_CLIENT,
639                        kind=%kind,
640                        instance_id=%module_instance_id,
641                        "Module kind of instance not found in module gens, skipping");
642                    continue;
643                };
644
645                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
646                else {
647                    warn!(
648                        target: LOG_CLIENT,
649                        kind=%kind,
650                        instance_id=%module_instance_id,
651                        "Module kind of instance has incompatible api version, skipping"
652                    );
653                    continue;
654                };
655
656                // since the exact logic of when to start recovery is a bit gnarly,
657                // the recovery call is extracted here.
658                let start_module_recover_fn =
659                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
660                        let module_config = module_config.clone();
661                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
662                        let db = db.clone();
663                        let kind = kind.clone();
664                        let notifier = notifier.clone();
665                        let api = api.clone();
666                        let root_secret = root_secret.clone();
667                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
668                        let final_client = final_client.clone();
669                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
670                        let task_group = task_group.clone();
671                        let module_init = module_init.clone();
672                        (
673                            Box::pin(async move {
674                                module_init
675                                    .recover(
676                                        final_client.clone(),
677                                        fed_id,
678                                        num_peers,
679                                        module_config.clone(),
680                                        db.clone(),
681                                        module_instance_id,
682                                        common_api_versions.core,
683                                        api_version,
684                                        root_secret.derive_module_secret(module_instance_id),
685                                        notifier.clone(),
686                                        api.clone(),
687                                        admin_auth,
688                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
689                                        progress_tx,
690                                        task_group,
691                                    )
692                                    .await
693                                    .inspect_err(|err| {
694                                        warn!(
695                                            target: LOG_CLIENT,
696                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
697                                        );
698                                    })
699                            }),
700                            progress_rx,
701                        )
702                    };
703
704                let recovery = match init_state.does_require_recovery() {
705                    Some(snapshot) => {
706                        match db
707                            .begin_transaction_nc()
708                            .await
709                            .get_value(&ClientModuleRecovery { module_instance_id })
710                            .await
711                        {
712                            Some(module_recovery_state) => {
713                                if module_recovery_state.is_done() {
714                                    debug!(
715                                        id = %module_instance_id,
716                                        %kind, "Module recovery already complete"
717                                    );
718                                    None
719                                } else {
720                                    debug!(
721                                        id = %module_instance_id,
722                                        %kind,
723                                        progress = %module_recovery_state.progress,
724                                        "Starting module recovery with an existing progress"
725                                    );
726                                    Some(start_module_recover_fn(
727                                        snapshot,
728                                        module_recovery_state.progress,
729                                    ))
730                                }
731                            }
732                            _ => {
733                                let progress = RecoveryProgress::none();
734                                let mut dbtx = db.begin_transaction().await;
735                                dbtx.log_event(
736                                    log_ordering_wakeup_tx.clone(),
737                                    None,
738                                    ModuleRecoveryStarted::new(module_instance_id),
739                                )
740                                .await;
741                                dbtx.insert_entry(
742                                    &ClientModuleRecovery { module_instance_id },
743                                    &ClientModuleRecoveryState { progress },
744                                )
745                                .await;
746
747                                dbtx.commit_tx().await;
748
749                                debug!(
750                                    id = %module_instance_id,
751                                    %kind, "Starting new module recovery"
752                                );
753                                Some(start_module_recover_fn(snapshot, progress))
754                            }
755                        }
756                    }
757                    _ => None,
758                };
759
760                match recovery {
761                    Some((recovery, recovery_progress_rx)) => {
762                        module_recoveries.insert(module_instance_id, recovery);
763                        module_recovery_progress_receivers
764                            .insert(module_instance_id, recovery_progress_rx);
765                    }
766                    _ => {
767                        let module = module_init
768                            .init(
769                                final_client.clone(),
770                                fed_id,
771                                config.global.api_endpoints.len(),
772                                module_config,
773                                db.clone(),
774                                module_instance_id,
775                                common_api_versions.core,
776                                api_version,
777                                // This is a divergence from the legacy client, where the child
778                                // secret keys were derived using
779                                // *module kind*-specific derivation paths.
780                                // Since the new client has to support multiple, segregated modules
781                                // of the same kind we have to use
782                                // the instance id instead.
783                                root_secret.derive_module_secret(module_instance_id),
784                                notifier.clone(),
785                                api.clone(),
786                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
787                                task_group.clone(),
788                                connectors.clone(),
789                            )
790                            .await?;
791
792                        modules.register_module(module_instance_id, kind, module);
793                    }
794                }
795            }
796            modules
797        };
798
799        if init_state.is_pending() && module_recoveries.is_empty() {
800            let mut dbtx = db.begin_transaction().await;
801            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
802                .await;
803            dbtx.commit_tx().await;
804        }
805
806        let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
807            BTreeMap::new();
808
809        for (module_id, _kind, module) in modules.iter_modules() {
810            match module.supports_being_primary() {
811                PrimaryModuleSupport::Any { priority } => {
812                    primary_modules
813                        .entry(priority)
814                        .or_default()
815                        .wildcard
816                        .push(module_id);
817                }
818                PrimaryModuleSupport::Selected { priority, units } => {
819                    for unit in units {
820                        primary_modules
821                            .entry(priority)
822                            .or_default()
823                            .specific
824                            .entry(unit)
825                            .or_default()
826                            .push(module_id);
827                    }
828                }
829                PrimaryModuleSupport::None => {}
830            }
831        }
832
833        let executor = {
834            let mut executor_builder = Executor::builder();
835            executor_builder
836                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
837
838            for (module_instance_id, _, module) in modules.iter_modules() {
839                executor_builder.with_module_dyn(module.context(module_instance_id));
840            }
841
842            for module_instance_id in module_recoveries.keys() {
843                executor_builder.with_valid_module_id(*module_instance_id);
844            }
845
846            executor_builder.build(
847                db.clone(),
848                notifier,
849                task_group.clone(),
850                log_ordering_wakeup_tx.clone(),
851            )
852        };
853
854        let recovery_receiver_init_val = module_recovery_progress_receivers
855            .iter()
856            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
857            .collect::<BTreeMap<_, _>>();
858        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
859            watch::channel(recovery_receiver_init_val);
860
861        let client_inner = Arc::new(Client {
862            final_client: final_client.clone(),
863            config: tokio::sync::RwLock::new(config.clone()),
864            api_secret,
865            decoders,
866            db: db.clone(),
867            connectors,
868            federation_id: fed_id,
869            federation_config_meta: config.global.meta,
870            primary_modules,
871            modules,
872            module_inits: self.module_inits.clone(),
873            log_ordering_wakeup_tx,
874            log_event_added_rx,
875            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
876            request_hook,
877            executor,
878            api,
879            secp_ctx: Secp256k1::new(),
880            root_secret,
881            task_group,
882            operation_log: OperationLog::new(db.clone()),
883            client_recovery_progress_receiver,
884            meta_service: self.meta_service,
885            iroh_enable_dht: self.iroh_enable_dht,
886            iroh_enable_next: self.iroh_enable_next,
887        });
888        client_inner
889            .task_group
890            .spawn_cancellable("MetaService::update_continuously", {
891                let client_inner = client_inner.clone();
892                async move {
893                    client_inner
894                        .meta_service
895                        .update_continuously(&client_inner)
896                        .await;
897                }
898            });
899
900        client_inner
901            .task_group
902            .spawn_cancellable("update-api-announcements", {
903                let client_inner = client_inner.clone();
904                async move {
905                    client_inner
906                        .connectors
907                        .wait_for_initialized_connections()
908                        .await;
909                    run_api_announcement_refresh_task(client_inner.clone()).await
910                }
911            });
912
913        client_inner
914            .task_group
915            .spawn_cancellable("event log ordering task", {
916                let client_inner = client_inner.clone();
917                async move {
918                    client_inner
919                        .connectors
920                        .wait_for_initialized_connections()
921                        .await;
922
923                    run_event_log_ordering_task(
924                        db.clone(),
925                        log_ordering_wakeup_rx,
926                        log_event_added_tx,
927                        log_event_added_transient_tx,
928                    )
929                    .await
930                }
931            });
932        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
933
934        let client_arc = ClientHandle::new(client_inner);
935
936        for (_, _, module) in client_arc.modules.iter_modules() {
937            module.start().await;
938        }
939
940        final_client.set(client_iface.clone());
941
942        if !module_recoveries.is_empty() {
943            client_arc.spawn_module_recoveries_task(
944                client_recovery_progress_sender,
945                module_recoveries,
946                module_recovery_progress_receivers,
947            );
948        }
949
950        Ok(client_arc)
951    }
952
953    async fn load_init_state(db: &Database) -> InitState {
954        let mut dbtx = db.begin_transaction_nc().await;
955        dbtx.get_value(&ClientInitStateKey)
956            .await
957            .unwrap_or_else(|| {
958                // could be turned in a hard error in the future, but for now
959                // no need to break backward compat.
960                warn!(
961                    target: LOG_CLIENT,
962                    "Client missing ClientRequiresRecovery: assuming complete"
963                );
964                db::InitState::Complete(db::InitModeComplete::Fresh)
965            })
966    }
967
968    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
969        let mut decoders = client_decoders(
970            &self.module_inits,
971            config
972                .modules
973                .iter()
974                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
975        );
976
977        decoders.register_module(
978            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
979            ModuleKind::from_static_str("tx_submission"),
980            tx_submission_sm_decoder(),
981        );
982
983        decoders
984    }
985
986    fn config_decoded(
987        config: &ClientConfig,
988        decoders: &ModuleDecoderRegistry,
989    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
990        config.clone().redecode_raw(decoders)
991    }
992
993    /// Re-derive client's `root_secret` using the federation ID. This
994    /// eliminates the possibility of having the same client `root_secret`
995    /// across multiple federations.
996    fn federation_root_secret(
997        pre_root_secret: &DerivableSecret,
998        config: &ClientConfig,
999    ) -> DerivableSecret {
1000        pre_root_secret.federation_key(&config.global.calculate_federation_id())
1001    }
1002
1003    /// Register to receiver all new transient (unpersisted) events
1004    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1005        self.log_event_added_transient_tx.subscribe()
1006    }
1007
1008    /// Check for pending config and migrate it if present.
1009    /// Returns the config to use (either the original or the migrated pending
1010    /// config).
1011    async fn migrate_pending_config_if_present(db: &Database) {
1012        if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1013            debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1014
1015            let mut dbtx = db.begin_transaction().await;
1016            // Update the main config with the pending config
1017            dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1018                .await;
1019            // Remove the pending config
1020            dbtx.remove_entry(&PendingClientConfigKey).await;
1021            dbtx.commit_tx().await;
1022
1023            debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1024        }
1025    }
1026
1027    /// Asynchronously refetch client config from federation and compare with
1028    /// existing. If different, save to pending config in database.
1029    fn load_and_refresh_client_config_static(
1030        config: &ClientConfig,
1031        api: &DynGlobalApi,
1032        db: &Database,
1033        task_group: &TaskGroup,
1034    ) {
1035        let config = config.clone();
1036        let api = api.clone();
1037        let db = db.clone();
1038        let task_group = task_group.clone();
1039
1040        // Spawn background task to refetch config
1041        task_group.spawn_cancellable("refresh_client_config_static", async move {
1042            api.wait_for_initialized_connections().await;
1043            Self::refresh_client_config_static(&config, &api, &db).await;
1044        });
1045    }
1046
1047    /// Wrapper that handles errors from config refresh with proper logging
1048    async fn refresh_client_config_static(
1049        config: &ClientConfig,
1050        api: &DynGlobalApi,
1051        db: &Database,
1052    ) {
1053        if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1054            warn!(
1055                target: LOG_CLIENT,
1056                err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1057            );
1058        }
1059    }
1060
1061    /// Validate that a config update is valid
1062    fn validate_config_update(
1063        current_config: &ClientConfig,
1064        new_config: &ClientConfig,
1065    ) -> anyhow::Result<()> {
1066        // Global config must not change
1067        if current_config.global != new_config.global {
1068            bail!("Global configuration changes are not allowed in config updates");
1069        }
1070
1071        // Modules can only be added, existing ones must stay the same
1072        for (module_id, current_module_config) in &current_config.modules {
1073            match new_config.modules.get(module_id) {
1074                Some(new_module_config) => {
1075                    if current_module_config != new_module_config {
1076                        bail!(
1077                            "Module {} configuration changes are not allowed, only additions are permitted",
1078                            module_id
1079                        );
1080                    }
1081                }
1082                None => {
1083                    bail!(
1084                        "Module {} was removed in new config, only additions are allowed",
1085                        module_id
1086                    );
1087                }
1088            }
1089        }
1090
1091        Ok(())
1092    }
1093
1094    /// Refetch client config from federation and save as pending if different
1095    async fn refresh_client_config_static_try(
1096        current_config: &ClientConfig,
1097        api: &DynGlobalApi,
1098        db: &Database,
1099    ) -> anyhow::Result<()> {
1100        debug!(target: LOG_CLIENT, "Refreshing client config");
1101
1102        // Fetch latest config from federation
1103        let fetched_config = api
1104            .request_current_consensus::<ClientConfig>(
1105                CLIENT_CONFIG_ENDPOINT.to_owned(),
1106                ApiRequestErased::default(),
1107            )
1108            .await?;
1109
1110        // Validate the new config before proceeding
1111        Self::validate_config_update(current_config, &fetched_config)?;
1112
1113        // Compare with current config
1114        if current_config != &fetched_config {
1115            debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1116
1117            let mut dbtx = db.begin_transaction().await;
1118            dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1119                .await;
1120            dbtx.commit_tx().await;
1121        } else {
1122            debug!(target: LOG_CLIENT, "No federation config changes detected");
1123        }
1124
1125        Ok(())
1126    }
1127}
1128
/// An intermediate step before Client joining or recovering
///
/// Meant to support showing user some initial information about the Federation
/// before actually joining.
pub struct ClientPreview {
    /// Builder that performs the actual client initialization once the user
    /// decides to join or recover
    inner: ClientBuilder,
    /// Federation config being previewed
    config: ClientConfig,
    /// Connectors used for the API connections to the federation
    connectors: ConnectorRegistry,
    /// Optional API secret passed along when talking to the federation's API
    api_secret: Option<String>,
    /// Handed over to client initialization as-is.
    /// NOTE(review): presumably API announcements whose fetching was already
    /// started during preview — confirm against the code creating the preview.
    prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
    /// Handed over to client initialization as-is.
    /// NOTE(review): presumably the per-peer supported API versions whose
    /// fetching was already started during preview — confirm.
    preview_prefetch_api_version_set:
        Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
}
1142
1143impl ClientPreview {
1144    /// Get the config
1145    pub fn config(&self) -> &ClientConfig {
1146        &self.config
1147    }
1148
1149    /// Join a new Federation
1150    ///
1151    /// When a user wants to connect to a new federation this function fetches
1152    /// the federation config and initializes the client database. If a user
1153    /// already joined the federation in the past and has a preexisting database
1154    /// use [`ClientBuilder::open`] instead.
1155    ///
1156    /// **Warning**: Calling `join` with a `root_secret` key that was used
1157    /// previous to `join` a Federation will lead to all sorts of malfunctions
1158    /// including likely loss of funds.
1159    ///
1160    /// This should be generally called only if the `root_secret` key is known
1161    /// not to have been used before (e.g. just randomly generated). For keys
1162    /// that might have been previous used (e.g. provided by the user),
1163    /// it's safer to call [`Self::recover`] which will attempt to recover
1164    /// client module states for the Federation.
1165    ///
1166    /// A typical "join federation" flow would look as follows:
1167    /// ```no_run
1168    /// # use std::str::FromStr;
1169    /// # use fedimint_core::invite_code::InviteCode;
1170    /// # use fedimint_core::config::ClientConfig;
1171    /// # use fedimint_derive_secret::DerivableSecret;
1172    /// # use fedimint_client::{Client, ClientBuilder, RootSecret};
1173    /// # use fedimint_connectors::ConnectorRegistry;
1174    /// # use fedimint_core::db::Database;
1175    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1176    /// #
1177    /// # #[tokio::main]
1178    /// # async fn main() -> anyhow::Result<()> {
1179    /// # let root_secret: DerivableSecret = unimplemented!();
1180    /// // Create a root secret, e.g. via fedimint-bip39, see also:
1181    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1182    /// // let root_secret = …;
1183    ///
1184    /// // Get invite code from user
1185    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1186    ///     .expect("Invalid invite code");
1187    ///
1188    /// // Tell the user the federation name, bitcoin network
1189    /// // (e.g. from wallet module config), and other details
1190    /// // that are typically contained in the federation's
1191    /// // meta fields.
1192    ///
1193    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1194    /// //     .expect("Module not found")
1195    /// //     .network;
1196    ///
1197    /// // Open the client's database, using the federation ID
1198    /// // as the DB name is a common pattern:
1199    ///
1200    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1201    /// // let db = RocksDb::open(db_path).expect("error opening DB");
1202    /// # let db: Database = unimplemented!();
1203    /// # let connectors: ConnectorRegistry = unimplemented!();
1204    ///
1205    /// let preview = Client::builder().await
1206    ///     // Mount the modules the client should support:
1207    ///     // .with_module(LightningClientInit)
1208    ///     // .with_module(MintClientInit)
1209    ///     // .with_module(WalletClientInit::default())
1210    ///      .expect("Error building client")
1211    ///      .preview(connectors, &invite_code).await?;
1212    ///
1213    /// println!(
1214    ///     "The federation name is: {}",
1215    ///     preview.config().meta::<String>(META_FEDERATION_NAME_KEY)
1216    ///         .expect("Could not decode name field")
1217    ///         .expect("Name isn't set")
1218    /// );
1219    ///
1220    /// let client = preview
1221    ///     .join(db, RootSecret::StandardDoubleDerive(root_secret))
1222    ///     .await
1223    ///     .expect("Error joining federation");
1224    /// # Ok(())
1225    /// # }
1226    /// ```
1227    pub async fn join(
1228        self,
1229        db_no_decoders: Database,
1230        pre_root_secret: RootSecret,
1231    ) -> anyhow::Result<ClientHandle> {
1232        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1233
1234        let client = self
1235            .inner
1236            .init(
1237                self.connectors,
1238                db_no_decoders,
1239                pre_root_secret,
1240                self.config,
1241                self.api_secret,
1242                InitMode::Fresh,
1243                self.prefetch_api_announcements,
1244                self.preview_prefetch_api_version_set,
1245            )
1246            .await?;
1247
1248        Ok(client)
1249    }
1250
1251    /// Join a (possibly) previous joined Federation
1252    ///
1253    /// Unlike [`Self::join`], `recover` will run client module
1254    /// recovery for each client module attempting to recover any previous
1255    /// module state.
1256    ///
1257    /// Recovery process takes time during which each recovering client module
1258    /// will not be available for use.
1259    ///
1260    /// Calling `recovery` with a `root_secret` that was not actually previous
1261    /// used in a given Federation is safe.
1262    pub async fn recover(
1263        self,
1264        db_no_decoders: Database,
1265        pre_root_secret: RootSecret,
1266        backup: Option<ClientBackup>,
1267    ) -> anyhow::Result<ClientHandle> {
1268        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1269
1270        let client = self
1271            .inner
1272            .init(
1273                self.connectors,
1274                db_no_decoders,
1275                pre_root_secret,
1276                self.config,
1277                self.api_secret,
1278                InitMode::Recover {
1279                    snapshot: backup.clone(),
1280                },
1281                self.prefetch_api_announcements,
1282                self.preview_prefetch_api_version_set,
1283            )
1284            .await?;
1285
1286        Ok(client)
1287    }
1288
1289    /// Download most recent valid backup found from the Federation
1290    pub async fn download_backup_from_federation(
1291        &self,
1292        pre_root_secret: RootSecret,
1293    ) -> anyhow::Result<Option<ClientBackup>> {
1294        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1295        let api = DynGlobalApi::new(
1296            self.connectors.clone(),
1297            // TODO: change join logic to use FederationId v2
1298            self.config
1299                .global
1300                .api_endpoints
1301                .iter()
1302                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1303                .collect(),
1304            self.api_secret.as_deref(),
1305        )?;
1306
1307        Client::download_backup_from_federation_static(
1308            &api,
1309            &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1310            &self.inner.decoders(&self.config),
1311        )
1312        .await
1313    }
1314}