1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
14use fedimint_api_client::download_from_invite_code;
15use fedimint_bitcoind::DynBitcoindRpc;
16use fedimint_client_module::api::ClientRawFederationApiExt as _;
17use fedimint_client_module::meta::LegacyMetaSource;
18use fedimint_client_module::module::init::{
19 BitcoindRpcFactory, BitcoindRpcNoChainIdFactory, ClientModuleInit,
20};
21use fedimint_client_module::module::recovery::RecoveryProgress;
22use fedimint_client_module::module::{
23 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
24};
25use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
26use fedimint_client_module::transaction::{
27 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
28};
29use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
30use fedimint_connectors::ConnectorRegistry;
31use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
32use fedimint_core::core::{ModuleInstanceId, ModuleKind};
33use fedimint_core::db::{
34 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
35};
36use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
37use fedimint_core::envs::is_running_in_test_env;
38use fedimint_core::invite_code::InviteCode;
39use fedimint_core::module::registry::ModuleDecoderRegistry;
40use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
41use fedimint_core::task::TaskGroup;
42use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
43use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _, SafeUrl};
44use fedimint_core::{ChainId, NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
45use fedimint_derive_secret::DerivableSecret;
46use fedimint_eventlog::{
47 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
48};
49use fedimint_logging::LOG_CLIENT;
50use tokio::sync::{broadcast, watch};
51use tracing::{Span, debug, trace, warn};
52
53use super::handle::ClientHandle;
54use super::{Client, client_decoders};
55use crate::api_announcements::{
56 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
57 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
58};
59use crate::backup::{ClientBackup, Metadata};
60use crate::client::PrimaryModuleCandidates;
61use crate::db::{
62 self, ApiSecretKey, ChainIdKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
63 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
64 PendingClientConfigKey, apply_migrations_client_module_dbtx,
65};
66use crate::guardian_metadata::run_guardian_metadata_refresh_task;
67use crate::meta::MetaService;
68use crate::module_init::ClientModuleInitRegistry;
69use crate::oplog::OperationLog;
70use crate::sm::executor::Executor;
71use crate::sm::notifier::Notifier;
72
73#[derive(Clone)]
95pub enum RootSecret {
96 StandardDoubleDerive(DerivableSecret),
101 Custom(DerivableSecret),
106}
107
108impl RootSecret {
109 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
110 match self {
111 RootSecret::StandardDoubleDerive(derivable_secret) => {
112 get_default_client_secret(derivable_secret, &federation_id)
113 }
114 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
115 }
116 }
117}
118
119pub struct ClientBuilder {
121 module_inits: ClientModuleInitRegistry,
122 admin_creds: Option<AdminCreds>,
123 meta_service: Arc<crate::meta::MetaService>,
124 stopped: bool,
125 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
126 request_hook: ApiRequestHook,
127 iroh_enable_dht: bool,
128 iroh_enable_next: bool,
129 bitcoind_rpc_factory: Option<BitcoindRpcFactory>,
130 bitcoind_rpc_no_chain_id_factory: Option<BitcoindRpcNoChainIdFactory>,
131}
132
133impl ClientBuilder {
134 pub(crate) fn new() -> Self {
135 trace!(
136 target: LOG_CLIENT,
137 version = %fedimint_build_code_version_env!(),
138 "Initializing fedimint client",
139 );
140 let meta_service = MetaService::new(LegacyMetaSource::default());
141 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
142 broadcast::channel(1024);
143
144 ClientBuilder {
145 module_inits: ModuleInitRegistry::new(),
146 admin_creds: None,
147 stopped: false,
148 meta_service,
149 log_event_added_transient_tx,
150 request_hook: Arc::new(|api| api),
151 iroh_enable_dht: true,
152 iroh_enable_next: true,
153 bitcoind_rpc_factory: None,
154 bitcoind_rpc_no_chain_id_factory: None,
155 }
156 }
157
158 pub(crate) fn from_existing(client: &Client) -> Self {
159 ClientBuilder {
160 module_inits: client.module_inits.clone(),
161 admin_creds: None,
162 stopped: false,
163 meta_service: client.meta_service.clone(),
165 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
166 request_hook: client.request_hook.clone(),
167 iroh_enable_dht: client.iroh_enable_dht,
168 iroh_enable_next: client.iroh_enable_next,
169 bitcoind_rpc_factory: None,
172 bitcoind_rpc_no_chain_id_factory: client.user_bitcoind_rpc_no_chain_id.clone(),
174 }
175 }
176
177 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
179 self.module_inits = module_inits;
180 }
181
182 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
184 self.module_inits.attach(module_init);
185 }
186
187 pub fn stopped(&mut self) {
188 self.stopped = true;
189 }
190 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
199 self.request_hook = hook;
200 self
201 }
202
203 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
204 self.meta_service = meta_service;
205 }
206
207 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
210 self.iroh_enable_dht = iroh_enable_dht;
211 self
212 }
213
214 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
217 self.iroh_enable_next = iroh_enable_next;
218 self
219 }
220
221 pub fn with_bitcoind_rpc<F, Fut>(mut self, factory: F) -> Self
243 where
244 F: FnOnce(ChainId) -> Fut + Send + Sync + 'static,
245 Fut: Future<Output = Option<DynBitcoindRpc>> + Send + 'static,
246 {
247 self.bitcoind_rpc_factory = Some(Box::new(move |chain_id| Box::pin(factory(chain_id))));
248 self
249 }
250
251 pub fn with_bitcoind_rpc_no_chain_id<F, Fut>(mut self, factory: F) -> Self
274 where
275 F: Fn(SafeUrl) -> Fut + Send + Sync + 'static,
276 Fut: Future<Output = Option<DynBitcoindRpc>> + Send + 'static,
277 {
278 self.bitcoind_rpc_no_chain_id_factory = Some(Arc::new(move |url| Box::pin(factory(url))));
279 self
280 }
281
282 async fn migrate_module_dbs(
289 &self,
290 db: &Database,
291 client_config: &ClientConfig,
292 ) -> anyhow::Result<()> {
293 for (module_id, module_cfg) in &client_config.modules {
294 let kind = module_cfg.kind.clone();
295 let Some(init) = self.module_inits.get(&kind) else {
296 continue;
298 };
299
300 let mut dbtx = db.begin_transaction().await;
301 apply_migrations_client_module_dbtx(
302 &mut dbtx.to_ref_nc(),
303 kind.to_string(),
304 init.get_database_migrations(),
305 *module_id,
306 )
307 .await?;
308 if let Some(used_db_prefixes) = init.used_db_prefixes()
309 && is_running_in_test_env()
310 {
311 verify_module_db_integrity_dbtx(
312 &mut dbtx.to_ref_nc(),
313 *module_id,
314 kind,
315 &used_db_prefixes,
316 )
317 .await;
318 }
319 dbtx.commit_tx_result().await?;
320 }
321
322 Ok(())
323 }
324
325 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
326 let Some(config) = Client::get_config_from_db(db).await else {
327 bail!("Client database not initialized")
328 };
329
330 Ok(config)
331 }
332
333 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
334 self.admin_creds = Some(creds);
335 }
336
337 #[allow(clippy::too_many_arguments)]
338 async fn init(
339 self,
340 connectors: ConnectorRegistry,
341 db_no_decoders: Database,
342 pre_root_secret: DerivableSecret,
343 config: ClientConfig,
344 api_secret: Option<String>,
345 init_mode: InitMode,
346 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
347 preview_prefetch_api_version_set: Option<
348 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
349 >,
350 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
351 ) -> anyhow::Result<ClientHandle> {
352 if Client::is_initialized(&db_no_decoders).await {
353 bail!("Client database already initialized")
354 }
355
356 Client::run_core_migrations(&db_no_decoders).await?;
357
358 {
361 debug!(target: LOG_CLIENT, "Initializing client database");
362 let mut dbtx = db_no_decoders.begin_transaction().await;
363 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
365 .await;
366 dbtx.insert_entry(
367 &ClientPreRootSecretHashKey,
368 &pre_root_secret.derive_pre_root_secret_hash(),
369 )
370 .await;
371
372 if let Some(api_secret) = api_secret.as_ref() {
373 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
374 }
375
376 let init_state = InitState::Pending(init_mode);
377 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
378
379 let metadata = init_state
380 .does_require_recovery()
381 .flatten()
382 .map_or(Metadata::empty(), |s| s.metadata);
383
384 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
385
386 dbtx.commit_tx_result().await?;
387 }
388
389 let stopped = self.stopped;
390 self.build(
391 connectors,
392 db_no_decoders,
393 pre_root_secret,
394 config,
395 api_secret,
396 stopped,
397 preview_prefetch_api_announcements,
398 preview_prefetch_api_version_set,
399 prefetch_chain_id,
400 )
401 .await
402 }
403
404 pub async fn preview(
405 self,
406 connectors: ConnectorRegistry,
407 invite_code: &InviteCode,
408 ) -> anyhow::Result<ClientPreview> {
409 let (config, api) = download_from_invite_code(&connectors, invite_code).await?;
410
411 let prefetch_api_announcements =
412 config
413 .global
414 .broadcast_public_keys
415 .clone()
416 .map(|guardian_pub_keys| {
417 Jit::new({
418 let api = api.clone();
419 move || async move {
420 fetch_api_announcements_from_at_least_num_of_peers(
424 1,
425 &api,
426 &guardian_pub_keys,
427 Duration::from_millis(20),
430 )
431 .await
432 }
433 })
434 });
435
436 self.preview_inner(
437 connectors,
438 config,
439 invite_code.api_secret(),
440 Some(api),
441 prefetch_api_announcements,
442 )
443 .await
444 }
445
446 pub async fn preview_with_existing_config(
451 self,
452 connectors: ConnectorRegistry,
453 config: ClientConfig,
454 api_secret: Option<String>,
455 ) -> anyhow::Result<ClientPreview> {
456 self.preview_inner(connectors, config, api_secret, None, None)
457 .await
458 }
459
460 async fn preview_inner(
461 self,
462 connectors: ConnectorRegistry,
463 config: ClientConfig,
464 api_secret: Option<String>,
465 prefetch_api: Option<DynGlobalApi>,
466 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
467 ) -> anyhow::Result<ClientPreview> {
468 let preview_prefetch_api_version_set = prefetch_api.as_ref().map(|api| {
469 JitTry::new_try({
470 let config = config.clone();
471 let api = api.clone();
472 || async move { Client::fetch_common_api_versions(&config, &api).await }
473 })
474 });
475
476 let prefetch_chain_id = prefetch_api.map(|api| {
477 JitTry::new_try(|| async move { api.chain_id().await.map_err(anyhow::Error::from) })
478 });
479
480 Ok(ClientPreview {
481 connectors,
482 inner: self,
483 config,
484 api_secret,
485 prefetch_api_announcements,
486 preview_prefetch_api_version_set,
487 prefetch_chain_id,
488 })
489 }
490
491 pub async fn open(
492 self,
493 connectors: ConnectorRegistry,
494 db_no_decoders: Database,
495 pre_root_secret: RootSecret,
496 ) -> anyhow::Result<ClientHandle> {
497 Client::run_core_migrations(&db_no_decoders).await?;
498
499 Self::migrate_pending_config_if_present(&db_no_decoders).await;
501
502 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
503 bail!("Client database not initialized")
504 };
505
506 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
507
508 match db_no_decoders
509 .begin_transaction_nc()
510 .await
511 .get_value(&ClientPreRootSecretHashKey)
512 .await
513 {
514 Some(secret_hash) => {
515 ensure!(
516 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
517 "Secret hash does not match. Incorrect secret"
518 );
519 }
520 _ => {
521 debug!(target: LOG_CLIENT, "Backfilling secret hash");
522 let mut dbtx = db_no_decoders.begin_transaction().await;
524 dbtx.insert_entry(
525 &ClientPreRootSecretHashKey,
526 &pre_root_secret.derive_pre_root_secret_hash(),
527 )
528 .await;
529 dbtx.commit_tx().await;
530 }
531 }
532
533 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
534 let stopped = self.stopped;
535 let request_hook = self.request_hook.clone();
536
537 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
538 let client = self
539 .build_stopped(
540 connectors,
541 db_no_decoders,
542 pre_root_secret,
543 &config,
544 api_secret,
545 log_event_added_transient_tx,
546 request_hook,
547 None,
548 None,
549 None, )
551 .await?;
552 if !stopped {
553 client.as_inner().start_executor();
554 }
555 Ok(client)
556 }
557
558 #[allow(clippy::too_many_arguments)]
560 pub(crate) async fn build(
561 self,
562 connectors: ConnectorRegistry,
563 db_no_decoders: Database,
564 pre_root_secret: DerivableSecret,
565 config: ClientConfig,
566 api_secret: Option<String>,
567 stopped: bool,
568 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
569 preview_prefetch_api_version_set: Option<
570 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
571 >,
572 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
573 ) -> anyhow::Result<ClientHandle> {
574 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
575 let request_hook = self.request_hook.clone();
576 let client = self
577 .build_stopped(
578 connectors,
579 db_no_decoders,
580 pre_root_secret,
581 &config,
582 api_secret,
583 log_event_added_transient_tx,
584 request_hook,
585 preview_prefetch_api_announcements,
586 preview_prefetch_api_version_set,
587 prefetch_chain_id,
588 )
589 .await?;
590 if !stopped {
591 client.as_inner().start_executor();
592 }
593
594 Ok(client)
595 }
596
597 #[allow(clippy::too_many_arguments)]
600 async fn build_stopped(
601 mut self,
602 connectors: ConnectorRegistry,
603 db_no_decoders: Database,
604 pre_root_secret: DerivableSecret,
605 config: &ClientConfig,
606 api_secret: Option<String>,
607 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
608 request_hook: ApiRequestHook,
609 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
610 preview_prefetch_api_version_set: Option<
611 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
612 >,
613 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
614 ) -> anyhow::Result<ClientHandle> {
615 debug!(
616 target: LOG_CLIENT,
617 version = %fedimint_build_code_version_env!(),
618 "Building fedimint client",
619 );
620 for (kind, module) in self.module_inits.iter() {
621 debug!(
622 target: LOG_CLIENT,
623 module = %kind,
624 supported_api = %module.supported_api_versions(),
625 "Supported module api versions",
626 );
627 }
628 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
629 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
630
631 let decoders = self.decoders(config);
632 let config = Self::config_decoded(config, &decoders)?;
633 let fed_id = config.calculate_federation_id();
634 let db = db_no_decoders.with_decoders(decoders.clone());
635 let peer_urls = get_api_urls(&db, &config).await;
636 let api = match self.admin_creds.as_ref() {
637 Some(admin_creds) => FederationApi::new(
638 connectors.clone(),
639 peer_urls,
640 Some(admin_creds.peer_id),
641 Some(admin_creds.auth.as_str()),
642 )
643 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
644 .with_request_hook(&request_hook)
645 .with_cache()
646 .into(),
647 None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
648 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
649 .with_request_hook(&request_hook)
650 .with_cache()
651 .into(),
652 };
653
654 let task_group = TaskGroup::new();
655 let client_span = Client::make_client_span(fed_id);
656
657 self.migrate_module_dbs(&db, &config).await?;
660
661 let init_state = Self::load_init_state(&db).await;
662
663 let notifier = Notifier::new();
664
665 if let Some(p) = preview_prefetch_api_announcements {
666 let announcements = p.get().await;
670
671 store_api_announcements_updates_from_peers(&db, announcements).await?
672 }
673
674 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
675 match preview_prefetch_api_version_set.get_try().await {
676 Ok(peer_api_versions) => {
677 Client::store_prefetched_api_versions(
678 &db,
679 &config,
680 &self.module_inits,
681 peer_api_versions,
682 )
683 .await;
684 }
685 Err(err) => {
686 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
687 }
688 }
689 }
690
691 let common_api_versions = Client::load_and_refresh_common_api_version_static(
692 &config,
693 &self.module_inits,
694 connectors.clone(),
695 &api,
696 &db,
697 &task_group,
698 &client_span,
699 )
700 .await
701 .inspect_err(|err| {
702 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
703 })
704 .unwrap_or(ApiVersionSet {
705 core: ApiVersion::new(0, 0),
706 modules: BTreeMap::new(),
708 });
709
710 client_span.in_scope(|| {
711 debug!(
712 target: LOG_CLIENT,
713 core = %common_api_versions.core,
714 "Negotiated core API version",
715 );
716 for (module_id, api_version) in &common_api_versions.modules {
717 let kind = config.modules.get(module_id).map(|m| m.kind());
718 let kind_str = kind
719 .as_ref()
720 .map(|k| k.to_string())
721 .unwrap_or_else(|| format!("unknown({module_id})"));
722 let supported = kind
723 .and_then(|k| self.module_inits.get(k))
724 .map(|m| m.supported_api_versions().to_string());
725 debug!(
726 target: LOG_CLIENT,
727 module = %kind_str,
728 api = %api_version,
729 supported = %supported.as_deref().unwrap_or("unknown"),
730 "Negotiated module API version",
731 );
732 }
733 });
734
735 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group, &client_span);
737
738 if let Some(prefetch_chain_id) = prefetch_chain_id {
742 match prefetch_chain_id.get_try().await {
743 Ok(chain_id) => {
744 debug!(target: LOG_CLIENT, %chain_id, "Caching prefetched chain ID");
745 let mut dbtx = db.begin_transaction().await;
746 dbtx.insert_entry(&ChainIdKey, chain_id).await;
747 dbtx.commit_tx().await;
748 }
749 Err(err) => {
750 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Failed to prefetch chain ID, will retry on next start");
751 }
752 }
753 }
754
755 let user_bitcoind_rpc = if let Some(factory) = self.bitcoind_rpc_factory.take() {
757 let chain_id = db.begin_transaction_nc().await.get_value(&ChainIdKey).await;
759
760 if let Some(chain_id) = chain_id {
761 debug!(target: LOG_CLIENT, %chain_id, "Creating user-provided bitcoind RPC client");
762 factory(chain_id).await
763 } else {
764 debug!(target: LOG_CLIENT, "Chain ID not available, skipping user-provided bitcoind RPC creation");
765 None
766 }
767 } else {
768 None
769 };
770
771 let mut module_recoveries: BTreeMap<
772 ModuleInstanceId,
773 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
774 > = BTreeMap::new();
775 let mut module_recovery_progress_receivers: BTreeMap<
776 ModuleInstanceId,
777 watch::Receiver<RecoveryProgress>,
778 > = BTreeMap::new();
779
780 let final_client = FinalClientIface::default();
781
782 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
783
784 let modules = {
785 let mut modules = ClientModuleRegistry::default();
786 for (module_instance_id, module_config) in config.modules.clone() {
787 let kind = module_config.kind().clone();
788 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
789 client_span.in_scope(|| {
790 debug!(
791 target: LOG_CLIENT,
792 kind=%kind,
793 instance_id=%module_instance_id,
794 "Module kind of instance not found in module gens, skipping");
795 });
796 continue;
797 };
798
799 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
800 else {
801 client_span.in_scope(|| {
802 warn!(
803 target: LOG_CLIENT,
804 kind=%kind,
805 instance_id=%module_instance_id,
806 "Module kind of instance has incompatible api version, skipping"
807 );
808 });
809 continue;
810 };
811
812 let start_module_recover_fn =
815 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
816 let module_config = module_config.clone();
817 let num_peers = NumPeers::from(config.global.api_endpoints.len());
818 let db = db.clone();
819 let kind = kind.clone();
820 let notifier = notifier.clone();
821 let api = api.clone();
822 let root_secret = root_secret.clone();
823 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
824 let final_client = final_client.clone();
825 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
826 let task_group = task_group.clone();
827 let module_init = module_init.clone();
828 let user_bitcoind_rpc = user_bitcoind_rpc.clone();
829 let user_bitcoind_rpc_no_chain_id =
830 self.bitcoind_rpc_no_chain_id_factory.clone();
831 let client_span = client_span.clone();
832 (
833 Box::pin(async move {
834 module_init
835 .recover(
836 final_client.clone(),
837 fed_id,
838 num_peers,
839 module_config.clone(),
840 db.clone(),
841 module_instance_id,
842 common_api_versions.core,
843 api_version,
844 root_secret.derive_module_secret(module_instance_id),
845 notifier.clone(),
846 api.clone(),
847 admin_auth,
848 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
849 progress_tx,
850 task_group,
851 client_span,
852 user_bitcoind_rpc,
853 user_bitcoind_rpc_no_chain_id,
854 )
855 .await
856 .inspect_err(|err| {
857 warn!(
858 target: LOG_CLIENT,
859 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
860 );
861 })
862 }),
863 progress_rx,
864 )
865 };
866
867 let recovery = match init_state.does_require_recovery() {
868 Some(snapshot) => {
869 match db
870 .begin_transaction_nc()
871 .await
872 .get_value(&ClientModuleRecovery { module_instance_id })
873 .await
874 {
875 Some(module_recovery_state) => {
876 if module_recovery_state.is_done() {
877 debug!(
878 id = %module_instance_id,
879 %kind, "Module recovery already complete"
880 );
881 None
882 } else {
883 debug!(
884 id = %module_instance_id,
885 %kind,
886 progress = %module_recovery_state.progress,
887 "Starting module recovery with an existing progress"
888 );
889 Some(start_module_recover_fn(
890 snapshot,
891 module_recovery_state.progress,
892 ))
893 }
894 }
895 _ => {
896 let progress = RecoveryProgress::none();
897 let mut dbtx = db.begin_transaction().await;
898 dbtx.log_event(
899 log_ordering_wakeup_tx.clone(),
900 None,
901 ModuleRecoveryStarted::new(module_instance_id),
902 )
903 .await;
904 dbtx.insert_entry(
905 &ClientModuleRecovery { module_instance_id },
906 &ClientModuleRecoveryState { progress },
907 )
908 .await;
909
910 dbtx.commit_tx().await;
911
912 debug!(
913 id = %module_instance_id,
914 %kind, "Starting new module recovery"
915 );
916 Some(start_module_recover_fn(snapshot, progress))
917 }
918 }
919 }
920 _ => None,
921 };
922
923 match recovery {
924 Some((recovery, recovery_progress_rx)) => {
925 module_recoveries.insert(module_instance_id, recovery);
926 module_recovery_progress_receivers
927 .insert(module_instance_id, recovery_progress_rx);
928 }
929 _ => {
930 let module = module_init
931 .init(
932 final_client.clone(),
933 fed_id,
934 config.global.api_endpoints.len(),
935 module_config,
936 db.clone(),
937 module_instance_id,
938 common_api_versions.core,
939 api_version,
940 root_secret.derive_module_secret(module_instance_id),
947 notifier.clone(),
948 api.clone(),
949 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
950 task_group.clone(),
951 client_span.clone(),
952 connectors.clone(),
953 user_bitcoind_rpc.clone(),
954 self.bitcoind_rpc_no_chain_id_factory.clone(),
955 )
956 .await?;
957
958 modules.register_module(module_instance_id, kind, module);
959 }
960 }
961 }
962 modules
963 };
964
965 if init_state.is_pending() && module_recoveries.is_empty() {
966 let mut dbtx = db.begin_transaction().await;
967 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
968 .await;
969 dbtx.commit_tx().await;
970 }
971
972 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
973 BTreeMap::new();
974
975 for (module_id, _kind, module) in modules.iter_modules() {
976 match module.supports_being_primary() {
977 PrimaryModuleSupport::Any { priority } => {
978 primary_modules
979 .entry(priority)
980 .or_default()
981 .wildcard
982 .push(module_id);
983 }
984 PrimaryModuleSupport::Selected { priority, units } => {
985 for unit in units {
986 primary_modules
987 .entry(priority)
988 .or_default()
989 .specific
990 .entry(unit)
991 .or_default()
992 .push(module_id);
993 }
994 }
995 PrimaryModuleSupport::None => {}
996 }
997 }
998
999 let executor = client_span.in_scope(|| {
1000 let mut executor_builder = Executor::builder();
1001 executor_builder
1002 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
1003
1004 for (module_instance_id, _, module) in modules.iter_modules() {
1005 executor_builder.with_module_dyn(module.context(module_instance_id));
1006 }
1007
1008 for module_instance_id in module_recoveries.keys() {
1009 executor_builder.with_valid_module_id(*module_instance_id);
1010 }
1011
1012 executor_builder.build(
1013 db.clone(),
1014 notifier,
1015 task_group.clone(),
1016 log_ordering_wakeup_tx.clone(),
1017 )
1018 });
1019
1020 let recovery_receiver_init_val = module_recovery_progress_receivers
1021 .iter()
1022 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
1023 .collect::<BTreeMap<_, _>>();
1024 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
1025 watch::channel(recovery_receiver_init_val);
1026
1027 let client_inner = Arc::new(Client {
1028 final_client: final_client.clone(),
1029 config: tokio::sync::RwLock::new(config.clone()),
1030 api_secret,
1031 decoders,
1032 db: db.clone(),
1033 connectors,
1034 federation_id: fed_id,
1035 federation_config_meta: config.global.meta,
1036 primary_modules,
1037 modules,
1038 module_inits: self.module_inits.clone(),
1039 log_ordering_wakeup_tx,
1040 log_event_added_rx,
1041 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
1042 request_hook,
1043 executor,
1044 api,
1045 secp_ctx: Secp256k1::new(),
1046 root_secret,
1047 task_group,
1048 client_span,
1049 operation_log: OperationLog::new(db.clone()),
1050 client_recovery_progress_receiver,
1051 meta_service: self.meta_service,
1052 iroh_enable_dht: self.iroh_enable_dht,
1053 iroh_enable_next: self.iroh_enable_next,
1054 user_bitcoind_rpc,
1055 user_bitcoind_rpc_no_chain_id: self.bitcoind_rpc_no_chain_id_factory,
1056 });
1057 client_inner.spawn_cancellable("MetaService::update_continuously", {
1058 let client_inner = client_inner.clone();
1059 async move {
1060 client_inner
1061 .meta_service
1062 .update_continuously(&client_inner)
1063 .await;
1064 }
1065 });
1066
1067 client_inner.spawn_cancellable("update-api-announcements", {
1068 let client_inner = client_inner.clone();
1069 async move {
1070 client_inner
1071 .connectors
1072 .wait_for_initialized_connections()
1073 .await;
1074 run_api_announcement_refresh_task(client_inner.clone()).await
1075 }
1076 });
1077
1078 client_inner.spawn_cancellable("guardian metadata refresh task", {
1079 let client_inner = client_inner.clone();
1080 async move {
1081 client_inner
1082 .connectors
1083 .wait_for_initialized_connections()
1084 .await;
1085 run_guardian_metadata_refresh_task(client_inner.clone()).await
1086 }
1087 });
1088
1089 client_inner.spawn_cancellable("event log ordering task", {
1090 let client_inner = client_inner.clone();
1091 async move {
1092 client_inner
1093 .connectors
1094 .wait_for_initialized_connections()
1095 .await;
1096
1097 run_event_log_ordering_task(
1098 db.clone(),
1099 log_ordering_wakeup_rx,
1100 log_event_added_tx,
1101 log_event_added_transient_tx,
1102 )
1103 .await
1104 }
1105 });
1106
1107 if client_inner
1111 .db
1112 .begin_transaction_nc()
1113 .await
1114 .get_value(&ChainIdKey)
1115 .await
1116 .is_none()
1117 {
1118 client_inner.spawn_cancellable("fetch-chain-id", {
1119 let client_inner = client_inner.clone();
1120 async move {
1121 client_inner.api.wait_for_initialized_connections().await;
1122 match client_inner.api.chain_id().await {
1123 Ok(chain_id) => {
1124 debug!(target: LOG_CLIENT, %chain_id, "Caching chain ID from background fetch");
1125 let mut dbtx = client_inner.db.begin_transaction().await;
1126 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
1127 dbtx.commit_tx().await;
1128 }
1129 Err(err) => {
1130 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Background chain ID fetch failed, will retry on next start");
1131 }
1132 }
1133 }
1134 });
1135 }
1136
1137 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
1138
1139 let client_arc = ClientHandle::new(client_inner);
1140
1141 for (_, _, module) in client_arc.modules.iter_modules() {
1142 module.start().await;
1143 }
1144
1145 final_client.set(client_iface.clone());
1146
1147 if !module_recoveries.is_empty() {
1148 client_arc.spawn_module_recoveries_task(
1149 client_recovery_progress_sender,
1150 module_recoveries,
1151 module_recovery_progress_receivers,
1152 );
1153 }
1154
1155 Ok(client_arc)
1156 }
1157
1158 async fn load_init_state(db: &Database) -> InitState {
1159 let mut dbtx = db.begin_transaction_nc().await;
1160 dbtx.get_value(&ClientInitStateKey)
1161 .await
1162 .unwrap_or_else(|| {
1163 warn!(
1166 target: LOG_CLIENT,
1167 "Client missing ClientRequiresRecovery: assuming complete"
1168 );
1169 db::InitState::Complete(db::InitModeComplete::Fresh)
1170 })
1171 }
1172
1173 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
1174 let mut decoders = client_decoders(
1175 &self.module_inits,
1176 config
1177 .modules
1178 .iter()
1179 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
1180 );
1181
1182 decoders.register_module(
1183 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1184 ModuleKind::from_static_str("tx_submission"),
1185 tx_submission_sm_decoder(),
1186 );
1187
1188 decoders
1189 }
1190
1191 fn config_decoded(
1192 config: &ClientConfig,
1193 decoders: &ModuleDecoderRegistry,
1194 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1195 config.clone().redecode_raw(decoders)
1196 }
1197
1198 fn federation_root_secret(
1202 pre_root_secret: &DerivableSecret,
1203 config: &ClientConfig,
1204 ) -> DerivableSecret {
1205 pre_root_secret.federation_key(&config.global.calculate_federation_id())
1206 }
1207
1208 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1210 self.log_event_added_transient_tx.subscribe()
1211 }
1212
1213 async fn migrate_pending_config_if_present(db: &Database) {
1217 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1218 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1219
1220 let mut dbtx = db.begin_transaction().await;
1221 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1223 .await;
1224 dbtx.remove_entry(&PendingClientConfigKey).await;
1226 dbtx.commit_tx().await;
1227
1228 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1229 }
1230 }
1231
1232 fn load_and_refresh_client_config_static(
1235 config: &ClientConfig,
1236 api: &DynGlobalApi,
1237 db: &Database,
1238 task_group: &TaskGroup,
1239 client_span: &Span,
1240 ) {
1241 let config = config.clone();
1242 let api = api.clone();
1243 let db = db.clone();
1244 let task_group = task_group.clone();
1245
1246 task_group.spawn_cancellable_with_span(
1248 client_span.clone(),
1249 "refresh_client_config_static",
1250 async move {
1251 api.wait_for_initialized_connections().await;
1252 Self::refresh_client_config_static(&config, &api, &db).await;
1253 },
1254 );
1255 }
1256
1257 async fn refresh_client_config_static(
1259 config: &ClientConfig,
1260 api: &DynGlobalApi,
1261 db: &Database,
1262 ) {
1263 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1264 warn!(
1265 target: LOG_CLIENT,
1266 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1267 );
1268 }
1269 }
1270
1271 fn validate_config_update(
1273 current_config: &ClientConfig,
1274 new_config: &ClientConfig,
1275 ) -> anyhow::Result<()> {
1276 if current_config.global != new_config.global {
1278 bail!("Global configuration changes are not allowed in config updates");
1279 }
1280
1281 for (module_id, current_module_config) in ¤t_config.modules {
1283 match new_config.modules.get(module_id) {
1284 Some(new_module_config) => {
1285 if current_module_config != new_module_config {
1286 bail!(
1287 "Module {} configuration changes are not allowed, only additions are permitted",
1288 module_id
1289 );
1290 }
1291 }
1292 None => {
1293 bail!(
1294 "Module {} was removed in new config, only additions are allowed",
1295 module_id
1296 );
1297 }
1298 }
1299 }
1300
1301 Ok(())
1302 }
1303
1304 async fn refresh_client_config_static_try(
1306 current_config: &ClientConfig,
1307 api: &DynGlobalApi,
1308 db: &Database,
1309 ) -> anyhow::Result<()> {
1310 debug!(target: LOG_CLIENT, "Refreshing client config");
1311
1312 let fetched_config = api
1314 .request_current_consensus::<ClientConfig>(
1315 CLIENT_CONFIG_ENDPOINT.to_owned(),
1316 ApiRequestErased::default(),
1317 )
1318 .await?;
1319
1320 Self::validate_config_update(current_config, &fetched_config)?;
1322
1323 if current_config != &fetched_config {
1325 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1326
1327 let mut dbtx = db.begin_transaction().await;
1328 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1329 .await;
1330 dbtx.commit_tx().await;
1331 } else {
1332 debug!(target: LOG_CLIENT, "No federation config changes detected");
1333 }
1334
1335 Ok(())
1336 }
1337}
1338
1339pub struct ClientPreview {
1344 inner: ClientBuilder,
1345 config: ClientConfig,
1346 connectors: ConnectorRegistry,
1347 api_secret: Option<String>,
1348 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1349 preview_prefetch_api_version_set:
1350 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1351 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
1352}
1353
1354impl ClientPreview {
1355 pub fn config(&self) -> &ClientConfig {
1357 &self.config
1358 }
1359
1360 pub async fn join(
1439 self,
1440 db_no_decoders: Database,
1441 pre_root_secret: RootSecret,
1442 ) -> anyhow::Result<ClientHandle> {
1443 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1444
1445 let client = self
1446 .inner
1447 .init(
1448 self.connectors,
1449 db_no_decoders,
1450 pre_root_secret,
1451 self.config,
1452 self.api_secret,
1453 InitMode::Fresh,
1454 self.prefetch_api_announcements,
1455 self.preview_prefetch_api_version_set,
1456 self.prefetch_chain_id,
1457 )
1458 .await?;
1459
1460 Ok(client)
1461 }
1462
1463 pub async fn recover(
1475 self,
1476 db_no_decoders: Database,
1477 pre_root_secret: RootSecret,
1478 backup: Option<ClientBackup>,
1479 ) -> anyhow::Result<ClientHandle> {
1480 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1481
1482 let client = self
1483 .inner
1484 .init(
1485 self.connectors,
1486 db_no_decoders,
1487 pre_root_secret,
1488 self.config,
1489 self.api_secret,
1490 InitMode::Recover {
1491 snapshot: backup.clone(),
1492 },
1493 self.prefetch_api_announcements,
1494 self.preview_prefetch_api_version_set,
1495 self.prefetch_chain_id,
1496 )
1497 .await?;
1498
1499 Ok(client)
1500 }
1501
1502 #[deprecated(
1504 note = "Recovery is now efficient enough that backups are no longer necessary. Backups will be removed in v0.13.0 due to backups being inherently complicated and brittle."
1505 )]
1506 #[allow(deprecated)]
1507 pub async fn download_backup_from_federation(
1508 &self,
1509 pre_root_secret: RootSecret,
1510 ) -> anyhow::Result<Option<ClientBackup>> {
1511 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1512 let api = DynGlobalApi::new(
1513 self.connectors.clone(),
1514 self.config
1516 .global
1517 .api_endpoints
1518 .iter()
1519 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1520 .collect(),
1521 self.api_secret.as_deref(),
1522 )?;
1523
1524 Client::download_backup_from_federation_static(
1525 &api,
1526 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1527 &self.inner.decoders(&self.config),
1528 )
1529 .await
1530 }
1531}