fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{
15    ApiVersionSet, DynClientConnector, DynGlobalApi, FederationApiExt as _, ReconnectFederationApi,
16    make_admin_connector, make_connector,
17};
18use fedimint_client_module::api::ClientRawFederationApiExt as _;
19use fedimint_client_module::meta::LegacyMetaSource;
20use fedimint_client_module::module::init::ClientModuleInit;
21use fedimint_client_module::module::recovery::RecoveryProgress;
22use fedimint_client_module::module::{
23    ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
24};
25use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
26use fedimint_client_module::transaction::{
27    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
28};
29use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
30use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
31use fedimint_core::core::{ModuleInstanceId, ModuleKind};
32use fedimint_core::db::{
33    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
34};
35use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
36use fedimint_core::envs::is_running_in_test_env;
37use fedimint_core::invite_code::InviteCode;
38use fedimint_core::module::registry::ModuleDecoderRegistry;
39use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
40use fedimint_core::task::TaskGroup;
41use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
42use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
43use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
44use fedimint_derive_secret::DerivableSecret;
45use fedimint_eventlog::{
46    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
47};
48use fedimint_logging::LOG_CLIENT;
49use tokio::sync::{broadcast, watch};
50use tracing::{debug, trace, warn};
51
52use super::handle::ClientHandle;
53use super::{Client, client_decoders};
54use crate::api_announcements::{
55    PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
56    run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
57};
58use crate::backup::{ClientBackup, Metadata};
59use crate::client::PrimaryModuleCandidates;
60use crate::db::{
61    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
62    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
63    PendingClientConfigKey, apply_migrations_client_module_dbtx,
64};
65use crate::meta::MetaService;
66use crate::module_init::ClientModuleInitRegistry;
67use crate::oplog::OperationLog;
68use crate::sm::executor::Executor;
69use crate::sm::notifier::Notifier;
70
/// The type of root secret hashing
///
/// *Please read this documentation carefully, especially if you're upgrading
/// a downstream Fedimint client application.*
///
/// Internally, the client will always hash-in the federation id
/// to the root secret provided to the [`ClientBuilder`],
/// to ensure a different actual root secret is used for every federation.
/// This makes reusing a single root secret for different federations
/// in a multi-federation client perfectly fine, and frees the client
/// from worrying about `FederationId`.
///
/// However, in the past Fedimint applications (including `fedimint-cli`)
/// were doing the hashing-in of `FederationId` outside of `fedimint-client` as
/// well, which led to effectively doing it twice, and pushed downloading of
/// the client config on join to application code, a sub-optimal API, especially
/// after joining a federation needed to handle even more functionality.
///
/// To keep the interoperability of the seed phrases this double-derivation
/// is preserved, and, due to other architectural reasons, `fedimint-client`
/// will now do the outer-derivation internally as well.
#[derive(Clone)]
pub enum RootSecret {
    /// Derive an extra round of federation-id to the secret, like
    /// Fedimint applications were doing manually in the past.
    ///
    /// **Note**: Applications MUST NOT do the derivation themselves anymore.
    StandardDoubleDerive(DerivableSecret),
    /// No double derivation
    ///
    /// This is useful for applications that for whatever reason do the
    /// double-derivation externally, or use a custom scheme.
    Custom(DerivableSecret),
}
105
106impl RootSecret {
107    fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
108        match self {
109            RootSecret::StandardDoubleDerive(derivable_secret) => {
110                get_default_client_secret(derivable_secret, &federation_id)
111            }
112            RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
113        }
114    }
115}
116
/// Used to configure, assemble and build [`Client`]
pub struct ClientBuilder {
    /// Registry of module inits; looked up by module kind when migrating
    /// module databases and when building the client.
    module_inits: ClientModuleInitRegistry,
    /// Optional admin credentials; when set, the API is built against the
    /// single peer they identify.
    admin_creds: Option<AdminCreds>,
    /// Meta service carried by the builder (shared via `Arc`).
    meta_service: Arc<crate::meta::MetaService>,
    /// Connector type, used e.g. to download the config from an invite code.
    connector: ConnectorType,
    /// When `true`, the executor is not started after the client is built.
    stopped: bool,
    /// Sending half of the broadcast channel carrying [`EventLogEntry`]
    /// values.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Hook wrapped around the API request logic (identity by default).
    request_hook: ApiRequestHook,
    /// Already established connector that may be reused instead of creating
    /// a new one.
    reuse_connector: Option<DynClientConnector>,
    /// Whether the DHT is enabled when connecting via Iroh.
    iroh_enable_dht: bool,
    /// Whether the parallel unstable/next Iroh stack is enabled.
    iroh_enable_next: bool,
}
130
131impl ClientBuilder {
132    pub(crate) fn new() -> Self {
133        trace!(
134            target: LOG_CLIENT,
135            version = %fedimint_build_code_version_env!(),
136            "Initializing fedimint client",
137        );
138        let meta_service = MetaService::new(LegacyMetaSource::default());
139        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
140            broadcast::channel(1024);
141        ClientBuilder {
142            module_inits: ModuleInitRegistry::new(),
143            connector: ConnectorType::default(),
144            admin_creds: None,
145            stopped: false,
146            meta_service,
147            log_event_added_transient_tx,
148            request_hook: Arc::new(|api| api),
149            reuse_connector: None,
150            iroh_enable_dht: true,
151            iroh_enable_next: true,
152        }
153    }
154
155    pub(crate) fn from_existing(client: &Client) -> Self {
156        ClientBuilder {
157            module_inits: client.module_inits.clone(),
158            admin_creds: None,
159            stopped: false,
160            // non unique
161            meta_service: client.meta_service.clone(),
162            connector: client.connector,
163            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
164            request_hook: client.request_hook.clone(),
165            reuse_connector: Some(client.api.connector().clone()),
166            iroh_enable_dht: client.iroh_enable_dht,
167            iroh_enable_next: client.iroh_enable_next,
168        }
169    }
170
/// Replace the module init registry entirely.
///
/// Any module inits attached so far are discarded in favor of
/// `module_inits`.
pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
    self.module_inits = module_inits;
}
175
/// Attach a single module init to the registry, making it available when
/// reading the config.
///
/// Module instances whose kind has no attached init are skipped when the
/// client is built.
pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
    self.module_inits.attach(module_init);
}
180
/// Mark the builder so that the client it produces is left stopped, i.e.
/// the executor is not started after building.
pub fn stopped(&mut self) {
    self.stopped = true;
}
184
185    /// Build the [`Client`] with a custom wrapper around its api request logic
186    ///
187    /// This is intended to be used by downstream applications, e.g. to:
188    ///
189    /// * simulate offline mode,
190    /// * save battery when the OS indicates lack of connectivity,
191    /// * inject faults and delays for testing purposes,
192    /// * collect statistics and emit notifications.
193    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
194        self.request_hook = hook;
195        self
196    }
197
/// Replace the [`MetaService`] carried by the builder.
///
/// By default the builder uses a service backed by
/// `LegacyMetaSource::default()`.
pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
    self.meta_service = meta_service;
}
201
202    /// Override if the DHT should be enabled when using Iroh to connect to
203    /// the federation
204    pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
205        self.iroh_enable_dht = iroh_enable_dht;
206        self
207    }
208
209    /// Override if the parallel unstable/next Iroh stack should be enabled when
210    /// using Iroh to connect to the federation
211    pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
212        self.iroh_enable_next = iroh_enable_next;
213        self
214    }
215
216    /// Migrate client module databases
217    ///
218    /// Note: Client core db migration are done immediately in
219    /// [`Client::builder`], to ensure db matches the code at all times,
220    /// while migrating modules requires figuring out what modules actually
221    /// are first.
222    async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
223        // Only apply the client database migrations if the database has been
224        // initialized.
225        // This only works as long as you don't change the client config
226        if let Ok(client_config) = self.load_existing_config(db).await {
227            for (module_id, module_cfg) in client_config.modules {
228                let kind = module_cfg.kind.clone();
229                let Some(init) = self.module_inits.get(&kind) else {
230                    // normal, expected and already logged about when building the client
231                    continue;
232                };
233
234                let mut dbtx = db.begin_transaction().await;
235                apply_migrations_client_module_dbtx(
236                    &mut dbtx.to_ref_nc(),
237                    kind.to_string(),
238                    init.get_database_migrations(),
239                    module_id,
240                )
241                .await?;
242                if let Some(used_db_prefixes) = init.used_db_prefixes()
243                    && is_running_in_test_env()
244                {
245                    verify_module_db_integrity_dbtx(
246                        &mut dbtx.to_ref_nc(),
247                        module_id,
248                        kind,
249                        &used_db_prefixes,
250                    )
251                    .await;
252                }
253                dbtx.commit_tx_result().await?;
254            }
255        }
256
257        Ok(())
258    }
259
260    pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
261        let Some(config) = Client::get_config_from_db(db).await else {
262            bail!("Client database not initialized")
263        };
264
265        Ok(config)
266    }
267
/// Set the admin credentials to build the client with.
///
/// When credentials are present, the client's API is created through an
/// admin connector to the peer identified by `creds.peer_id`.
pub fn set_admin_creds(&mut self, creds: AdminCreds) {
    self.admin_creds = Some(creds);
}
271
/// Set the [`ConnectorType`] of the builder.
///
/// [`Self::preview`] uses it to download the client config from an invite
/// code.
pub fn with_connector(&mut self, connector: ConnectorType) {
    self.connector = connector;
}
275
/// Switch the builder to the Tor connector type.
///
/// Only available when the `tor` feature is enabled.
#[cfg(feature = "tor")]
pub fn with_tor_connector(&mut self) {
    self.connector = ConnectorType::tor();
}
280
281    #[allow(clippy::too_many_arguments)]
282    async fn init(
283        self,
284        db_no_decoders: Database,
285        pre_root_secret: DerivableSecret,
286        config: ClientConfig,
287        api_secret: Option<String>,
288        init_mode: InitMode,
289        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
290        preview_prefetch_api_version_set: Option<
291            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
292        >,
293    ) -> anyhow::Result<ClientHandle> {
294        if Client::is_initialized(&db_no_decoders).await {
295            bail!("Client database already initialized")
296        }
297
298        Client::run_core_migrations(&db_no_decoders).await?;
299
300        // Note: It's important all client initialization is performed as one big
301        // transaction to avoid half-initialized client state.
302        {
303            debug!(target: LOG_CLIENT, "Initializing client database");
304            let mut dbtx = db_no_decoders.begin_transaction().await;
305            // Save config to DB
306            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
307                .await;
308            dbtx.insert_entry(
309                &ClientPreRootSecretHashKey,
310                &pre_root_secret.derive_pre_root_secret_hash(),
311            )
312            .await;
313
314            if let Some(api_secret) = api_secret.as_ref() {
315                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
316            }
317
318            let init_state = InitState::Pending(init_mode);
319            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
320
321            let metadata = init_state
322                .does_require_recovery()
323                .flatten()
324                .map_or(Metadata::empty(), |s| s.metadata);
325
326            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
327
328            dbtx.commit_tx_result().await?;
329        }
330
331        let stopped = self.stopped;
332        self.build(
333            db_no_decoders,
334            pre_root_secret,
335            config,
336            api_secret,
337            stopped,
338            preview_prefetch_api_announcements,
339            preview_prefetch_api_version_set,
340        )
341        .await
342    }
343
344    pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
345        let (config, api) = self
346            .connector
347            .download_from_invite_code(invite_code, self.iroh_enable_dht, self.iroh_enable_next)
348            .await?;
349
350        let prefetch_api_announcements =
351            config
352                .global
353                .broadcast_public_keys
354                .clone()
355                .map(|guardian_pub_keys| {
356                    Jit::new({
357                        let api = api.clone();
358                        move || async move {
359                            // Fetching api announcements using invite urls before joining.
360                            // This ensures the client can communicated with
361                            // the Federation even if all the peers moved write them to database.
362                            fetch_api_announcements_from_at_least_num_of_peers(
363                                1,
364                                &api,
365                                &guardian_pub_keys,
366                                // If we can, we would love to get more than just one response,
367                                // but we need to wrap it up fast for good UX.
368                                Duration::from_millis(20),
369                            )
370                            .await
371                        }
372                    })
373                });
374
375        self.preview_inner(
376            config,
377            invite_code.api_secret(),
378            Some(api),
379            prefetch_api_announcements,
380        )
381        .await
382    }
383
/// Create a [`ClientPreview`] from an already available `config`.
///
/// Use [`Self::preview`] instead
///
/// If `reuse_api` is set, it will allow the preview to prefetch some data
/// to speed up the final join. No api announcements are prefetched by this
/// method.
pub async fn preview_with_existing_config(
    self,
    config: ClientConfig,
    api_secret: Option<String>,
    reuse_api: Option<DynGlobalApi>,
) -> anyhow::Result<ClientPreview> {
    self.preview_inner(config, api_secret, reuse_api, None)
        .await
}
397
398    async fn preview_inner(
399        mut self,
400        config: ClientConfig,
401        api_secret: Option<String>,
402        reuse_api: Option<DynGlobalApi>,
403        prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
404    ) -> anyhow::Result<ClientPreview> {
405        let preview_prefetch_api_version_set = if let Some(api) = reuse_api {
406            self.reuse_connector = Some(api.connector().clone());
407
408            Some(JitTry::new_try({
409                let config = config.clone();
410                || async move { Client::fetch_common_api_versions(&config, &api).await }
411            }))
412        } else {
413            None
414        };
415        Ok(ClientPreview {
416            inner: self,
417            config,
418            api_secret,
419            prefetch_api_announcements,
420            preview_prefetch_api_version_set,
421        })
422    }
423
424    pub async fn open(
425        self,
426        db_no_decoders: Database,
427        pre_root_secret: RootSecret,
428    ) -> anyhow::Result<ClientHandle> {
429        Client::run_core_migrations(&db_no_decoders).await?;
430
431        // Check for pending config and migrate if present
432        Self::migrate_pending_config_if_present(&db_no_decoders).await;
433
434        let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
435            bail!("Client database not initialized")
436        };
437
438        let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
439
440        match db_no_decoders
441            .begin_transaction_nc()
442            .await
443            .get_value(&ClientPreRootSecretHashKey)
444            .await
445        {
446            Some(secret_hash) => {
447                ensure!(
448                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
449                    "Secret hash does not match. Incorrect secret"
450                );
451            }
452            _ => {
453                debug!(target: LOG_CLIENT, "Backfilling secret hash");
454                // Note: no need for dbtx autocommit, we are the only writer ATM
455                let mut dbtx = db_no_decoders.begin_transaction().await;
456                dbtx.insert_entry(
457                    &ClientPreRootSecretHashKey,
458                    &pre_root_secret.derive_pre_root_secret_hash(),
459                )
460                .await;
461                dbtx.commit_tx().await;
462            }
463        }
464
465        let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
466        let stopped = self.stopped;
467        let request_hook = self.request_hook.clone();
468
469        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
470        let client = self
471            .build_stopped(
472                db_no_decoders,
473                pre_root_secret,
474                &config,
475                api_secret,
476                log_event_added_transient_tx,
477                request_hook,
478                None,
479                None,
480            )
481            .await?;
482        if !stopped {
483            client.as_inner().start_executor();
484        }
485        Ok(client)
486    }
487
488    /// Build a [`Client`] and start the executor
489    #[allow(clippy::too_many_arguments)]
490    pub(crate) async fn build(
491        self,
492        db_no_decoders: Database,
493        pre_root_secret: DerivableSecret,
494        config: ClientConfig,
495        api_secret: Option<String>,
496        stopped: bool,
497        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
498        preview_prefetch_api_version_set: Option<
499            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
500        >,
501    ) -> anyhow::Result<ClientHandle> {
502        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
503        let request_hook = self.request_hook.clone();
504        let client = self
505            .build_stopped(
506                db_no_decoders,
507                pre_root_secret,
508                &config,
509                api_secret,
510                log_event_added_transient_tx,
511                request_hook,
512                preview_prefetch_api_announcements,
513                preview_prefetch_api_version_set,
514            )
515            .await?;
516        if !stopped {
517            client.as_inner().start_executor();
518        }
519
520        Ok(client)
521    }
522
523    // TODO: remove config argument
524    /// Build a [`Client`] but do not start the executor
525    #[allow(clippy::too_many_arguments)]
526    async fn build_stopped(
527        self,
528        db_no_decoders: Database,
529        pre_root_secret: DerivableSecret,
530        config: &ClientConfig,
531        api_secret: Option<String>,
532        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
533        request_hook: ApiRequestHook,
534        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
535        preview_prefetch_api_version_set: Option<
536            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
537        >,
538    ) -> anyhow::Result<ClientHandle> {
539        debug!(
540            target: LOG_CLIENT,
541            version = %fedimint_build_code_version_env!(),
542            "Building fedimint client",
543        );
544        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
545        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
546
547        let decoders = self.decoders(config);
548        let config = Self::config_decoded(config, &decoders)?;
549        let fed_id = config.calculate_federation_id();
550        let db = db_no_decoders.with_decoders(decoders.clone());
551        let connector = self.connector;
552        let peer_urls = get_api_urls(&db, &config).await;
553        let api = match self.admin_creds.as_ref() {
554            Some(admin_creds) => {
555                let connector = make_admin_connector(
556                    admin_creds.peer_id,
557                    peer_urls
558                        .into_iter()
559                        .find_map(|(peer, api_url)| {
560                            (admin_creds.peer_id == peer).then_some(api_url)
561                        })
562                        .context("Admin creds should match a peer")?,
563                    &api_secret,
564                    self.iroh_enable_dht,
565                    self.iroh_enable_next,
566                )
567                .await?;
568                ReconnectFederationApi::new_admin(connector, admin_creds.peer_id)
569                    .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
570                    .with_request_hook(&request_hook)
571                    .with_cache()
572                    .into()
573            }
574            None => {
575                let connector = if let Some(connector) = self.reuse_connector.clone()
576                    && connector.peers().len() == peer_urls.len()
577                {
578                    connector
579                } else {
580                    make_connector(
581                        peer_urls,
582                        &api_secret,
583                        self.iroh_enable_dht,
584                        self.iroh_enable_next,
585                    )
586                    .await?
587                };
588                ReconnectFederationApi::new(connector, None)
589                    .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
590                    .with_request_hook(&request_hook)
591                    .with_cache()
592                    .into()
593            }
594        };
595
596        let task_group = TaskGroup::new();
597
598        // Migrate the database before interacting with it in case any on-disk data
599        // structures have changed.
600        self.migrate_module_dbs(&db).await?;
601
602        let init_state = Self::load_init_state(&db).await;
603
604        let notifier = Notifier::new();
605
606        if let Some(p) = preview_prefetch_api_announcements {
607            // We want to fail if we were unable to figure out
608            // current addresses of peers in the federation, as it will potentially never
609            // fix itself, so it's better to fail the join explicitly.
610            let announcements = p.get().await;
611
612            store_api_announcements_updates_from_peers(&db, announcements).await?
613        }
614
615        if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
616            match preview_prefetch_api_version_set.get_try().await {
617                Ok(peer_api_versions) => {
618                    Client::store_prefetched_api_versions(
619                        &db,
620                        &config,
621                        &self.module_inits,
622                        peer_api_versions,
623                    )
624                    .await;
625                }
626                Err(err) => {
627                    debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
628                }
629            }
630        }
631
632        let common_api_versions = Client::load_and_refresh_common_api_version_static(
633            &config,
634            &self.module_inits,
635            &api,
636            &db,
637            &task_group,
638        )
639        .await
640        .inspect_err(|err| {
641            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
642        })
643        .unwrap_or(ApiVersionSet {
644            core: ApiVersion::new(0, 0),
645            // This will cause all modules to skip initialization
646            modules: BTreeMap::new(),
647        });
648
649        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
650
651        // Asynchronously refetch client config and compare with existing
652        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
653
654        let mut module_recoveries: BTreeMap<
655            ModuleInstanceId,
656            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
657        > = BTreeMap::new();
658        let mut module_recovery_progress_receivers: BTreeMap<
659            ModuleInstanceId,
660            watch::Receiver<RecoveryProgress>,
661        > = BTreeMap::new();
662
663        let final_client = FinalClientIface::default();
664
665        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
666
667        let modules = {
668            let mut modules = ClientModuleRegistry::default();
669            for (module_instance_id, module_config) in config.modules.clone() {
670                let kind = module_config.kind().clone();
671                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
672                    debug!(
673                        target: LOG_CLIENT,
674                        kind=%kind,
675                        instance_id=%module_instance_id,
676                        "Module kind of instance not found in module gens, skipping");
677                    continue;
678                };
679
680                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
681                else {
682                    warn!(
683                        target: LOG_CLIENT,
684                        kind=%kind,
685                        instance_id=%module_instance_id,
686                        "Module kind of instance has incompatible api version, skipping"
687                    );
688                    continue;
689                };
690
691                // since the exact logic of when to start recovery is a bit gnarly,
692                // the recovery call is extracted here.
693                let start_module_recover_fn =
694                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
695                        let module_config = module_config.clone();
696                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
697                        let db = db.clone();
698                        let kind = kind.clone();
699                        let notifier = notifier.clone();
700                        let api = api.clone();
701                        let root_secret = root_secret.clone();
702                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
703                        let final_client = final_client.clone();
704                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
705                        let task_group = task_group.clone();
706                        let module_init = module_init.clone();
707                        (
708                            Box::pin(async move {
709                                module_init
710                                    .recover(
711                                        final_client.clone(),
712                                        fed_id,
713                                        num_peers,
714                                        module_config.clone(),
715                                        db.clone(),
716                                        module_instance_id,
717                                        common_api_versions.core,
718                                        api_version,
719                                        root_secret.derive_module_secret(module_instance_id),
720                                        notifier.clone(),
721                                        api.clone(),
722                                        admin_auth,
723                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
724                                        progress_tx,
725                                        task_group,
726                                    )
727                                    .await
728                                    .inspect_err(|err| {
729                                        warn!(
730                                            target: LOG_CLIENT,
731                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
732                                        );
733                                    })
734                            }),
735                            progress_rx,
736                        )
737                    };
738
739                let recovery = match init_state.does_require_recovery() {
740                    Some(snapshot) => {
741                        match db
742                            .begin_transaction_nc()
743                            .await
744                            .get_value(&ClientModuleRecovery { module_instance_id })
745                            .await
746                        {
747                            Some(module_recovery_state) => {
748                                if module_recovery_state.is_done() {
749                                    debug!(
750                                        id = %module_instance_id,
751                                        %kind, "Module recovery already complete"
752                                    );
753                                    None
754                                } else {
755                                    debug!(
756                                        id = %module_instance_id,
757                                        %kind,
758                                        progress = %module_recovery_state.progress,
759                                        "Starting module recovery with an existing progress"
760                                    );
761                                    Some(start_module_recover_fn(
762                                        snapshot,
763                                        module_recovery_state.progress,
764                                    ))
765                                }
766                            }
767                            _ => {
768                                let progress = RecoveryProgress::none();
769                                let mut dbtx = db.begin_transaction().await;
770                                dbtx.log_event(
771                                    log_ordering_wakeup_tx.clone(),
772                                    None,
773                                    ModuleRecoveryStarted::new(module_instance_id),
774                                )
775                                .await;
776                                dbtx.insert_entry(
777                                    &ClientModuleRecovery { module_instance_id },
778                                    &ClientModuleRecoveryState { progress },
779                                )
780                                .await;
781
782                                dbtx.commit_tx().await;
783
784                                debug!(
785                                    id = %module_instance_id,
786                                    %kind, "Starting new module recovery"
787                                );
788                                Some(start_module_recover_fn(snapshot, progress))
789                            }
790                        }
791                    }
792                    _ => None,
793                };
794
795                match recovery {
796                    Some((recovery, recovery_progress_rx)) => {
797                        module_recoveries.insert(module_instance_id, recovery);
798                        module_recovery_progress_receivers
799                            .insert(module_instance_id, recovery_progress_rx);
800                    }
801                    _ => {
802                        let module = module_init
803                            .init(
804                                final_client.clone(),
805                                fed_id,
806                                config.global.api_endpoints.len(),
807                                module_config,
808                                db.clone(),
809                                module_instance_id,
810                                common_api_versions.core,
811                                api_version,
812                                // This is a divergence from the legacy client, where the child
813                                // secret keys were derived using
814                                // *module kind*-specific derivation paths.
815                                // Since the new client has to support multiple, segregated modules
816                                // of the same kind we have to use
817                                // the instance id instead.
818                                root_secret.derive_module_secret(module_instance_id),
819                                notifier.clone(),
820                                api.clone(),
821                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
822                                task_group.clone(),
823                            )
824                            .await?;
825
826                        modules.register_module(module_instance_id, kind, module);
827                    }
828                }
829            }
830            modules
831        };
832
833        if init_state.is_pending() && module_recoveries.is_empty() {
834            let mut dbtx = db.begin_transaction().await;
835            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
836                .await;
837            dbtx.commit_tx().await;
838        }
839
840        let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
841            BTreeMap::new();
842
843        for (module_id, _kind, module) in modules.iter_modules() {
844            match module.supports_being_primary() {
845                PrimaryModuleSupport::Any { priority } => {
846                    primary_modules
847                        .entry(priority)
848                        .or_default()
849                        .wildcard
850                        .push(module_id);
851                }
852                PrimaryModuleSupport::Selected { priority, units } => {
853                    for unit in units {
854                        primary_modules
855                            .entry(priority)
856                            .or_default()
857                            .specific
858                            .entry(unit)
859                            .or_default()
860                            .push(module_id);
861                    }
862                }
863                PrimaryModuleSupport::None => {}
864            }
865        }
866
867        let executor = {
868            let mut executor_builder = Executor::builder();
869            executor_builder
870                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
871
872            for (module_instance_id, _, module) in modules.iter_modules() {
873                executor_builder.with_module_dyn(module.context(module_instance_id));
874            }
875
876            for module_instance_id in module_recoveries.keys() {
877                executor_builder.with_valid_module_id(*module_instance_id);
878            }
879
880            executor_builder.build(
881                db.clone(),
882                notifier,
883                task_group.clone(),
884                log_ordering_wakeup_tx.clone(),
885            )
886        };
887
888        let recovery_receiver_init_val = module_recovery_progress_receivers
889            .iter()
890            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
891            .collect::<BTreeMap<_, _>>();
892        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
893            watch::channel(recovery_receiver_init_val);
894
895        let client_inner = Arc::new(Client {
896            final_client: final_client.clone(),
897            config: tokio::sync::RwLock::new(config.clone()),
898            api_secret,
899            decoders,
900            db: db.clone(),
901            federation_id: fed_id,
902            federation_config_meta: config.global.meta,
903            primary_modules,
904            modules,
905            module_inits: self.module_inits.clone(),
906            log_ordering_wakeup_tx,
907            log_event_added_rx,
908            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
909            request_hook,
910            executor,
911            api,
912            secp_ctx: Secp256k1::new(),
913            root_secret,
914            task_group,
915            operation_log: OperationLog::new(db.clone()),
916            client_recovery_progress_receiver,
917            meta_service: self.meta_service,
918            connector,
919            iroh_enable_dht: self.iroh_enable_dht,
920            iroh_enable_next: self.iroh_enable_next,
921        });
922        client_inner
923            .task_group
924            .spawn_cancellable("MetaService::update_continuously", {
925                let client_inner = client_inner.clone();
926                async move {
927                    client_inner
928                        .meta_service
929                        .update_continuously(&client_inner)
930                        .await;
931                }
932            });
933
934        client_inner.task_group.spawn_cancellable(
935            "update-api-announcements",
936            run_api_announcement_refresh_task(client_inner.clone()),
937        );
938
939        client_inner.task_group.spawn_cancellable(
940            "event log ordering task",
941            run_event_log_ordering_task(
942                db.clone(),
943                log_ordering_wakeup_rx,
944                log_event_added_tx,
945                log_event_added_transient_tx,
946            ),
947        );
948        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
949
950        let client_arc = ClientHandle::new(client_inner);
951
952        for (_, _, module) in client_arc.modules.iter_modules() {
953            module.start().await;
954        }
955
956        final_client.set(client_iface.clone());
957
958        if !module_recoveries.is_empty() {
959            client_arc.spawn_module_recoveries_task(
960                client_recovery_progress_sender,
961                module_recoveries,
962                module_recovery_progress_receivers,
963            );
964        }
965
966        Ok(client_arc)
967    }
968
969    async fn load_init_state(db: &Database) -> InitState {
970        let mut dbtx = db.begin_transaction_nc().await;
971        dbtx.get_value(&ClientInitStateKey)
972            .await
973            .unwrap_or_else(|| {
974                // could be turned in a hard error in the future, but for now
975                // no need to break backward compat.
976                warn!(
977                    target: LOG_CLIENT,
978                    "Client missing ClientRequiresRecovery: assuming complete"
979                );
980                db::InitState::Complete(db::InitModeComplete::Fresh)
981            })
982    }
983
984    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
985        let mut decoders = client_decoders(
986            &self.module_inits,
987            config
988                .modules
989                .iter()
990                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
991        );
992
993        decoders.register_module(
994            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
995            ModuleKind::from_static_str("tx_submission"),
996            tx_submission_sm_decoder(),
997        );
998
999        decoders
1000    }
1001
1002    fn config_decoded(
1003        config: &ClientConfig,
1004        decoders: &ModuleDecoderRegistry,
1005    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1006        config.clone().redecode_raw(decoders)
1007    }
1008
1009    /// Re-derive client's `root_secret` using the federation ID. This
1010    /// eliminates the possibility of having the same client `root_secret`
1011    /// across multiple federations.
1012    fn federation_root_secret(
1013        pre_root_secret: &DerivableSecret,
1014        config: &ClientConfig,
1015    ) -> DerivableSecret {
1016        pre_root_secret.federation_key(&config.global.calculate_federation_id())
1017    }
1018
1019    /// Register to receiver all new transient (unpersisted) events
1020    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1021        self.log_event_added_transient_tx.subscribe()
1022    }
1023
1024    /// Check for pending config and migrate it if present.
1025    /// Returns the config to use (either the original or the migrated pending
1026    /// config).
1027    async fn migrate_pending_config_if_present(db: &Database) {
1028        if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1029            debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1030
1031            let mut dbtx = db.begin_transaction().await;
1032            // Update the main config with the pending config
1033            dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1034                .await;
1035            // Remove the pending config
1036            dbtx.remove_entry(&PendingClientConfigKey).await;
1037            dbtx.commit_tx().await;
1038
1039            debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1040        }
1041    }
1042
1043    /// Asynchronously refetch client config from federation and compare with
1044    /// existing. If different, save to pending config in database.
1045    fn load_and_refresh_client_config_static(
1046        config: &ClientConfig,
1047        api: &DynGlobalApi,
1048        db: &Database,
1049        task_group: &TaskGroup,
1050    ) {
1051        let config = config.clone();
1052        let api = api.clone();
1053        let db = db.clone();
1054        let task_group = task_group.clone();
1055
1056        // Spawn background task to refetch config
1057        task_group.spawn_cancellable("refresh_client_config_static", async move {
1058            Self::refresh_client_config_static(&config, &api, &db).await;
1059        });
1060    }
1061
1062    /// Wrapper that handles errors from config refresh with proper logging
1063    async fn refresh_client_config_static(
1064        config: &ClientConfig,
1065        api: &DynGlobalApi,
1066        db: &Database,
1067    ) {
1068        if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1069            warn!(
1070                target: LOG_CLIENT,
1071                err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1072            );
1073        }
1074    }
1075
1076    /// Validate that a config update is valid
1077    fn validate_config_update(
1078        current_config: &ClientConfig,
1079        new_config: &ClientConfig,
1080    ) -> anyhow::Result<()> {
1081        // Global config must not change
1082        if current_config.global != new_config.global {
1083            bail!("Global configuration changes are not allowed in config updates");
1084        }
1085
1086        // Modules can only be added, existing ones must stay the same
1087        for (module_id, current_module_config) in &current_config.modules {
1088            match new_config.modules.get(module_id) {
1089                Some(new_module_config) => {
1090                    if current_module_config != new_module_config {
1091                        bail!(
1092                            "Module {} configuration changes are not allowed, only additions are permitted",
1093                            module_id
1094                        );
1095                    }
1096                }
1097                None => {
1098                    bail!(
1099                        "Module {} was removed in new config, only additions are allowed",
1100                        module_id
1101                    );
1102                }
1103            }
1104        }
1105
1106        Ok(())
1107    }
1108
1109    /// Refetch client config from federation and save as pending if different
1110    async fn refresh_client_config_static_try(
1111        current_config: &ClientConfig,
1112        api: &DynGlobalApi,
1113        db: &Database,
1114    ) -> anyhow::Result<()> {
1115        debug!(target: LOG_CLIENT, "Refreshing client config");
1116
1117        // Fetch latest config from federation
1118        let fetched_config = api
1119            .request_current_consensus::<ClientConfig>(
1120                CLIENT_CONFIG_ENDPOINT.to_owned(),
1121                ApiRequestErased::default(),
1122            )
1123            .await?;
1124
1125        // Validate the new config before proceeding
1126        Self::validate_config_update(current_config, &fetched_config)?;
1127
1128        // Compare with current config
1129        if current_config != &fetched_config {
1130            debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1131
1132            let mut dbtx = db.begin_transaction().await;
1133            dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1134                .await;
1135            dbtx.commit_tx().await;
1136        } else {
1137            debug!(target: LOG_CLIENT, "No federation config changes detected");
1138        }
1139
1140        Ok(())
1141    }
1142}
1143
1144pub struct ClientPreview {
1145    inner: ClientBuilder,
1146    config: ClientConfig,
1147    api_secret: Option<String>,
1148    prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1149    preview_prefetch_api_version_set:
1150        Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1151}
1152
1153impl ClientPreview {
1154    /// Get the config
1155    pub fn config(&self) -> &ClientConfig {
1156        &self.config
1157    }
1158
1159    /// Join a new Federation
1160    ///
1161    /// When a user wants to connect to a new federation this function fetches
1162    /// the federation config and initializes the client database. If a user
1163    /// already joined the federation in the past and has a preexisting database
1164    /// use [`ClientBuilder::open`] instead.
1165    ///
1166    /// **Warning**: Calling `join` with a `root_secret` key that was used
1167    /// previous to `join` a Federation will lead to all sorts of malfunctions
1168    /// including likely loss of funds.
1169    ///
1170    /// This should be generally called only if the `root_secret` key is known
1171    /// not to have been used before (e.g. just randomly generated). For keys
1172    /// that might have been previous used (e.g. provided by the user),
1173    /// it's safer to call [`Self::recover`] which will attempt to recover
1174    /// client module states for the Federation.
1175    ///
1176    /// A typical "join federation" flow would look as follows:
1177    /// ```no_run
1178    /// # use std::str::FromStr;
1179    /// # use fedimint_core::invite_code::InviteCode;
1180    /// # use fedimint_core::config::ClientConfig;
1181    /// # use fedimint_derive_secret::DerivableSecret;
1182    /// # use fedimint_client::{Client, ClientBuilder, RootSecret};
1183    /// # use fedimint_core::db::Database;
1184    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1185    /// #
1186    /// # #[tokio::main]
1187    /// # async fn main() -> anyhow::Result<()> {
1188    /// # let root_secret: DerivableSecret = unimplemented!();
1189    /// // Create a root secret, e.g. via fedimint-bip39, see also:
1190    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1191    /// // let root_secret = …;
1192    ///
1193    /// // Get invite code from user
1194    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1195    ///     .expect("Invalid invite code");
1196    ///
1197    /// // Tell the user the federation name, bitcoin network
1198    /// // (e.g. from wallet module config), and other details
1199    /// // that are typically contained in the federation's
1200    /// // meta fields.
1201    ///
1202    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1203    /// //     .expect("Module not found")
1204    /// //     .network;
1205    ///
1206    /// // Open the client's database, using the federation ID
1207    /// // as the DB name is a common pattern:
1208    ///
1209    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1210    /// // let db = RocksDb::open(db_path).expect("error opening DB");
1211    /// # let db: Database = unimplemented!();
1212    ///
1213    /// let preview = Client::builder().await
1214    ///     // Mount the modules the client should support:
1215    ///     // .with_module(LightningClientInit)
1216    ///     // .with_module(MintClientInit)
1217    ///     // .with_module(WalletClientInit::default())
1218    ///      .expect("Error building client")
1219    ///      .preview(&invite_code).await?;
1220    ///
1221    /// println!(
1222    ///     "The federation name is: {}",
1223    ///     preview.config().meta::<String>(META_FEDERATION_NAME_KEY)
1224    ///         .expect("Could not decode name field")
1225    ///         .expect("Name isn't set")
1226    /// );
1227    ///
1228    /// let client = preview
1229    ///     .join(db, RootSecret::StandardDoubleDerive(root_secret))
1230    ///     .await
1231    ///     .expect("Error joining federation");
1232    /// # Ok(())
1233    /// # }
1234    /// ```
1235    pub async fn join(
1236        self,
1237        db_no_decoders: Database,
1238        pre_root_secret: RootSecret,
1239    ) -> anyhow::Result<ClientHandle> {
1240        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1241
1242        let client = self
1243            .inner
1244            .init(
1245                db_no_decoders,
1246                pre_root_secret,
1247                self.config,
1248                self.api_secret,
1249                InitMode::Fresh,
1250                self.prefetch_api_announcements,
1251                self.preview_prefetch_api_version_set,
1252            )
1253            .await?;
1254
1255        Ok(client)
1256    }
1257
1258    /// Join a (possibly) previous joined Federation
1259    ///
1260    /// Unlike [`Self::join`], `recover` will run client module
1261    /// recovery for each client module attempting to recover any previous
1262    /// module state.
1263    ///
1264    /// Recovery process takes time during which each recovering client module
1265    /// will not be available for use.
1266    ///
1267    /// Calling `recovery` with a `root_secret` that was not actually previous
1268    /// used in a given Federation is safe.
1269    pub async fn recover(
1270        self,
1271        db_no_decoders: Database,
1272        pre_root_secret: RootSecret,
1273        backup: Option<ClientBackup>,
1274    ) -> anyhow::Result<ClientHandle> {
1275        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1276
1277        let client = self
1278            .inner
1279            .init(
1280                db_no_decoders,
1281                pre_root_secret,
1282                self.config,
1283                self.api_secret,
1284                InitMode::Recover {
1285                    snapshot: backup.clone(),
1286                },
1287                self.prefetch_api_announcements,
1288                self.preview_prefetch_api_version_set,
1289            )
1290            .await?;
1291
1292        Ok(client)
1293    }
1294
1295    /// Download most recent valid backup found from the Federation
1296    pub async fn download_backup_from_federation(
1297        &self,
1298        pre_root_secret: RootSecret,
1299    ) -> anyhow::Result<Option<ClientBackup>> {
1300        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1301        let api = DynGlobalApi::from_endpoints(
1302            // TODO: change join logic to use FederationId v2
1303            self.config
1304                .global
1305                .api_endpoints
1306                .iter()
1307                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
1308            &self.api_secret,
1309            self.inner.iroh_enable_dht,
1310            self.inner.iroh_enable_next,
1311        )
1312        .await?;
1313
1314        Client::download_backup_from_federation_static(
1315            &api,
1316            &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1317            &self.inner.decoders(&self.config),
1318        )
1319        .await
1320    }
1321}