1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
14use fedimint_api_client::download_from_invite_code;
15use fedimint_bitcoind::DynBitcoindRpc;
16use fedimint_client_module::api::ClientRawFederationApiExt as _;
17use fedimint_client_module::meta::LegacyMetaSource;
18use fedimint_client_module::module::init::{
19 BitcoindRpcFactory, BitcoindRpcNoChainIdFactory, ClientModuleInit,
20};
21use fedimint_client_module::module::recovery::RecoveryProgress;
22use fedimint_client_module::module::{
23 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
24};
25use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
26use fedimint_client_module::transaction::{
27 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
28};
29use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
30use fedimint_connectors::ConnectorRegistry;
31use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
32use fedimint_core::core::{ModuleInstanceId, ModuleKind};
33use fedimint_core::db::{
34 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
35};
36use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
37use fedimint_core::envs::is_running_in_test_env;
38use fedimint_core::invite_code::InviteCode;
39use fedimint_core::module::registry::ModuleDecoderRegistry;
40use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
41use fedimint_core::task::TaskGroup;
42use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
43use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _, SafeUrl};
44use fedimint_core::{ChainId, NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
45use fedimint_derive_secret::DerivableSecret;
46use fedimint_eventlog::{
47 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
48};
49use fedimint_logging::LOG_CLIENT;
50use tokio::sync::{broadcast, watch};
51use tracing::{debug, trace, warn};
52
53use super::handle::ClientHandle;
54use super::{Client, client_decoders};
55use crate::api_announcements::{
56 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
57 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
58};
59use crate::backup::{ClientBackup, Metadata};
60use crate::client::PrimaryModuleCandidates;
61use crate::db::{
62 self, ApiSecretKey, ChainIdKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
63 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
64 PendingClientConfigKey, apply_migrations_client_module_dbtx,
65};
66use crate::guardian_metadata::run_guardian_metadata_refresh_task;
67use crate::meta::MetaService;
68use crate::module_init::ClientModuleInitRegistry;
69use crate::oplog::OperationLog;
70use crate::sm::executor::Executor;
71use crate::sm::notifier::Notifier;
72
73#[derive(Clone)]
95pub enum RootSecret {
96 StandardDoubleDerive(DerivableSecret),
101 Custom(DerivableSecret),
106}
107
108impl RootSecret {
109 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
110 match self {
111 RootSecret::StandardDoubleDerive(derivable_secret) => {
112 get_default_client_secret(derivable_secret, &federation_id)
113 }
114 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
115 }
116 }
117}
118
119pub struct ClientBuilder {
121 module_inits: ClientModuleInitRegistry,
122 admin_creds: Option<AdminCreds>,
123 meta_service: Arc<crate::meta::MetaService>,
124 stopped: bool,
125 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
126 request_hook: ApiRequestHook,
127 iroh_enable_dht: bool,
128 iroh_enable_next: bool,
129 bitcoind_rpc_factory: Option<BitcoindRpcFactory>,
130 bitcoind_rpc_no_chain_id_factory: Option<BitcoindRpcNoChainIdFactory>,
131}
132
133impl ClientBuilder {
134 pub(crate) fn new() -> Self {
135 trace!(
136 target: LOG_CLIENT,
137 version = %fedimint_build_code_version_env!(),
138 "Initializing fedimint client",
139 );
140 let meta_service = MetaService::new(LegacyMetaSource::default());
141 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
142 broadcast::channel(1024);
143
144 ClientBuilder {
145 module_inits: ModuleInitRegistry::new(),
146 admin_creds: None,
147 stopped: false,
148 meta_service,
149 log_event_added_transient_tx,
150 request_hook: Arc::new(|api| api),
151 iroh_enable_dht: true,
152 iroh_enable_next: true,
153 bitcoind_rpc_factory: None,
154 bitcoind_rpc_no_chain_id_factory: None,
155 }
156 }
157
158 pub(crate) fn from_existing(client: &Client) -> Self {
159 ClientBuilder {
160 module_inits: client.module_inits.clone(),
161 admin_creds: None,
162 stopped: false,
163 meta_service: client.meta_service.clone(),
165 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
166 request_hook: client.request_hook.clone(),
167 iroh_enable_dht: client.iroh_enable_dht,
168 iroh_enable_next: client.iroh_enable_next,
169 bitcoind_rpc_factory: None,
172 bitcoind_rpc_no_chain_id_factory: client.user_bitcoind_rpc_no_chain_id.clone(),
174 }
175 }
176
177 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
179 self.module_inits = module_inits;
180 }
181
182 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
184 self.module_inits.attach(module_init);
185 }
186
187 pub fn stopped(&mut self) {
188 self.stopped = true;
189 }
190 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
199 self.request_hook = hook;
200 self
201 }
202
203 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
204 self.meta_service = meta_service;
205 }
206
207 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
210 self.iroh_enable_dht = iroh_enable_dht;
211 self
212 }
213
214 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
217 self.iroh_enable_next = iroh_enable_next;
218 self
219 }
220
221 pub fn with_bitcoind_rpc<F, Fut>(mut self, factory: F) -> Self
243 where
244 F: FnOnce(ChainId) -> Fut + Send + Sync + 'static,
245 Fut: Future<Output = Option<DynBitcoindRpc>> + Send + 'static,
246 {
247 self.bitcoind_rpc_factory = Some(Box::new(move |chain_id| Box::pin(factory(chain_id))));
248 self
249 }
250
251 pub fn with_bitcoind_rpc_no_chain_id<F, Fut>(mut self, factory: F) -> Self
274 where
275 F: Fn(SafeUrl) -> Fut + Send + Sync + 'static,
276 Fut: Future<Output = Option<DynBitcoindRpc>> + Send + 'static,
277 {
278 self.bitcoind_rpc_no_chain_id_factory = Some(Arc::new(move |url| Box::pin(factory(url))));
279 self
280 }
281
282 async fn migrate_module_dbs(
289 &self,
290 db: &Database,
291 client_config: &ClientConfig,
292 ) -> anyhow::Result<()> {
293 for (module_id, module_cfg) in &client_config.modules {
294 let kind = module_cfg.kind.clone();
295 let Some(init) = self.module_inits.get(&kind) else {
296 continue;
298 };
299
300 let mut dbtx = db.begin_transaction().await;
301 apply_migrations_client_module_dbtx(
302 &mut dbtx.to_ref_nc(),
303 kind.to_string(),
304 init.get_database_migrations(),
305 *module_id,
306 )
307 .await?;
308 if let Some(used_db_prefixes) = init.used_db_prefixes()
309 && is_running_in_test_env()
310 {
311 verify_module_db_integrity_dbtx(
312 &mut dbtx.to_ref_nc(),
313 *module_id,
314 kind,
315 &used_db_prefixes,
316 )
317 .await;
318 }
319 dbtx.commit_tx_result().await?;
320 }
321
322 Ok(())
323 }
324
325 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
326 let Some(config) = Client::get_config_from_db(db).await else {
327 bail!("Client database not initialized")
328 };
329
330 Ok(config)
331 }
332
333 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
334 self.admin_creds = Some(creds);
335 }
336
337 #[allow(clippy::too_many_arguments)]
338 async fn init(
339 self,
340 connectors: ConnectorRegistry,
341 db_no_decoders: Database,
342 pre_root_secret: DerivableSecret,
343 config: ClientConfig,
344 api_secret: Option<String>,
345 init_mode: InitMode,
346 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
347 preview_prefetch_api_version_set: Option<
348 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
349 >,
350 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
351 ) -> anyhow::Result<ClientHandle> {
352 if Client::is_initialized(&db_no_decoders).await {
353 bail!("Client database already initialized")
354 }
355
356 Client::run_core_migrations(&db_no_decoders).await?;
357
358 {
361 debug!(target: LOG_CLIENT, "Initializing client database");
362 let mut dbtx = db_no_decoders.begin_transaction().await;
363 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
365 .await;
366 dbtx.insert_entry(
367 &ClientPreRootSecretHashKey,
368 &pre_root_secret.derive_pre_root_secret_hash(),
369 )
370 .await;
371
372 if let Some(api_secret) = api_secret.as_ref() {
373 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
374 }
375
376 let init_state = InitState::Pending(init_mode);
377 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
378
379 let metadata = init_state
380 .does_require_recovery()
381 .flatten()
382 .map_or(Metadata::empty(), |s| s.metadata);
383
384 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
385
386 dbtx.commit_tx_result().await?;
387 }
388
389 let stopped = self.stopped;
390 self.build(
391 connectors,
392 db_no_decoders,
393 pre_root_secret,
394 config,
395 api_secret,
396 stopped,
397 preview_prefetch_api_announcements,
398 preview_prefetch_api_version_set,
399 prefetch_chain_id,
400 )
401 .await
402 }
403
404 pub async fn preview(
405 self,
406 connectors: ConnectorRegistry,
407 invite_code: &InviteCode,
408 ) -> anyhow::Result<ClientPreview> {
409 let (config, api) = download_from_invite_code(&connectors, invite_code).await?;
410
411 let prefetch_api_announcements =
412 config
413 .global
414 .broadcast_public_keys
415 .clone()
416 .map(|guardian_pub_keys| {
417 Jit::new({
418 let api = api.clone();
419 move || async move {
420 fetch_api_announcements_from_at_least_num_of_peers(
424 1,
425 &api,
426 &guardian_pub_keys,
427 Duration::from_millis(20),
430 )
431 .await
432 }
433 })
434 });
435
436 self.preview_inner(
437 connectors,
438 config,
439 invite_code.api_secret(),
440 Some(api),
441 prefetch_api_announcements,
442 )
443 .await
444 }
445
446 pub async fn preview_with_existing_config(
451 self,
452 connectors: ConnectorRegistry,
453 config: ClientConfig,
454 api_secret: Option<String>,
455 ) -> anyhow::Result<ClientPreview> {
456 self.preview_inner(connectors, config, api_secret, None, None)
457 .await
458 }
459
460 async fn preview_inner(
461 self,
462 connectors: ConnectorRegistry,
463 config: ClientConfig,
464 api_secret: Option<String>,
465 prefetch_api: Option<DynGlobalApi>,
466 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
467 ) -> anyhow::Result<ClientPreview> {
468 let preview_prefetch_api_version_set = prefetch_api.as_ref().map(|api| {
469 JitTry::new_try({
470 let config = config.clone();
471 let api = api.clone();
472 || async move { Client::fetch_common_api_versions(&config, &api).await }
473 })
474 });
475
476 let prefetch_chain_id = prefetch_api.map(|api| {
477 JitTry::new_try(|| async move { api.chain_id().await.map_err(anyhow::Error::from) })
478 });
479
480 Ok(ClientPreview {
481 connectors,
482 inner: self,
483 config,
484 api_secret,
485 prefetch_api_announcements,
486 preview_prefetch_api_version_set,
487 prefetch_chain_id,
488 })
489 }
490
491 pub async fn open(
492 self,
493 connectors: ConnectorRegistry,
494 db_no_decoders: Database,
495 pre_root_secret: RootSecret,
496 ) -> anyhow::Result<ClientHandle> {
497 Client::run_core_migrations(&db_no_decoders).await?;
498
499 Self::migrate_pending_config_if_present(&db_no_decoders).await;
501
502 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
503 bail!("Client database not initialized")
504 };
505
506 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
507
508 match db_no_decoders
509 .begin_transaction_nc()
510 .await
511 .get_value(&ClientPreRootSecretHashKey)
512 .await
513 {
514 Some(secret_hash) => {
515 ensure!(
516 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
517 "Secret hash does not match. Incorrect secret"
518 );
519 }
520 _ => {
521 debug!(target: LOG_CLIENT, "Backfilling secret hash");
522 let mut dbtx = db_no_decoders.begin_transaction().await;
524 dbtx.insert_entry(
525 &ClientPreRootSecretHashKey,
526 &pre_root_secret.derive_pre_root_secret_hash(),
527 )
528 .await;
529 dbtx.commit_tx().await;
530 }
531 }
532
533 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
534 let stopped = self.stopped;
535 let request_hook = self.request_hook.clone();
536
537 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
538 let client = self
539 .build_stopped(
540 connectors,
541 db_no_decoders,
542 pre_root_secret,
543 &config,
544 api_secret,
545 log_event_added_transient_tx,
546 request_hook,
547 None,
548 None,
549 None, )
551 .await?;
552 if !stopped {
553 client.as_inner().start_executor();
554 }
555 Ok(client)
556 }
557
558 #[allow(clippy::too_many_arguments)]
560 pub(crate) async fn build(
561 self,
562 connectors: ConnectorRegistry,
563 db_no_decoders: Database,
564 pre_root_secret: DerivableSecret,
565 config: ClientConfig,
566 api_secret: Option<String>,
567 stopped: bool,
568 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
569 preview_prefetch_api_version_set: Option<
570 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
571 >,
572 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
573 ) -> anyhow::Result<ClientHandle> {
574 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
575 let request_hook = self.request_hook.clone();
576 let client = self
577 .build_stopped(
578 connectors,
579 db_no_decoders,
580 pre_root_secret,
581 &config,
582 api_secret,
583 log_event_added_transient_tx,
584 request_hook,
585 preview_prefetch_api_announcements,
586 preview_prefetch_api_version_set,
587 prefetch_chain_id,
588 )
589 .await?;
590 if !stopped {
591 client.as_inner().start_executor();
592 }
593
594 Ok(client)
595 }
596
597 #[allow(clippy::too_many_arguments)]
600 async fn build_stopped(
601 mut self,
602 connectors: ConnectorRegistry,
603 db_no_decoders: Database,
604 pre_root_secret: DerivableSecret,
605 config: &ClientConfig,
606 api_secret: Option<String>,
607 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
608 request_hook: ApiRequestHook,
609 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
610 preview_prefetch_api_version_set: Option<
611 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
612 >,
613 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
614 ) -> anyhow::Result<ClientHandle> {
615 debug!(
616 target: LOG_CLIENT,
617 version = %fedimint_build_code_version_env!(),
618 "Building fedimint client",
619 );
620 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
621 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
622
623 let decoders = self.decoders(config);
624 let config = Self::config_decoded(config, &decoders)?;
625 let fed_id = config.calculate_federation_id();
626 let db = db_no_decoders.with_decoders(decoders.clone());
627 let peer_urls = get_api_urls(&db, &config).await;
628 let api = match self.admin_creds.as_ref() {
629 Some(admin_creds) => FederationApi::new(
630 connectors.clone(),
631 peer_urls,
632 Some(admin_creds.peer_id),
633 Some(&admin_creds.auth.0),
634 )
635 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
636 .with_request_hook(&request_hook)
637 .with_cache()
638 .into(),
639 None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
640 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
641 .with_request_hook(&request_hook)
642 .with_cache()
643 .into(),
644 };
645
646 let task_group = TaskGroup::new();
647
648 self.migrate_module_dbs(&db, &config).await?;
651
652 let init_state = Self::load_init_state(&db).await;
653
654 let notifier = Notifier::new();
655
656 if let Some(p) = preview_prefetch_api_announcements {
657 let announcements = p.get().await;
661
662 store_api_announcements_updates_from_peers(&db, announcements).await?
663 }
664
665 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
666 match preview_prefetch_api_version_set.get_try().await {
667 Ok(peer_api_versions) => {
668 Client::store_prefetched_api_versions(
669 &db,
670 &config,
671 &self.module_inits,
672 peer_api_versions,
673 )
674 .await;
675 }
676 Err(err) => {
677 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
678 }
679 }
680 }
681
682 let common_api_versions = Client::load_and_refresh_common_api_version_static(
683 &config,
684 &self.module_inits,
685 connectors.clone(),
686 &api,
687 &db,
688 &task_group,
689 )
690 .await
691 .inspect_err(|err| {
692 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
693 })
694 .unwrap_or(ApiVersionSet {
695 core: ApiVersion::new(0, 0),
696 modules: BTreeMap::new(),
698 });
699
700 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
701
702 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
704
705 if let Some(prefetch_chain_id) = prefetch_chain_id {
709 match prefetch_chain_id.get_try().await {
710 Ok(chain_id) => {
711 debug!(target: LOG_CLIENT, %chain_id, "Caching prefetched chain ID");
712 let mut dbtx = db.begin_transaction().await;
713 dbtx.insert_entry(&ChainIdKey, chain_id).await;
714 dbtx.commit_tx().await;
715 }
716 Err(err) => {
717 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Failed to prefetch chain ID, will retry on next start");
718 }
719 }
720 }
721
722 let user_bitcoind_rpc = if let Some(factory) = self.bitcoind_rpc_factory.take() {
724 let chain_id = db.begin_transaction_nc().await.get_value(&ChainIdKey).await;
726
727 if let Some(chain_id) = chain_id {
728 debug!(target: LOG_CLIENT, %chain_id, "Creating user-provided bitcoind RPC client");
729 factory(chain_id).await
730 } else {
731 debug!(target: LOG_CLIENT, "Chain ID not available, skipping user-provided bitcoind RPC creation");
732 None
733 }
734 } else {
735 None
736 };
737
738 let mut module_recoveries: BTreeMap<
739 ModuleInstanceId,
740 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
741 > = BTreeMap::new();
742 let mut module_recovery_progress_receivers: BTreeMap<
743 ModuleInstanceId,
744 watch::Receiver<RecoveryProgress>,
745 > = BTreeMap::new();
746
747 let final_client = FinalClientIface::default();
748
749 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
750
751 let modules = {
752 let mut modules = ClientModuleRegistry::default();
753 for (module_instance_id, module_config) in config.modules.clone() {
754 let kind = module_config.kind().clone();
755 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
756 debug!(
757 target: LOG_CLIENT,
758 kind=%kind,
759 instance_id=%module_instance_id,
760 "Module kind of instance not found in module gens, skipping");
761 continue;
762 };
763
764 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
765 else {
766 warn!(
767 target: LOG_CLIENT,
768 kind=%kind,
769 instance_id=%module_instance_id,
770 "Module kind of instance has incompatible api version, skipping"
771 );
772 continue;
773 };
774
775 let start_module_recover_fn =
778 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
779 let module_config = module_config.clone();
780 let num_peers = NumPeers::from(config.global.api_endpoints.len());
781 let db = db.clone();
782 let kind = kind.clone();
783 let notifier = notifier.clone();
784 let api = api.clone();
785 let root_secret = root_secret.clone();
786 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
787 let final_client = final_client.clone();
788 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
789 let task_group = task_group.clone();
790 let module_init = module_init.clone();
791 let user_bitcoind_rpc = user_bitcoind_rpc.clone();
792 let user_bitcoind_rpc_no_chain_id =
793 self.bitcoind_rpc_no_chain_id_factory.clone();
794 (
795 Box::pin(async move {
796 module_init
797 .recover(
798 final_client.clone(),
799 fed_id,
800 num_peers,
801 module_config.clone(),
802 db.clone(),
803 module_instance_id,
804 common_api_versions.core,
805 api_version,
806 root_secret.derive_module_secret(module_instance_id),
807 notifier.clone(),
808 api.clone(),
809 admin_auth,
810 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
811 progress_tx,
812 task_group,
813 user_bitcoind_rpc,
814 user_bitcoind_rpc_no_chain_id,
815 )
816 .await
817 .inspect_err(|err| {
818 warn!(
819 target: LOG_CLIENT,
820 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
821 );
822 })
823 }),
824 progress_rx,
825 )
826 };
827
828 let recovery = match init_state.does_require_recovery() {
829 Some(snapshot) => {
830 match db
831 .begin_transaction_nc()
832 .await
833 .get_value(&ClientModuleRecovery { module_instance_id })
834 .await
835 {
836 Some(module_recovery_state) => {
837 if module_recovery_state.is_done() {
838 debug!(
839 id = %module_instance_id,
840 %kind, "Module recovery already complete"
841 );
842 None
843 } else {
844 debug!(
845 id = %module_instance_id,
846 %kind,
847 progress = %module_recovery_state.progress,
848 "Starting module recovery with an existing progress"
849 );
850 Some(start_module_recover_fn(
851 snapshot,
852 module_recovery_state.progress,
853 ))
854 }
855 }
856 _ => {
857 let progress = RecoveryProgress::none();
858 let mut dbtx = db.begin_transaction().await;
859 dbtx.log_event(
860 log_ordering_wakeup_tx.clone(),
861 None,
862 ModuleRecoveryStarted::new(module_instance_id),
863 )
864 .await;
865 dbtx.insert_entry(
866 &ClientModuleRecovery { module_instance_id },
867 &ClientModuleRecoveryState { progress },
868 )
869 .await;
870
871 dbtx.commit_tx().await;
872
873 debug!(
874 id = %module_instance_id,
875 %kind, "Starting new module recovery"
876 );
877 Some(start_module_recover_fn(snapshot, progress))
878 }
879 }
880 }
881 _ => None,
882 };
883
884 match recovery {
885 Some((recovery, recovery_progress_rx)) => {
886 module_recoveries.insert(module_instance_id, recovery);
887 module_recovery_progress_receivers
888 .insert(module_instance_id, recovery_progress_rx);
889 }
890 _ => {
891 let module = module_init
892 .init(
893 final_client.clone(),
894 fed_id,
895 config.global.api_endpoints.len(),
896 module_config,
897 db.clone(),
898 module_instance_id,
899 common_api_versions.core,
900 api_version,
901 root_secret.derive_module_secret(module_instance_id),
908 notifier.clone(),
909 api.clone(),
910 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
911 task_group.clone(),
912 connectors.clone(),
913 user_bitcoind_rpc.clone(),
914 self.bitcoind_rpc_no_chain_id_factory.clone(),
915 )
916 .await?;
917
918 modules.register_module(module_instance_id, kind, module);
919 }
920 }
921 }
922 modules
923 };
924
925 if init_state.is_pending() && module_recoveries.is_empty() {
926 let mut dbtx = db.begin_transaction().await;
927 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
928 .await;
929 dbtx.commit_tx().await;
930 }
931
932 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
933 BTreeMap::new();
934
935 for (module_id, _kind, module) in modules.iter_modules() {
936 match module.supports_being_primary() {
937 PrimaryModuleSupport::Any { priority } => {
938 primary_modules
939 .entry(priority)
940 .or_default()
941 .wildcard
942 .push(module_id);
943 }
944 PrimaryModuleSupport::Selected { priority, units } => {
945 for unit in units {
946 primary_modules
947 .entry(priority)
948 .or_default()
949 .specific
950 .entry(unit)
951 .or_default()
952 .push(module_id);
953 }
954 }
955 PrimaryModuleSupport::None => {}
956 }
957 }
958
959 let executor = {
960 let mut executor_builder = Executor::builder();
961 executor_builder
962 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
963
964 for (module_instance_id, _, module) in modules.iter_modules() {
965 executor_builder.with_module_dyn(module.context(module_instance_id));
966 }
967
968 for module_instance_id in module_recoveries.keys() {
969 executor_builder.with_valid_module_id(*module_instance_id);
970 }
971
972 executor_builder.build(
973 db.clone(),
974 notifier,
975 task_group.clone(),
976 log_ordering_wakeup_tx.clone(),
977 )
978 };
979
980 let recovery_receiver_init_val = module_recovery_progress_receivers
981 .iter()
982 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
983 .collect::<BTreeMap<_, _>>();
984 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
985 watch::channel(recovery_receiver_init_val);
986
987 let client_inner = Arc::new(Client {
988 final_client: final_client.clone(),
989 config: tokio::sync::RwLock::new(config.clone()),
990 api_secret,
991 decoders,
992 db: db.clone(),
993 connectors,
994 federation_id: fed_id,
995 federation_config_meta: config.global.meta,
996 primary_modules,
997 modules,
998 module_inits: self.module_inits.clone(),
999 log_ordering_wakeup_tx,
1000 log_event_added_rx,
1001 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
1002 request_hook,
1003 executor,
1004 api,
1005 secp_ctx: Secp256k1::new(),
1006 root_secret,
1007 task_group,
1008 operation_log: OperationLog::new(db.clone()),
1009 client_recovery_progress_receiver,
1010 meta_service: self.meta_service,
1011 iroh_enable_dht: self.iroh_enable_dht,
1012 iroh_enable_next: self.iroh_enable_next,
1013 user_bitcoind_rpc,
1014 user_bitcoind_rpc_no_chain_id: self.bitcoind_rpc_no_chain_id_factory,
1015 });
1016 client_inner
1017 .task_group
1018 .spawn_cancellable("MetaService::update_continuously", {
1019 let client_inner = client_inner.clone();
1020 async move {
1021 client_inner
1022 .meta_service
1023 .update_continuously(&client_inner)
1024 .await;
1025 }
1026 });
1027
1028 client_inner
1029 .task_group
1030 .spawn_cancellable("update-api-announcements", {
1031 let client_inner = client_inner.clone();
1032 async move {
1033 client_inner
1034 .connectors
1035 .wait_for_initialized_connections()
1036 .await;
1037 run_api_announcement_refresh_task(client_inner.clone()).await
1038 }
1039 });
1040
1041 client_inner
1042 .task_group
1043 .spawn_cancellable("guardian metadata refresh task", {
1044 let client_inner = client_inner.clone();
1045 async move {
1046 client_inner
1047 .connectors
1048 .wait_for_initialized_connections()
1049 .await;
1050 run_guardian_metadata_refresh_task(client_inner.clone()).await
1051 }
1052 });
1053
1054 client_inner
1055 .task_group
1056 .spawn_cancellable("event log ordering task", {
1057 let client_inner = client_inner.clone();
1058 async move {
1059 client_inner
1060 .connectors
1061 .wait_for_initialized_connections()
1062 .await;
1063
1064 run_event_log_ordering_task(
1065 db.clone(),
1066 log_ordering_wakeup_rx,
1067 log_event_added_tx,
1068 log_event_added_transient_tx,
1069 )
1070 .await
1071 }
1072 });
1073
1074 if client_inner
1078 .db
1079 .begin_transaction_nc()
1080 .await
1081 .get_value(&ChainIdKey)
1082 .await
1083 .is_none()
1084 {
1085 client_inner
1086 .task_group
1087 .spawn_cancellable("fetch-chain-id", {
1088 let client_inner = client_inner.clone();
1089 async move {
1090 client_inner.api.wait_for_initialized_connections().await;
1091 match client_inner.api.chain_id().await {
1092 Ok(chain_id) => {
1093 debug!(target: LOG_CLIENT, %chain_id, "Caching chain ID from background fetch");
1094 let mut dbtx = client_inner.db.begin_transaction().await;
1095 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
1096 dbtx.commit_tx().await;
1097 }
1098 Err(err) => {
1099 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Background chain ID fetch failed, will retry on next start");
1100 }
1101 }
1102 }
1103 });
1104 }
1105
1106 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
1107
1108 let client_arc = ClientHandle::new(client_inner);
1109
1110 for (_, _, module) in client_arc.modules.iter_modules() {
1111 module.start().await;
1112 }
1113
1114 final_client.set(client_iface.clone());
1115
1116 if !module_recoveries.is_empty() {
1117 client_arc.spawn_module_recoveries_task(
1118 client_recovery_progress_sender,
1119 module_recoveries,
1120 module_recovery_progress_receivers,
1121 );
1122 }
1123
1124 Ok(client_arc)
1125 }
1126
1127 async fn load_init_state(db: &Database) -> InitState {
1128 let mut dbtx = db.begin_transaction_nc().await;
1129 dbtx.get_value(&ClientInitStateKey)
1130 .await
1131 .unwrap_or_else(|| {
1132 warn!(
1135 target: LOG_CLIENT,
1136 "Client missing ClientRequiresRecovery: assuming complete"
1137 );
1138 db::InitState::Complete(db::InitModeComplete::Fresh)
1139 })
1140 }
1141
1142 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
1143 let mut decoders = client_decoders(
1144 &self.module_inits,
1145 config
1146 .modules
1147 .iter()
1148 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
1149 );
1150
1151 decoders.register_module(
1152 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1153 ModuleKind::from_static_str("tx_submission"),
1154 tx_submission_sm_decoder(),
1155 );
1156
1157 decoders
1158 }
1159
1160 fn config_decoded(
1161 config: &ClientConfig,
1162 decoders: &ModuleDecoderRegistry,
1163 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1164 config.clone().redecode_raw(decoders)
1165 }
1166
1167 fn federation_root_secret(
1171 pre_root_secret: &DerivableSecret,
1172 config: &ClientConfig,
1173 ) -> DerivableSecret {
1174 pre_root_secret.federation_key(&config.global.calculate_federation_id())
1175 }
1176
1177 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1179 self.log_event_added_transient_tx.subscribe()
1180 }
1181
1182 async fn migrate_pending_config_if_present(db: &Database) {
1186 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1187 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1188
1189 let mut dbtx = db.begin_transaction().await;
1190 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1192 .await;
1193 dbtx.remove_entry(&PendingClientConfigKey).await;
1195 dbtx.commit_tx().await;
1196
1197 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1198 }
1199 }
1200
1201 fn load_and_refresh_client_config_static(
1204 config: &ClientConfig,
1205 api: &DynGlobalApi,
1206 db: &Database,
1207 task_group: &TaskGroup,
1208 ) {
1209 let config = config.clone();
1210 let api = api.clone();
1211 let db = db.clone();
1212 let task_group = task_group.clone();
1213
1214 task_group.spawn_cancellable("refresh_client_config_static", async move {
1216 api.wait_for_initialized_connections().await;
1217 Self::refresh_client_config_static(&config, &api, &db).await;
1218 });
1219 }
1220
1221 async fn refresh_client_config_static(
1223 config: &ClientConfig,
1224 api: &DynGlobalApi,
1225 db: &Database,
1226 ) {
1227 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1228 warn!(
1229 target: LOG_CLIENT,
1230 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1231 );
1232 }
1233 }
1234
1235 fn validate_config_update(
1237 current_config: &ClientConfig,
1238 new_config: &ClientConfig,
1239 ) -> anyhow::Result<()> {
1240 if current_config.global != new_config.global {
1242 bail!("Global configuration changes are not allowed in config updates");
1243 }
1244
1245 for (module_id, current_module_config) in ¤t_config.modules {
1247 match new_config.modules.get(module_id) {
1248 Some(new_module_config) => {
1249 if current_module_config != new_module_config {
1250 bail!(
1251 "Module {} configuration changes are not allowed, only additions are permitted",
1252 module_id
1253 );
1254 }
1255 }
1256 None => {
1257 bail!(
1258 "Module {} was removed in new config, only additions are allowed",
1259 module_id
1260 );
1261 }
1262 }
1263 }
1264
1265 Ok(())
1266 }
1267
1268 async fn refresh_client_config_static_try(
1270 current_config: &ClientConfig,
1271 api: &DynGlobalApi,
1272 db: &Database,
1273 ) -> anyhow::Result<()> {
1274 debug!(target: LOG_CLIENT, "Refreshing client config");
1275
1276 let fetched_config = api
1278 .request_current_consensus::<ClientConfig>(
1279 CLIENT_CONFIG_ENDPOINT.to_owned(),
1280 ApiRequestErased::default(),
1281 )
1282 .await?;
1283
1284 Self::validate_config_update(current_config, &fetched_config)?;
1286
1287 if current_config != &fetched_config {
1289 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1290
1291 let mut dbtx = db.begin_transaction().await;
1292 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1293 .await;
1294 dbtx.commit_tx().await;
1295 } else {
1296 debug!(target: LOG_CLIENT, "No federation config changes detected");
1297 }
1298
1299 Ok(())
1300 }
1301}
1302
1303pub struct ClientPreview {
1308 inner: ClientBuilder,
1309 config: ClientConfig,
1310 connectors: ConnectorRegistry,
1311 api_secret: Option<String>,
1312 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1313 preview_prefetch_api_version_set:
1314 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1315 prefetch_chain_id: Option<JitTryAnyhow<ChainId>>,
1316}
1317
1318impl ClientPreview {
1319 pub fn config(&self) -> &ClientConfig {
1321 &self.config
1322 }
1323
1324 pub async fn join(
1403 self,
1404 db_no_decoders: Database,
1405 pre_root_secret: RootSecret,
1406 ) -> anyhow::Result<ClientHandle> {
1407 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1408
1409 let client = self
1410 .inner
1411 .init(
1412 self.connectors,
1413 db_no_decoders,
1414 pre_root_secret,
1415 self.config,
1416 self.api_secret,
1417 InitMode::Fresh,
1418 self.prefetch_api_announcements,
1419 self.preview_prefetch_api_version_set,
1420 self.prefetch_chain_id,
1421 )
1422 .await?;
1423
1424 Ok(client)
1425 }
1426
1427 pub async fn recover(
1439 self,
1440 db_no_decoders: Database,
1441 pre_root_secret: RootSecret,
1442 backup: Option<ClientBackup>,
1443 ) -> anyhow::Result<ClientHandle> {
1444 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1445
1446 let client = self
1447 .inner
1448 .init(
1449 self.connectors,
1450 db_no_decoders,
1451 pre_root_secret,
1452 self.config,
1453 self.api_secret,
1454 InitMode::Recover {
1455 snapshot: backup.clone(),
1456 },
1457 self.prefetch_api_announcements,
1458 self.preview_prefetch_api_version_set,
1459 self.prefetch_chain_id,
1460 )
1461 .await?;
1462
1463 Ok(client)
1464 }
1465
1466 #[deprecated(
1468 note = "Recovery is now efficient enough that backups are no longer necessary. Backups will be removed in v0.13.0 due to backups being inherently complicated and brittle."
1469 )]
1470 #[allow(deprecated)]
1471 pub async fn download_backup_from_federation(
1472 &self,
1473 pre_root_secret: RootSecret,
1474 ) -> anyhow::Result<Option<ClientBackup>> {
1475 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1476 let api = DynGlobalApi::new(
1477 self.connectors.clone(),
1478 self.config
1480 .global
1481 .api_endpoints
1482 .iter()
1483 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1484 .collect(),
1485 self.api_secret.as_deref(),
1486 )?;
1487
1488 Client::download_backup_from_federation_static(
1489 &api,
1490 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1491 &self.inner.decoders(&self.config),
1492 )
1493 .await
1494 }
1495}