fedimint_client/client/
builder.rs

1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use anyhow::{Context as _, anyhow, bail, ensure};
7use bitcoin::key::Secp256k1;
8use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
9use fedimint_api_client::api::global_api::with_request_hook::{
10    ApiRequestHook, RawFederationApiWithRequestHookExt as _,
11};
12use fedimint_api_client::api::net::Connector;
13use fedimint_api_client::api::{
14    ApiVersionSet, DynGlobalApi, FederationApiExt as _, ReconnectFederationApi,
15};
16use fedimint_client_module::api::ClientRawFederationApiExt as _;
17use fedimint_client_module::meta::LegacyMetaSource;
18use fedimint_client_module::module::init::ClientModuleInit;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
21use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
22use fedimint_client_module::transaction::{
23    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
24};
25use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
26use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
27use fedimint_core::core::{ModuleInstanceId, ModuleKind};
28use fedimint_core::db::{
29    Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
30};
31use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
32use fedimint_core::envs::is_running_in_test_env;
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
35use fedimint_core::module::{ApiRequestErased, ApiVersion};
36use fedimint_core::task::TaskGroup;
37use fedimint_core::util::FmtCompactAnyhow as _;
38use fedimint_core::{NumPeers, maybe_add_send};
39use fedimint_derive_secret::DerivableSecret;
40use fedimint_eventlog::{
41    DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
42};
43use fedimint_logging::LOG_CLIENT;
44use tokio::sync::{broadcast, watch};
45use tracing::{debug, warn};
46
47use super::handle::ClientHandle;
48use super::{Client, client_decoders};
49use crate::api_announcements::{
50    get_api_urls, refresh_api_announcement_sync, run_api_announcement_sync,
51};
52use crate::backup::{ClientBackup, Metadata};
53use crate::db::{
54    self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
55    ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
56    PendingClientConfigKey, apply_migrations_client_module_dbtx,
57};
58use crate::meta::MetaService;
59use crate::module_init::ClientModuleInitRegistry;
60use crate::oplog::OperationLog;
61use crate::sm::executor::Executor;
62use crate::sm::notifier::Notifier;
63
64/// The type of root secret hashing
65///
66/// *Please read this documentation carefully if, especially if you're upgrading
67/// downstream Fedimint client application.*
68///
69/// Internally, client will always hash-in federation id
70/// to the root secret provided to the [`ClientBuilder`],
71/// to ensure a different actual root secret is used for ever federation.
72/// This makes reusing a single root secret for different federations
73/// in a multi-federation client, perfectly fine, and frees the client
74/// from worrying about `FederationId`.
75///
76/// However, in the past Fedimint applications (including `fedimint-cli`)
77/// were doing the hashing-in of `FederationId` outside of `fedimint-client` as
78/// well, which lead to effectively doing it twice, and pushed downloading of
79/// the client config on join to application code, a sub-optimal API, especially
80/// after joining federation needed to handle even more functionality.
81///
82/// To keep the interoperability of the seed phrases this double-derivation
83/// is preserved, due to other architectural reason, `fedimint-client`
84/// will now do the outer-derivation internally as well.
85#[derive(Clone)]
86pub enum RootSecret {
87    /// Derive an extra round of federation-id to the secret, like
88    /// Fedimint applications were doing manually in the past.
89    ///
90    /// **Note**: Applications MUST NOT do the derivation themselves anymore.
91    StandardDoubleDerive(DerivableSecret),
92    /// No double derivation
93    ///
94    /// This is useful for applications that for whatever reason do the
95    /// double-derivation externally, or use a custom scheme.
96    Custom(DerivableSecret),
97}
98
99impl RootSecret {
100    fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
101        match self {
102            RootSecret::StandardDoubleDerive(derivable_secret) => {
103                get_default_client_secret(derivable_secret, &federation_id)
104            }
105            RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
106        }
107    }
108}
109
110/// Used to configure, assemble and build [`Client`]
111pub struct ClientBuilder {
112    module_inits: ClientModuleInitRegistry,
113    primary_module_instance: Option<ModuleInstanceId>,
114    primary_module_kind: Option<ModuleKind>,
115    admin_creds: Option<AdminCreds>,
116    db_no_decoders: Database,
117    meta_service: Arc<crate::meta::MetaService>,
118    connector: Connector,
119    stopped: bool,
120    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
121    request_hook: ApiRequestHook,
122}
123
124impl ClientBuilder {
125    pub(crate) fn new(db: Database) -> Self {
126        let meta_service = MetaService::new(LegacyMetaSource::default());
127        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
128            broadcast::channel(1024);
129        ClientBuilder {
130            module_inits: ModuleInitRegistry::new(),
131            primary_module_instance: None,
132            primary_module_kind: None,
133            connector: Connector::default(),
134            admin_creds: None,
135            db_no_decoders: db,
136            stopped: false,
137            meta_service,
138            log_event_added_transient_tx,
139            request_hook: Arc::new(|api| api),
140        }
141    }
142
143    pub(crate) fn from_existing(client: &Client) -> Self {
144        ClientBuilder {
145            module_inits: client.module_inits.clone(),
146            primary_module_instance: Some(client.primary_module_instance),
147            primary_module_kind: None,
148            admin_creds: None,
149            db_no_decoders: client.db.with_decoders(ModuleRegistry::default()),
150            stopped: false,
151            // non unique
152            meta_service: client.meta_service.clone(),
153            connector: client.connector,
154            log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
155            request_hook: client.request_hook.clone(),
156        }
157    }
158
159    /// Replace module generator registry entirely
160    ///
161    /// There has to be at least one module supporting being primary among the
162    /// registered modules. The client won't start without the federation and
163    /// the client having at least one overlapping primary module. In case there
164    /// are multiple, the one to use can be selected with
165    /// [`ClientBuilder::with_primary_module_kind`] or
166    /// [`ClientBuilder::with_primary_module_instance_id`].
167    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
168        self.module_inits = module_inits;
169    }
170
171    /// Make module generator available when reading the config
172    ///
173    /// There has to be at least one module supporting being primary among the
174    /// registered modules. The client won't start without the federation and
175    /// the client having at least one overlapping primary module. In case there
176    /// are multiple, the one to use can be selected with
177    /// [`ClientBuilder::with_primary_module_kind`] or
178    /// [`ClientBuilder::with_primary_module_instance_id`].
179    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
180        self.module_inits.attach(module_init);
181    }
182
183    pub fn stopped(&mut self) {
184        self.stopped = true;
185    }
186
187    /// Build the [`Client`] with a custom wrapper around its api request logic
188    ///
189    /// This is intended to be used by downstream applications, e.g. to:
190    ///
191    /// * simulate offline mode,
192    /// * save battery when the OS indicates lack of connectivity,
193    /// * inject faults and delays for testing purposes,
194    /// * collect statistics and emit notifications.
195    pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
196        self.request_hook = hook;
197        self
198    }
199
200    /// Uses this module with the given instance id as the primary module. See
201    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
202    /// more information.
203    ///
204    /// ## Panics
205    /// If there was a primary module specified previously
206    #[deprecated(
207        since = "0.6.0",
208        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
209    )]
210    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
211        self.with_primary_module_instance_id(primary_module_instance);
212    }
213
214    /// **You are likely looking for
215    /// [`ClientBuilder::with_primary_module_kind`]. This function is rarely
216    /// useful and often dangerous, handle with care.**
217    ///
218    /// Uses this module with the given instance id as the primary module. See
219    /// [`fedimint_client_module::ClientModule::supports_being_primary`] for
220    /// more information. Since the module instance id of modules of a
221    /// specific kind may differ between different federations it is
222    /// generally not recommended to specify it, but rather to specify the
223    /// module kind that should be used as primary. See
224    /// [`ClientBuilder::with_primary_module_kind`].
225    ///
226    /// ## Panics
227    /// If there was a primary module specified previously
228    pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
229        let was_replaced = self
230            .primary_module_instance
231            .replace(primary_module_instance)
232            .is_some();
233        assert!(
234            !was_replaced,
235            "Only one primary module can be given to the builder."
236        );
237    }
238
239    /// Uses this module kind as the primary module if present in the config.
240    /// See [`fedimint_client_module::ClientModule::supports_being_primary`] for
241    /// more information.
242    ///
243    /// ## Panics
244    /// If there was a primary module kind specified previously
245    pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
246        let was_replaced = self
247            .primary_module_kind
248            .replace(primary_module_kind)
249            .is_some();
250        assert!(
251            !was_replaced,
252            "Only one primary module kind can be given to the builder."
253        );
254    }
255
256    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
257        self.meta_service = meta_service;
258    }
259
260    /// Migrate client module databases
261    ///
262    /// Note: Client core db migration are done immediately in
263    /// [`Client::builder`], to ensure db matches the code at all times,
264    /// while migrating modules requires figuring out what modules actually
265    /// are first.
266    async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
267        // Only apply the client database migrations if the database has been
268        // initialized.
269        // This only works as long as you don't change the client config
270        if let Ok(client_config) = self.load_existing_config().await {
271            for (module_id, module_cfg) in client_config.modules {
272                let kind = module_cfg.kind.clone();
273                let Some(init) = self.module_inits.get(&kind) else {
274                    // normal, expected and already logged about when building the client
275                    continue;
276                };
277
278                let mut dbtx = db.begin_transaction().await;
279                apply_migrations_client_module_dbtx(
280                    &mut dbtx.to_ref_nc(),
281                    kind.to_string(),
282                    init.get_database_migrations(),
283                    module_id,
284                )
285                .await?;
286                if let Some(used_db_prefixes) = init.used_db_prefixes()
287                    && is_running_in_test_env()
288                {
289                    verify_module_db_integrity_dbtx(
290                        &mut dbtx.to_ref_nc(),
291                        module_id,
292                        kind,
293                        &used_db_prefixes,
294                    )
295                    .await;
296                }
297                dbtx.commit_tx_result().await?;
298            }
299        }
300
301        Ok(())
302    }
303
304    pub fn db_no_decoders(&self) -> &Database {
305        &self.db_no_decoders
306    }
307
308    pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
309        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
310            bail!("Client database not initialized")
311        };
312
313        Ok(config)
314    }
315
316    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
317        self.admin_creds = Some(creds);
318    }
319
320    pub fn with_connector(&mut self, connector: Connector) {
321        self.connector = connector;
322    }
323
324    #[cfg(feature = "tor")]
325    pub fn with_tor_connector(&mut self) {
326        self.with_connector(Connector::tor());
327    }
328
329    async fn init(
330        self,
331        pre_root_secret: DerivableSecret,
332        config: ClientConfig,
333        api_secret: Option<String>,
334        init_mode: InitMode,
335    ) -> anyhow::Result<ClientHandle> {
336        if Client::is_initialized(&self.db_no_decoders).await {
337            bail!("Client database already initialized")
338        }
339
340        // Note: It's important all client initialization is performed as one big
341        // transaction to avoid half-initialized client state.
342        {
343            debug!(target: LOG_CLIENT, "Initializing client database");
344            let mut dbtx = self.db_no_decoders.begin_transaction().await;
345            // Save config to DB
346            dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
347                .await;
348            dbtx.insert_entry(
349                &ClientPreRootSecretHashKey,
350                &pre_root_secret.derive_pre_root_secret_hash(),
351            )
352            .await;
353
354            if let Some(api_secret) = api_secret.as_ref() {
355                dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
356            }
357
358            let init_state = InitState::Pending(init_mode);
359            dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
360
361            let metadata = init_state
362                .does_require_recovery()
363                .flatten()
364                .map_or(Metadata::empty(), |s| s.metadata);
365
366            dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
367
368            dbtx.commit_tx_result().await?;
369        }
370
371        let stopped = self.stopped;
372        self.build(pre_root_secret, config, api_secret, stopped)
373            .await
374    }
375
376    pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
377        let config = self
378            .connector
379            .download_from_invite_code(invite_code)
380            .await?;
381
382        if let Some(guardian_pub_keys) = config.global.broadcast_public_keys.clone() {
383            // Fetching api announcements using invite urls before joining, then write them
384            // to database This ensures the client can communicated with the
385            // Federation even if all the peers moved.
386            let api = DynGlobalApi::from_endpoints(invite_code.peers(), &invite_code.api_secret())
387                .await?;
388            refresh_api_announcement_sync(&api, self.db_no_decoders(), &guardian_pub_keys).await?;
389        }
390
391        Ok(ClientPreview {
392            inner: self,
393            config,
394            api_secret: invite_code.api_secret(),
395        })
396    }
397
398    /// Use [`Self::preview`] instead
399    pub async fn preview_with_existing_config(
400        self,
401        config: ClientConfig,
402        api_secret: Option<String>,
403    ) -> anyhow::Result<ClientPreview> {
404        Ok(ClientPreview {
405            inner: self,
406            config,
407            api_secret,
408        })
409    }
410
411    pub async fn open(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
412        // Check for pending config and migrate if present
413        Self::migrate_pending_config_if_present(&self.db_no_decoders).await;
414
415        let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
416            bail!("Client database not initialized")
417        };
418
419        let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
420
421        match self
422            .db_no_decoders()
423            .begin_transaction_nc()
424            .await
425            .get_value(&ClientPreRootSecretHashKey)
426            .await
427        {
428            Some(secret_hash) => {
429                ensure!(
430                    pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
431                    "Secret hash does not match. Incorrect secret"
432                );
433            }
434            _ => {
435                debug!(target: LOG_CLIENT, "Backfilling secret hash");
436                // Note: no need for dbtx autocommit, we are the only writer ATM
437                let mut dbtx = self.db_no_decoders.begin_transaction().await;
438                dbtx.insert_entry(
439                    &ClientPreRootSecretHashKey,
440                    &pre_root_secret.derive_pre_root_secret_hash(),
441                )
442                .await;
443                dbtx.commit_tx().await;
444            }
445        }
446
447        let api_secret = Client::get_api_secret_from_db(&self.db_no_decoders).await;
448        let stopped = self.stopped;
449        let request_hook = self.request_hook.clone();
450
451        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
452        let client = self
453            .build_stopped(
454                pre_root_secret,
455                &config,
456                api_secret,
457                log_event_added_transient_tx,
458                request_hook,
459            )
460            .await?;
461        if !stopped {
462            client.as_inner().start_executor();
463        }
464        Ok(client)
465    }
466
467    /// Build a [`Client`] and start the executor
468    pub(crate) async fn build(
469        self,
470        pre_root_secret: DerivableSecret,
471        config: ClientConfig,
472        api_secret: Option<String>,
473        stopped: bool,
474    ) -> anyhow::Result<ClientHandle> {
475        let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
476        let request_hook = self.request_hook.clone();
477        let client = self
478            .build_stopped(
479                pre_root_secret,
480                &config,
481                api_secret,
482                log_event_added_transient_tx,
483                request_hook,
484            )
485            .await?;
486        if !stopped {
487            client.as_inner().start_executor();
488        }
489
490        Ok(client)
491    }
492
493    // TODO: remove config argument
494    /// Build a [`Client`] but do not start the executor
495    async fn build_stopped(
496        self,
497        pre_root_secret: DerivableSecret,
498        config: &ClientConfig,
499        api_secret: Option<String>,
500        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
501        request_hook: ApiRequestHook,
502    ) -> anyhow::Result<ClientHandle> {
503        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
504        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
505
506        let decoders = self.decoders(config);
507        let config = Self::config_decoded(config, &decoders)?;
508        let fed_id = config.calculate_federation_id();
509        let db = self.db_no_decoders.with_decoders(decoders.clone());
510        let connector = self.connector;
511        let peer_urls = get_api_urls(&db, &config).await;
512        let api = match self.admin_creds.as_ref() {
513            Some(admin_creds) => ReconnectFederationApi::new_admin(
514                admin_creds.peer_id,
515                peer_urls
516                    .into_iter()
517                    .find_map(|(peer, api_url)| (admin_creds.peer_id == peer).then_some(api_url))
518                    .context("Admin creds should match a peer")?,
519                &api_secret,
520            )
521            .await?
522            .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
523            .with_request_hook(&request_hook)
524            .with_cache()
525            .into(),
526            None => ReconnectFederationApi::from_endpoints(peer_urls, &api_secret, None)
527                .await?
528                .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
529                .with_request_hook(&request_hook)
530                .with_cache()
531                .into(),
532        };
533        let task_group = TaskGroup::new();
534
535        // Migrate the database before interacting with it in case any on-disk data
536        // structures have changed.
537        self.migrate_module_dbs(&db).await?;
538
539        let init_state = Self::load_init_state(&db).await;
540
541        let mut primary_module_instance = self.primary_module_instance.or_else(|| {
542            let primary_module_kind = self.primary_module_kind?;
543            config
544                .modules
545                .iter()
546                .find_map(|(module_instance_id, module_config)| {
547                    (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
548                })
549        });
550
551        let notifier = Notifier::new();
552
553        let common_api_versions = Client::load_and_refresh_common_api_version_static(
554            &config,
555            &self.module_inits,
556            &api,
557            &db,
558            &task_group,
559        )
560        .await
561        .inspect_err(|err| {
562            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover initial API version to use.");
563        })
564        .unwrap_or(ApiVersionSet {
565            core: ApiVersion::new(0, 0),
566            // This will cause all modules to skip initialization
567            modules: BTreeMap::new(),
568        });
569
570        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
571
572        // Asynchronously refetch client config and compare with existing
573        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
574
575        let mut module_recoveries: BTreeMap<
576            ModuleInstanceId,
577            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
578        > = BTreeMap::new();
579        let mut module_recovery_progress_receivers: BTreeMap<
580            ModuleInstanceId,
581            watch::Receiver<RecoveryProgress>,
582        > = BTreeMap::new();
583
584        let final_client = FinalClientIface::default();
585
586        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
587
588        let modules = {
589            let mut modules = ClientModuleRegistry::default();
590            for (module_instance_id, module_config) in config.modules.clone() {
591                let kind = module_config.kind().clone();
592                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
593                    debug!(
594                        target: LOG_CLIENT,
595                        kind=%kind,
596                        instance_id=%module_instance_id,
597                        "Module kind of instance not found in module gens, skipping");
598                    continue;
599                };
600
601                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
602                else {
603                    warn!(
604                        target: LOG_CLIENT,
605                        kind=%kind,
606                        instance_id=%module_instance_id,
607                        "Module kind of instance has incompatible api version, skipping"
608                    );
609                    continue;
610                };
611
612                // since the exact logic of when to start recovery is a bit gnarly,
613                // the recovery call is extracted here.
614                let start_module_recover_fn =
615                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
616                        let module_config = module_config.clone();
617                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
618                        let db = db.clone();
619                        let kind = kind.clone();
620                        let notifier = notifier.clone();
621                        let api = api.clone();
622                        let root_secret = root_secret.clone();
623                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
624                        let final_client = final_client.clone();
625                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
626                        let task_group = task_group.clone();
627                        let module_init = module_init.clone();
628                        (
629                            Box::pin(async move {
630                                module_init
631                                    .recover(
632                                        final_client.clone(),
633                                        fed_id,
634                                        num_peers,
635                                        module_config.clone(),
636                                        db.clone(),
637                                        module_instance_id,
638                                        common_api_versions.core,
639                                        api_version,
640                                        root_secret.derive_module_secret(module_instance_id),
641                                        notifier.clone(),
642                                        api.clone(),
643                                        admin_auth,
644                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
645                                        progress_tx,
646                                        task_group,
647                                    )
648                                    .await
649                                    .inspect_err(|err| {
650                                        warn!(
651                                            target: LOG_CLIENT,
652                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
653                                        );
654                                    })
655                            }),
656                            progress_rx,
657                        )
658                    };
659
660                let recovery = match init_state.does_require_recovery() {
661                    Some(snapshot) => {
662                        match db
663                            .begin_transaction_nc()
664                            .await
665                            .get_value(&ClientModuleRecovery { module_instance_id })
666                            .await
667                        {
668                            Some(module_recovery_state) => {
669                                if module_recovery_state.is_done() {
670                                    debug!(
671                                        id = %module_instance_id,
672                                        %kind, "Module recovery already complete"
673                                    );
674                                    None
675                                } else {
676                                    debug!(
677                                        id = %module_instance_id,
678                                        %kind,
679                                        progress = %module_recovery_state.progress,
680                                        "Starting module recovery with an existing progress"
681                                    );
682                                    Some(start_module_recover_fn(
683                                        snapshot,
684                                        module_recovery_state.progress,
685                                    ))
686                                }
687                            }
688                            _ => {
689                                let progress = RecoveryProgress::none();
690                                let mut dbtx = db.begin_transaction().await;
691                                dbtx.log_event(
692                                    log_ordering_wakeup_tx.clone(),
693                                    None,
694                                    ModuleRecoveryStarted::new(module_instance_id),
695                                )
696                                .await;
697                                dbtx.insert_entry(
698                                    &ClientModuleRecovery { module_instance_id },
699                                    &ClientModuleRecoveryState { progress },
700                                )
701                                .await;
702
703                                dbtx.commit_tx().await;
704
705                                debug!(
706                                    id = %module_instance_id,
707                                    %kind, "Starting new module recovery"
708                                );
709                                Some(start_module_recover_fn(snapshot, progress))
710                            }
711                        }
712                    }
713                    _ => None,
714                };
715
716                match recovery {
717                    Some((recovery, recovery_progress_rx)) => {
718                        module_recoveries.insert(module_instance_id, recovery);
719                        module_recovery_progress_receivers
720                            .insert(module_instance_id, recovery_progress_rx);
721                    }
722                    _ => {
723                        let module = module_init
724                            .init(
725                                final_client.clone(),
726                                fed_id,
727                                config.global.api_endpoints.len(),
728                                module_config,
729                                db.clone(),
730                                module_instance_id,
731                                common_api_versions.core,
732                                api_version,
733                                // This is a divergence from the legacy client, where the child
734                                // secret keys were derived using
735                                // *module kind*-specific derivation paths.
736                                // Since the new client has to support multiple, segregated modules
737                                // of the same kind we have to use
738                                // the instance id instead.
739                                root_secret.derive_module_secret(module_instance_id),
740                                notifier.clone(),
741                                api.clone(),
742                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
743                                task_group.clone(),
744                            )
745                            .await?;
746
747                        if primary_module_instance.is_none() && module.supports_being_primary() {
748                            primary_module_instance = Some(module_instance_id);
749                        } else if primary_module_instance == Some(module_instance_id)
750                            && !module.supports_being_primary()
751                        {
752                            bail!(
753                                "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
754                            );
755                        }
756
757                        modules.register_module(module_instance_id, kind, module);
758                    }
759                }
760            }
761            modules
762        };
763
764        if init_state.is_pending() && module_recoveries.is_empty() {
765            let mut dbtx = db.begin_transaction().await;
766            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
767                .await;
768            dbtx.commit_tx().await;
769        }
770
771        let executor = {
772            let mut executor_builder = Executor::builder();
773            executor_builder
774                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
775
776            for (module_instance_id, _, module) in modules.iter_modules() {
777                executor_builder.with_module_dyn(module.context(module_instance_id));
778            }
779
780            for module_instance_id in module_recoveries.keys() {
781                executor_builder.with_valid_module_id(*module_instance_id);
782            }
783
784            executor_builder.build(
785                db.clone(),
786                notifier,
787                task_group.clone(),
788                log_ordering_wakeup_tx.clone(),
789            )
790        };
791
792        let recovery_receiver_init_val = module_recovery_progress_receivers
793            .iter()
794            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
795            .collect::<BTreeMap<_, _>>();
796        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
797            watch::channel(recovery_receiver_init_val);
798
799        let client_inner = Arc::new(Client {
800            final_client: final_client.clone(),
801            config: tokio::sync::RwLock::new(config.clone()),
802            api_secret,
803            decoders,
804            db: db.clone(),
805            federation_id: fed_id,
806            federation_config_meta: config.global.meta,
807            primary_module_instance: primary_module_instance
808                .ok_or(anyhow!("No primary module set or found"))?,
809            modules,
810            module_inits: self.module_inits.clone(),
811            log_ordering_wakeup_tx,
812            log_event_added_rx,
813            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
814            request_hook,
815            executor,
816            api,
817            secp_ctx: Secp256k1::new(),
818            root_secret,
819            task_group,
820            operation_log: OperationLog::new(db.clone()),
821            client_recovery_progress_receiver,
822            meta_service: self.meta_service,
823            connector,
824        });
825        client_inner
826            .task_group
827            .spawn_cancellable("MetaService::update_continuously", {
828                let client_inner = client_inner.clone();
829                async move {
830                    client_inner
831                        .meta_service
832                        .update_continuously(&client_inner)
833                        .await;
834                }
835            });
836
837        client_inner.task_group.spawn_cancellable(
838            "update-api-announcements",
839            run_api_announcement_sync(client_inner.clone()),
840        );
841
842        client_inner.task_group.spawn_cancellable(
843            "event log ordering task",
844            run_event_log_ordering_task(
845                db.clone(),
846                log_ordering_wakeup_rx,
847                log_event_added_tx,
848                log_event_added_transient_tx,
849            ),
850        );
851        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
852
853        let client_arc = ClientHandle::new(client_inner);
854
855        for (_, _, module) in client_arc.modules.iter_modules() {
856            module.start().await;
857        }
858
859        final_client.set(client_iface.clone());
860
861        if !module_recoveries.is_empty() {
862            client_arc.spawn_module_recoveries_task(
863                client_recovery_progress_sender,
864                module_recoveries,
865                module_recovery_progress_receivers,
866            );
867        }
868
869        Ok(client_arc)
870    }
871
872    async fn load_init_state(db: &Database) -> InitState {
873        let mut dbtx = db.begin_transaction_nc().await;
874        dbtx.get_value(&ClientInitStateKey)
875            .await
876            .unwrap_or_else(|| {
877                // could be turned in a hard error in the future, but for now
878                // no need to break backward compat.
879                warn!(
880                    target: LOG_CLIENT,
881                    "Client missing ClientRequiresRecovery: assuming complete"
882                );
883                db::InitState::Complete(db::InitModeComplete::Fresh)
884            })
885    }
886
887    fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
888        let mut decoders = client_decoders(
889            &self.module_inits,
890            config
891                .modules
892                .iter()
893                .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
894        );
895
896        decoders.register_module(
897            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
898            ModuleKind::from_static_str("tx_submission"),
899            tx_submission_sm_decoder(),
900        );
901
902        decoders
903    }
904
905    fn config_decoded(
906        config: &ClientConfig,
907        decoders: &ModuleDecoderRegistry,
908    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
909        config.clone().redecode_raw(decoders)
910    }
911
912    /// Re-derive client's `root_secret` using the federation ID. This
913    /// eliminates the possibility of having the same client `root_secret`
914    /// across multiple federations.
915    fn federation_root_secret(
916        pre_root_secret: &DerivableSecret,
917        config: &ClientConfig,
918    ) -> DerivableSecret {
919        pre_root_secret.federation_key(&config.global.calculate_federation_id())
920    }
921
922    /// Register to receiver all new transient (unpersisted) events
923    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
924        self.log_event_added_transient_tx.subscribe()
925    }
926
927    /// Check for pending config and migrate it if present.
928    /// Returns the config to use (either the original or the migrated pending
929    /// config).
930    async fn migrate_pending_config_if_present(db: &Database) {
931        if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
932            debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
933
934            let mut dbtx = db.begin_transaction().await;
935            // Update the main config with the pending config
936            dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
937                .await;
938            // Remove the pending config
939            dbtx.remove_entry(&PendingClientConfigKey).await;
940            dbtx.commit_tx().await;
941
942            debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
943        }
944    }
945
946    /// Asynchronously refetch client config from federation and compare with
947    /// existing. If different, save to pending config in database.
948    fn load_and_refresh_client_config_static(
949        config: &ClientConfig,
950        api: &DynGlobalApi,
951        db: &Database,
952        task_group: &TaskGroup,
953    ) {
954        let config = config.clone();
955        let api = api.clone();
956        let db = db.clone();
957        let task_group = task_group.clone();
958
959        // Spawn background task to refetch config
960        task_group.spawn_cancellable("refresh_client_config_static", async move {
961            Self::refresh_client_config_static(&config, &api, &db).await;
962        });
963    }
964
965    /// Wrapper that handles errors from config refresh with proper logging
966    async fn refresh_client_config_static(
967        config: &ClientConfig,
968        api: &DynGlobalApi,
969        db: &Database,
970    ) {
971        if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
972            warn!(
973                target: LOG_CLIENT,
974                err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
975            );
976        }
977    }
978
979    /// Validate that a config update is valid
980    fn validate_config_update(
981        current_config: &ClientConfig,
982        new_config: &ClientConfig,
983    ) -> anyhow::Result<()> {
984        // Global config must not change
985        if current_config.global != new_config.global {
986            bail!("Global configuration changes are not allowed in config updates");
987        }
988
989        // Modules can only be added, existing ones must stay the same
990        for (module_id, current_module_config) in &current_config.modules {
991            match new_config.modules.get(module_id) {
992                Some(new_module_config) => {
993                    if current_module_config != new_module_config {
994                        bail!(
995                            "Module {} configuration changes are not allowed, only additions are permitted",
996                            module_id
997                        );
998                    }
999                }
1000                None => {
1001                    bail!(
1002                        "Module {} was removed in new config, only additions are allowed",
1003                        module_id
1004                    );
1005                }
1006            }
1007        }
1008
1009        Ok(())
1010    }
1011
1012    /// Refetch client config from federation and save as pending if different
1013    async fn refresh_client_config_static_try(
1014        current_config: &ClientConfig,
1015        api: &DynGlobalApi,
1016        db: &Database,
1017    ) -> anyhow::Result<()> {
1018        debug!(target: LOG_CLIENT, "Refreshing client config");
1019
1020        // Fetch latest config from federation
1021        let fetched_config = api
1022            .request_current_consensus::<ClientConfig>(
1023                CLIENT_CONFIG_ENDPOINT.to_owned(),
1024                ApiRequestErased::default(),
1025            )
1026            .await?;
1027
1028        // Validate the new config before proceeding
1029        Self::validate_config_update(current_config, &fetched_config)?;
1030
1031        // Compare with current config
1032        if current_config != &fetched_config {
1033            debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1034
1035            let mut dbtx = db.begin_transaction().await;
1036            dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1037                .await;
1038            dbtx.commit_tx().await;
1039        } else {
1040            debug!(target: LOG_CLIENT, "No federation config changes detected");
1041        }
1042
1043        Ok(())
1044    }
1045}
1046
1047pub struct ClientPreview {
1048    inner: ClientBuilder,
1049    config: ClientConfig,
1050    api_secret: Option<String>,
1051}
1052
1053impl ClientPreview {
1054    /// Get the config
1055    pub fn config(&self) -> &ClientConfig {
1056        &self.config
1057    }
1058
1059    /// Join a new Federation
1060    ///
1061    /// When a user wants to connect to a new federation this function fetches
1062    /// the federation config and initializes the client database. If a user
1063    /// already joined the federation in the past and has a preexisting database
1064    /// use [`ClientBuilder::open`] instead.
1065    ///
1066    /// **Warning**: Calling `join` with a `root_secret` key that was used
1067    /// previous to `join` a Federation will lead to all sorts of malfunctions
1068    /// including likely loss of funds.
1069    ///
1070    /// This should be generally called only if the `root_secret` key is known
1071    /// not to have been used before (e.g. just randomly generated). For keys
1072    /// that might have been previous used (e.g. provided by the user),
1073    /// it's safer to call [`Self::recover`] which will attempt to recover
1074    /// client module states for the Federation.
1075    ///
1076    /// A typical "join federation" flow would look as follows:
1077    /// ```no_run
1078    /// # use std::str::FromStr;
1079    /// # use fedimint_core::invite_code::InviteCode;
1080    /// # use fedimint_core::config::ClientConfig;
1081    /// # use fedimint_derive_secret::DerivableSecret;
1082    /// # use fedimint_client::{Client, ClientBuilder, RootSecret};
1083    /// # use fedimint_core::db::Database;
1084    /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1085    /// #
1086    /// # #[tokio::main]
1087    /// # async fn main() -> anyhow::Result<()> {
1088    /// # let root_secret: DerivableSecret = unimplemented!();
1089    /// // Create a root secret, e.g. via fedimint-bip39, see also:
1090    /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1091    /// // let root_secret = …;
1092    ///
1093    /// // Get invite code from user
1094    /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1095    ///     .expect("Invalid invite code");
1096    ///
1097    /// // Tell the user the federation name, bitcoin network
1098    /// // (e.g. from wallet module config), and other details
1099    /// // that are typically contained in the federation's
1100    /// // meta fields.
1101    ///
1102    /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1103    /// //     .expect("Module not found")
1104    /// //     .network;
1105    ///
1106    /// // Open the client's database, using the federation ID
1107    /// // as the DB name is a common pattern:
1108    ///
1109    /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1110    /// // let db = RocksDb::open(db_path).expect("error opening DB");
1111    /// # let db: Database = unimplemented!();
1112    ///
1113    /// let preview = Client::builder(db).await
1114    ///     // Mount the modules the client should support:
1115    ///     // .with_module(LightningClientInit)
1116    ///     // .with_module(MintClientInit)
1117    ///     // .with_module(WalletClientInit::default())
1118    ///      .expect("Error building client")
1119    ///      .preview(&invite_code).await?;
1120    ///
1121    /// println!(
1122    ///     "The federation name is: {}",
1123    ///     preview.config().meta::<String>(META_FEDERATION_NAME_KEY)
1124    ///         .expect("Could not decode name field")
1125    ///         .expect("Name isn't set")
1126    /// );
1127    ///
1128    /// let client = preview
1129    ///     .join(RootSecret::StandardDoubleDerive(root_secret))
1130    ///     .await
1131    ///     .expect("Error joining federation");
1132    /// # Ok(())
1133    /// # }
1134    /// ```
1135    pub async fn join(self, pre_root_secret: RootSecret) -> anyhow::Result<ClientHandle> {
1136        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1137
1138        let client = self
1139            .inner
1140            .init(
1141                pre_root_secret,
1142                self.config,
1143                self.api_secret,
1144                InitMode::Fresh,
1145            )
1146            .await?;
1147
1148        Ok(client)
1149    }
1150
1151    /// Join a (possibly) previous joined Federation
1152    ///
1153    /// Unlike [`Self::join`], `recover` will run client module
1154    /// recovery for each client module attempting to recover any previous
1155    /// module state.
1156    ///
1157    /// Recovery process takes time during which each recovering client module
1158    /// will not be available for use.
1159    ///
1160    /// Calling `recovery` with a `root_secret` that was not actually previous
1161    /// used in a given Federation is safe.
1162    pub async fn recover(
1163        self,
1164        pre_root_secret: RootSecret,
1165        backup: Option<ClientBackup>,
1166    ) -> anyhow::Result<ClientHandle> {
1167        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1168
1169        let client = self
1170            .inner
1171            .init(
1172                pre_root_secret,
1173                self.config,
1174                self.api_secret,
1175                InitMode::Recover {
1176                    snapshot: backup.clone(),
1177                },
1178            )
1179            .await?;
1180
1181        Ok(client)
1182    }
1183
1184    /// Download most recent valid backup found from the Federation
1185    pub async fn download_backup_from_federation(
1186        &self,
1187        pre_root_secret: RootSecret,
1188    ) -> anyhow::Result<Option<ClientBackup>> {
1189        let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1190        let api = DynGlobalApi::from_endpoints(
1191            // TODO: change join logic to use FederationId v2
1192            self.config
1193                .global
1194                .api_endpoints
1195                .iter()
1196                .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
1197            &self.api_secret,
1198        )
1199        .await?;
1200
1201        Client::download_backup_from_federation_static(
1202            &api,
1203            &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1204            &self.inner.decoders(&self.config),
1205        )
1206        .await
1207    }
1208}