1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61 PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
69#[derive(Clone)]
91pub enum RootSecret {
92 StandardDoubleDerive(DerivableSecret),
97 Custom(DerivableSecret),
102}
103
104impl RootSecret {
105 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106 match self {
107 RootSecret::StandardDoubleDerive(derivable_secret) => {
108 get_default_client_secret(derivable_secret, &federation_id)
109 }
110 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111 }
112 }
113}
114
115pub struct ClientBuilder {
117 module_inits: ClientModuleInitRegistry,
118 admin_creds: Option<AdminCreds>,
119 meta_service: Arc<crate::meta::MetaService>,
120 connector: ConnectorType,
121 stopped: bool,
122 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
123 request_hook: ApiRequestHook,
124 iroh_enable_dht: bool,
125 iroh_enable_next: bool,
126}
127
128impl ClientBuilder {
129 pub(crate) fn new() -> Self {
130 trace!(
131 target: LOG_CLIENT,
132 version = %fedimint_build_code_version_env!(),
133 "Initializing fedimint client",
134 );
135 let meta_service = MetaService::new(LegacyMetaSource::default());
136 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
137 broadcast::channel(1024);
138
139 ClientBuilder {
140 module_inits: ModuleInitRegistry::new(),
141 connector: ConnectorType::default(),
142 admin_creds: None,
143 stopped: false,
144 meta_service,
145 log_event_added_transient_tx,
146 request_hook: Arc::new(|api| api),
147 iroh_enable_dht: true,
148 iroh_enable_next: true,
149 }
150 }
151
152 pub(crate) fn from_existing(client: &Client) -> Self {
153 ClientBuilder {
154 module_inits: client.module_inits.clone(),
155 admin_creds: None,
156 stopped: false,
157 meta_service: client.meta_service.clone(),
159 connector: client.connector,
160 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
161 request_hook: client.request_hook.clone(),
162 iroh_enable_dht: client.iroh_enable_dht,
163 iroh_enable_next: client.iroh_enable_next,
164 }
165 }
166
167 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
169 self.module_inits = module_inits;
170 }
171
172 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
174 self.module_inits.attach(module_init);
175 }
176
177 pub fn stopped(&mut self) {
178 self.stopped = true;
179 }
180 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
189 self.request_hook = hook;
190 self
191 }
192
193 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
194 self.meta_service = meta_service;
195 }
196
197 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
200 self.iroh_enable_dht = iroh_enable_dht;
201 self
202 }
203
204 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
207 self.iroh_enable_next = iroh_enable_next;
208 self
209 }
210
211 async fn migrate_module_dbs(
218 &self,
219 db: &Database,
220 client_config: &ClientConfig,
221 ) -> anyhow::Result<()> {
222 for (module_id, module_cfg) in &client_config.modules {
223 let kind = module_cfg.kind.clone();
224 let Some(init) = self.module_inits.get(&kind) else {
225 continue;
227 };
228
229 let mut dbtx = db.begin_transaction().await;
230 apply_migrations_client_module_dbtx(
231 &mut dbtx.to_ref_nc(),
232 kind.to_string(),
233 init.get_database_migrations(),
234 *module_id,
235 )
236 .await?;
237 if let Some(used_db_prefixes) = init.used_db_prefixes()
238 && is_running_in_test_env()
239 {
240 verify_module_db_integrity_dbtx(
241 &mut dbtx.to_ref_nc(),
242 *module_id,
243 kind,
244 &used_db_prefixes,
245 )
246 .await;
247 }
248 dbtx.commit_tx_result().await?;
249 }
250
251 Ok(())
252 }
253
254 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
255 let Some(config) = Client::get_config_from_db(db).await else {
256 bail!("Client database not initialized")
257 };
258
259 Ok(config)
260 }
261
262 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
263 self.admin_creds = Some(creds);
264 }
265
266 pub fn with_connector(&mut self, connector: ConnectorType) {
267 self.connector = connector;
268 }
269
270 #[cfg(feature = "tor")]
271 pub fn with_tor_connector(&mut self) {
272 self.with_connector(ConnectorType::tor());
273 }
274
275 #[allow(clippy::too_many_arguments)]
276 async fn init(
277 self,
278 connectors: ConnectorRegistry,
279 db_no_decoders: Database,
280 pre_root_secret: DerivableSecret,
281 config: ClientConfig,
282 api_secret: Option<String>,
283 init_mode: InitMode,
284 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
285 preview_prefetch_api_version_set: Option<
286 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
287 >,
288 ) -> anyhow::Result<ClientHandle> {
289 if Client::is_initialized(&db_no_decoders).await {
290 bail!("Client database already initialized")
291 }
292
293 Client::run_core_migrations(&db_no_decoders).await?;
294
295 {
298 debug!(target: LOG_CLIENT, "Initializing client database");
299 let mut dbtx = db_no_decoders.begin_transaction().await;
300 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
302 .await;
303 dbtx.insert_entry(
304 &ClientPreRootSecretHashKey,
305 &pre_root_secret.derive_pre_root_secret_hash(),
306 )
307 .await;
308
309 if let Some(api_secret) = api_secret.as_ref() {
310 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
311 }
312
313 let init_state = InitState::Pending(init_mode);
314 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
315
316 let metadata = init_state
317 .does_require_recovery()
318 .flatten()
319 .map_or(Metadata::empty(), |s| s.metadata);
320
321 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
322
323 dbtx.commit_tx_result().await?;
324 }
325
326 let stopped = self.stopped;
327 self.build(
328 connectors,
329 db_no_decoders,
330 pre_root_secret,
331 config,
332 api_secret,
333 stopped,
334 preview_prefetch_api_announcements,
335 preview_prefetch_api_version_set,
336 )
337 .await
338 }
339
340 pub async fn preview(
341 self,
342 connectors: ConnectorRegistry,
343 invite_code: &InviteCode,
344 ) -> anyhow::Result<ClientPreview> {
345 let (config, api) = self
346 .connector
347 .download_from_invite_code(&connectors, invite_code)
348 .await?;
349
350 let prefetch_api_announcements =
351 config
352 .global
353 .broadcast_public_keys
354 .clone()
355 .map(|guardian_pub_keys| {
356 Jit::new({
357 let api = api.clone();
358 move || async move {
359 fetch_api_announcements_from_at_least_num_of_peers(
363 1,
364 &api,
365 &guardian_pub_keys,
366 Duration::from_millis(20),
369 )
370 .await
371 }
372 })
373 });
374
375 self.preview_inner(
376 connectors,
377 config,
378 invite_code.api_secret(),
379 Some(api),
380 prefetch_api_announcements,
381 )
382 .await
383 }
384
385 pub async fn preview_with_existing_config(
390 self,
391 connectors: ConnectorRegistry,
392 config: ClientConfig,
393 api_secret: Option<String>,
394 ) -> anyhow::Result<ClientPreview> {
395 self.preview_inner(connectors, config, api_secret, None, None)
396 .await
397 }
398
399 async fn preview_inner(
400 self,
401 connectors: ConnectorRegistry,
402 config: ClientConfig,
403 api_secret: Option<String>,
404 prefetch_api: Option<DynGlobalApi>,
405 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
406 ) -> anyhow::Result<ClientPreview> {
407 let preview_prefetch_api_version_set = prefetch_api.map(|api| {
408 JitTry::new_try({
409 let config = config.clone();
410 || async move { Client::fetch_common_api_versions(&config, &api).await }
411 })
412 });
413
414 Ok(ClientPreview {
415 connectors,
416 inner: self,
417 config,
418 api_secret,
419 prefetch_api_announcements,
420 preview_prefetch_api_version_set,
421 })
422 }
423
424 pub async fn open(
425 self,
426 connectors: ConnectorRegistry,
427 db_no_decoders: Database,
428 pre_root_secret: RootSecret,
429 ) -> anyhow::Result<ClientHandle> {
430 Client::run_core_migrations(&db_no_decoders).await?;
431
432 Self::migrate_pending_config_if_present(&db_no_decoders).await;
434
435 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
436 bail!("Client database not initialized")
437 };
438
439 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
440
441 match db_no_decoders
442 .begin_transaction_nc()
443 .await
444 .get_value(&ClientPreRootSecretHashKey)
445 .await
446 {
447 Some(secret_hash) => {
448 ensure!(
449 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
450 "Secret hash does not match. Incorrect secret"
451 );
452 }
453 _ => {
454 debug!(target: LOG_CLIENT, "Backfilling secret hash");
455 let mut dbtx = db_no_decoders.begin_transaction().await;
457 dbtx.insert_entry(
458 &ClientPreRootSecretHashKey,
459 &pre_root_secret.derive_pre_root_secret_hash(),
460 )
461 .await;
462 dbtx.commit_tx().await;
463 }
464 }
465
466 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
467 let stopped = self.stopped;
468 let request_hook = self.request_hook.clone();
469
470 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
471 let client = self
472 .build_stopped(
473 connectors,
474 db_no_decoders,
475 pre_root_secret,
476 &config,
477 api_secret,
478 log_event_added_transient_tx,
479 request_hook,
480 None,
481 None,
482 )
483 .await?;
484 if !stopped {
485 client.as_inner().start_executor();
486 }
487 Ok(client)
488 }
489
490 #[allow(clippy::too_many_arguments)]
492 pub(crate) async fn build(
493 self,
494 connectors: ConnectorRegistry,
495 db_no_decoders: Database,
496 pre_root_secret: DerivableSecret,
497 config: ClientConfig,
498 api_secret: Option<String>,
499 stopped: bool,
500 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
501 preview_prefetch_api_version_set: Option<
502 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
503 >,
504 ) -> anyhow::Result<ClientHandle> {
505 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
506 let request_hook = self.request_hook.clone();
507 let client = self
508 .build_stopped(
509 connectors,
510 db_no_decoders,
511 pre_root_secret,
512 &config,
513 api_secret,
514 log_event_added_transient_tx,
515 request_hook,
516 preview_prefetch_api_announcements,
517 preview_prefetch_api_version_set,
518 )
519 .await?;
520 if !stopped {
521 client.as_inner().start_executor();
522 }
523
524 Ok(client)
525 }
526
527 #[allow(clippy::too_many_arguments)]
530 async fn build_stopped(
531 self,
532 connectors: ConnectorRegistry,
533 db_no_decoders: Database,
534 pre_root_secret: DerivableSecret,
535 config: &ClientConfig,
536 api_secret: Option<String>,
537 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
538 request_hook: ApiRequestHook,
539 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
540 preview_prefetch_api_version_set: Option<
541 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
542 >,
543 ) -> anyhow::Result<ClientHandle> {
544 debug!(
545 target: LOG_CLIENT,
546 version = %fedimint_build_code_version_env!(),
547 "Building fedimint client",
548 );
549 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
550 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
551
552 let decoders = self.decoders(config);
553 let config = Self::config_decoded(config, &decoders)?;
554 let fed_id = config.calculate_federation_id();
555 let db = db_no_decoders.with_decoders(decoders.clone());
556 let connector = self.connector;
557 let peer_urls = get_api_urls(&db, &config).await;
558 let api = match self.admin_creds.as_ref() {
559 Some(admin_creds) => FederationApi::new(
560 connectors.clone(),
561 peer_urls,
562 Some(admin_creds.peer_id),
563 Some(&admin_creds.auth.0),
564 )
565 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
566 .with_request_hook(&request_hook)
567 .with_cache()
568 .into(),
569 None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
570 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571 .with_request_hook(&request_hook)
572 .with_cache()
573 .into(),
574 };
575
576 let task_group = TaskGroup::new();
577
578 self.migrate_module_dbs(&db, &config).await?;
581
582 let init_state = Self::load_init_state(&db).await;
583
584 let notifier = Notifier::new();
585
586 if let Some(p) = preview_prefetch_api_announcements {
587 let announcements = p.get().await;
591
592 store_api_announcements_updates_from_peers(&db, announcements).await?
593 }
594
595 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
596 match preview_prefetch_api_version_set.get_try().await {
597 Ok(peer_api_versions) => {
598 Client::store_prefetched_api_versions(
599 &db,
600 &config,
601 &self.module_inits,
602 peer_api_versions,
603 )
604 .await;
605 }
606 Err(err) => {
607 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
608 }
609 }
610 }
611
612 let common_api_versions = Client::load_and_refresh_common_api_version_static(
613 &config,
614 &self.module_inits,
615 connectors.clone(),
616 &api,
617 &db,
618 &task_group,
619 )
620 .await
621 .inspect_err(|err| {
622 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
623 })
624 .unwrap_or(ApiVersionSet {
625 core: ApiVersion::new(0, 0),
626 modules: BTreeMap::new(),
628 });
629
630 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
631
632 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
634
635 let mut module_recoveries: BTreeMap<
636 ModuleInstanceId,
637 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
638 > = BTreeMap::new();
639 let mut module_recovery_progress_receivers: BTreeMap<
640 ModuleInstanceId,
641 watch::Receiver<RecoveryProgress>,
642 > = BTreeMap::new();
643
644 let final_client = FinalClientIface::default();
645
646 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
647
648 let modules = {
649 let mut modules = ClientModuleRegistry::default();
650 for (module_instance_id, module_config) in config.modules.clone() {
651 let kind = module_config.kind().clone();
652 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
653 debug!(
654 target: LOG_CLIENT,
655 kind=%kind,
656 instance_id=%module_instance_id,
657 "Module kind of instance not found in module gens, skipping");
658 continue;
659 };
660
661 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
662 else {
663 warn!(
664 target: LOG_CLIENT,
665 kind=%kind,
666 instance_id=%module_instance_id,
667 "Module kind of instance has incompatible api version, skipping"
668 );
669 continue;
670 };
671
672 let start_module_recover_fn =
675 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
676 let module_config = module_config.clone();
677 let num_peers = NumPeers::from(config.global.api_endpoints.len());
678 let db = db.clone();
679 let kind = kind.clone();
680 let notifier = notifier.clone();
681 let api = api.clone();
682 let root_secret = root_secret.clone();
683 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
684 let final_client = final_client.clone();
685 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
686 let task_group = task_group.clone();
687 let module_init = module_init.clone();
688 (
689 Box::pin(async move {
690 module_init
691 .recover(
692 final_client.clone(),
693 fed_id,
694 num_peers,
695 module_config.clone(),
696 db.clone(),
697 module_instance_id,
698 common_api_versions.core,
699 api_version,
700 root_secret.derive_module_secret(module_instance_id),
701 notifier.clone(),
702 api.clone(),
703 admin_auth,
704 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
705 progress_tx,
706 task_group,
707 )
708 .await
709 .inspect_err(|err| {
710 warn!(
711 target: LOG_CLIENT,
712 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
713 );
714 })
715 }),
716 progress_rx,
717 )
718 };
719
720 let recovery = match init_state.does_require_recovery() {
721 Some(snapshot) => {
722 match db
723 .begin_transaction_nc()
724 .await
725 .get_value(&ClientModuleRecovery { module_instance_id })
726 .await
727 {
728 Some(module_recovery_state) => {
729 if module_recovery_state.is_done() {
730 debug!(
731 id = %module_instance_id,
732 %kind, "Module recovery already complete"
733 );
734 None
735 } else {
736 debug!(
737 id = %module_instance_id,
738 %kind,
739 progress = %module_recovery_state.progress,
740 "Starting module recovery with an existing progress"
741 );
742 Some(start_module_recover_fn(
743 snapshot,
744 module_recovery_state.progress,
745 ))
746 }
747 }
748 _ => {
749 let progress = RecoveryProgress::none();
750 let mut dbtx = db.begin_transaction().await;
751 dbtx.log_event(
752 log_ordering_wakeup_tx.clone(),
753 None,
754 ModuleRecoveryStarted::new(module_instance_id),
755 )
756 .await;
757 dbtx.insert_entry(
758 &ClientModuleRecovery { module_instance_id },
759 &ClientModuleRecoveryState { progress },
760 )
761 .await;
762
763 dbtx.commit_tx().await;
764
765 debug!(
766 id = %module_instance_id,
767 %kind, "Starting new module recovery"
768 );
769 Some(start_module_recover_fn(snapshot, progress))
770 }
771 }
772 }
773 _ => None,
774 };
775
776 match recovery {
777 Some((recovery, recovery_progress_rx)) => {
778 module_recoveries.insert(module_instance_id, recovery);
779 module_recovery_progress_receivers
780 .insert(module_instance_id, recovery_progress_rx);
781 }
782 _ => {
783 let module = module_init
784 .init(
785 final_client.clone(),
786 fed_id,
787 config.global.api_endpoints.len(),
788 module_config,
789 db.clone(),
790 module_instance_id,
791 common_api_versions.core,
792 api_version,
793 root_secret.derive_module_secret(module_instance_id),
800 notifier.clone(),
801 api.clone(),
802 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
803 task_group.clone(),
804 connectors.clone(),
805 )
806 .await?;
807
808 modules.register_module(module_instance_id, kind, module);
809 }
810 }
811 }
812 modules
813 };
814
815 if init_state.is_pending() && module_recoveries.is_empty() {
816 let mut dbtx = db.begin_transaction().await;
817 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
818 .await;
819 dbtx.commit_tx().await;
820 }
821
822 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
823 BTreeMap::new();
824
825 for (module_id, _kind, module) in modules.iter_modules() {
826 match module.supports_being_primary() {
827 PrimaryModuleSupport::Any { priority } => {
828 primary_modules
829 .entry(priority)
830 .or_default()
831 .wildcard
832 .push(module_id);
833 }
834 PrimaryModuleSupport::Selected { priority, units } => {
835 for unit in units {
836 primary_modules
837 .entry(priority)
838 .or_default()
839 .specific
840 .entry(unit)
841 .or_default()
842 .push(module_id);
843 }
844 }
845 PrimaryModuleSupport::None => {}
846 }
847 }
848
849 let executor = {
850 let mut executor_builder = Executor::builder();
851 executor_builder
852 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
853
854 for (module_instance_id, _, module) in modules.iter_modules() {
855 executor_builder.with_module_dyn(module.context(module_instance_id));
856 }
857
858 for module_instance_id in module_recoveries.keys() {
859 executor_builder.with_valid_module_id(*module_instance_id);
860 }
861
862 executor_builder.build(
863 db.clone(),
864 notifier,
865 task_group.clone(),
866 log_ordering_wakeup_tx.clone(),
867 )
868 };
869
870 let recovery_receiver_init_val = module_recovery_progress_receivers
871 .iter()
872 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
873 .collect::<BTreeMap<_, _>>();
874 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
875 watch::channel(recovery_receiver_init_val);
876
877 let client_inner = Arc::new(Client {
878 final_client: final_client.clone(),
879 config: tokio::sync::RwLock::new(config.clone()),
880 api_secret,
881 decoders,
882 db: db.clone(),
883 connectors,
884 federation_id: fed_id,
885 federation_config_meta: config.global.meta,
886 primary_modules,
887 modules,
888 module_inits: self.module_inits.clone(),
889 log_ordering_wakeup_tx,
890 log_event_added_rx,
891 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
892 request_hook,
893 executor,
894 api,
895 secp_ctx: Secp256k1::new(),
896 root_secret,
897 task_group,
898 operation_log: OperationLog::new(db.clone()),
899 client_recovery_progress_receiver,
900 meta_service: self.meta_service,
901 connector,
902 iroh_enable_dht: self.iroh_enable_dht,
903 iroh_enable_next: self.iroh_enable_next,
904 });
905 client_inner
906 .task_group
907 .spawn_cancellable("MetaService::update_continuously", {
908 let client_inner = client_inner.clone();
909 async move {
910 client_inner
911 .meta_service
912 .update_continuously(&client_inner)
913 .await;
914 }
915 });
916
917 client_inner
918 .task_group
919 .spawn_cancellable("update-api-announcements", {
920 let client_inner = client_inner.clone();
921 async move {
922 client_inner
923 .connectors
924 .wait_for_initialized_connections()
925 .await;
926 run_api_announcement_refresh_task(client_inner.clone()).await
927 }
928 });
929
930 client_inner
931 .task_group
932 .spawn_cancellable("event log ordering task", {
933 let client_inner = client_inner.clone();
934 async move {
935 client_inner
936 .connectors
937 .wait_for_initialized_connections()
938 .await;
939
940 run_event_log_ordering_task(
941 db.clone(),
942 log_ordering_wakeup_rx,
943 log_event_added_tx,
944 log_event_added_transient_tx,
945 )
946 .await
947 }
948 });
949 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
950
951 let client_arc = ClientHandle::new(client_inner);
952
953 for (_, _, module) in client_arc.modules.iter_modules() {
954 module.start().await;
955 }
956
957 final_client.set(client_iface.clone());
958
959 if !module_recoveries.is_empty() {
960 client_arc.spawn_module_recoveries_task(
961 client_recovery_progress_sender,
962 module_recoveries,
963 module_recovery_progress_receivers,
964 );
965 }
966
967 Ok(client_arc)
968 }
969
970 async fn load_init_state(db: &Database) -> InitState {
971 let mut dbtx = db.begin_transaction_nc().await;
972 dbtx.get_value(&ClientInitStateKey)
973 .await
974 .unwrap_or_else(|| {
975 warn!(
978 target: LOG_CLIENT,
979 "Client missing ClientRequiresRecovery: assuming complete"
980 );
981 db::InitState::Complete(db::InitModeComplete::Fresh)
982 })
983 }
984
985 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
986 let mut decoders = client_decoders(
987 &self.module_inits,
988 config
989 .modules
990 .iter()
991 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
992 );
993
994 decoders.register_module(
995 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
996 ModuleKind::from_static_str("tx_submission"),
997 tx_submission_sm_decoder(),
998 );
999
1000 decoders
1001 }
1002
1003 fn config_decoded(
1004 config: &ClientConfig,
1005 decoders: &ModuleDecoderRegistry,
1006 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1007 config.clone().redecode_raw(decoders)
1008 }
1009
1010 fn federation_root_secret(
1014 pre_root_secret: &DerivableSecret,
1015 config: &ClientConfig,
1016 ) -> DerivableSecret {
1017 pre_root_secret.federation_key(&config.global.calculate_federation_id())
1018 }
1019
1020 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1022 self.log_event_added_transient_tx.subscribe()
1023 }
1024
1025 async fn migrate_pending_config_if_present(db: &Database) {
1029 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1030 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1031
1032 let mut dbtx = db.begin_transaction().await;
1033 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1035 .await;
1036 dbtx.remove_entry(&PendingClientConfigKey).await;
1038 dbtx.commit_tx().await;
1039
1040 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1041 }
1042 }
1043
1044 fn load_and_refresh_client_config_static(
1047 config: &ClientConfig,
1048 api: &DynGlobalApi,
1049 db: &Database,
1050 task_group: &TaskGroup,
1051 ) {
1052 let config = config.clone();
1053 let api = api.clone();
1054 let db = db.clone();
1055 let task_group = task_group.clone();
1056
1057 task_group.spawn_cancellable("refresh_client_config_static", async move {
1059 Self::refresh_client_config_static(&config, &api, &db).await;
1060 });
1061 }
1062
1063 async fn refresh_client_config_static(
1065 config: &ClientConfig,
1066 api: &DynGlobalApi,
1067 db: &Database,
1068 ) {
1069 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1070 warn!(
1071 target: LOG_CLIENT,
1072 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1073 );
1074 }
1075 }
1076
1077 fn validate_config_update(
1079 current_config: &ClientConfig,
1080 new_config: &ClientConfig,
1081 ) -> anyhow::Result<()> {
1082 if current_config.global != new_config.global {
1084 bail!("Global configuration changes are not allowed in config updates");
1085 }
1086
1087 for (module_id, current_module_config) in ¤t_config.modules {
1089 match new_config.modules.get(module_id) {
1090 Some(new_module_config) => {
1091 if current_module_config != new_module_config {
1092 bail!(
1093 "Module {} configuration changes are not allowed, only additions are permitted",
1094 module_id
1095 );
1096 }
1097 }
1098 None => {
1099 bail!(
1100 "Module {} was removed in new config, only additions are allowed",
1101 module_id
1102 );
1103 }
1104 }
1105 }
1106
1107 Ok(())
1108 }
1109
1110 async fn refresh_client_config_static_try(
1112 current_config: &ClientConfig,
1113 api: &DynGlobalApi,
1114 db: &Database,
1115 ) -> anyhow::Result<()> {
1116 debug!(target: LOG_CLIENT, "Refreshing client config");
1117
1118 let fetched_config = api
1120 .request_current_consensus::<ClientConfig>(
1121 CLIENT_CONFIG_ENDPOINT.to_owned(),
1122 ApiRequestErased::default(),
1123 )
1124 .await?;
1125
1126 Self::validate_config_update(current_config, &fetched_config)?;
1128
1129 if current_config != &fetched_config {
1131 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1132
1133 let mut dbtx = db.begin_transaction().await;
1134 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1135 .await;
1136 dbtx.commit_tx().await;
1137 } else {
1138 debug!(target: LOG_CLIENT, "No federation config changes detected");
1139 }
1140
1141 Ok(())
1142 }
1143}
1144
1145pub struct ClientPreview {
1150 inner: ClientBuilder,
1151 config: ClientConfig,
1152 connectors: ConnectorRegistry,
1153 api_secret: Option<String>,
1154 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1155 preview_prefetch_api_version_set:
1156 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1157}
1158
1159impl ClientPreview {
1160 pub fn config(&self) -> &ClientConfig {
1162 &self.config
1163 }
1164
1165 pub async fn join(
1244 self,
1245 db_no_decoders: Database,
1246 pre_root_secret: RootSecret,
1247 ) -> anyhow::Result<ClientHandle> {
1248 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1249
1250 let client = self
1251 .inner
1252 .init(
1253 self.connectors,
1254 db_no_decoders,
1255 pre_root_secret,
1256 self.config,
1257 self.api_secret,
1258 InitMode::Fresh,
1259 self.prefetch_api_announcements,
1260 self.preview_prefetch_api_version_set,
1261 )
1262 .await?;
1263
1264 Ok(client)
1265 }
1266
1267 pub async fn recover(
1279 self,
1280 db_no_decoders: Database,
1281 pre_root_secret: RootSecret,
1282 backup: Option<ClientBackup>,
1283 ) -> anyhow::Result<ClientHandle> {
1284 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1285
1286 let client = self
1287 .inner
1288 .init(
1289 self.connectors,
1290 db_no_decoders,
1291 pre_root_secret,
1292 self.config,
1293 self.api_secret,
1294 InitMode::Recover {
1295 snapshot: backup.clone(),
1296 },
1297 self.prefetch_api_announcements,
1298 self.preview_prefetch_api_version_set,
1299 )
1300 .await?;
1301
1302 Ok(client)
1303 }
1304
1305 pub async fn download_backup_from_federation(
1307 &self,
1308 pre_root_secret: RootSecret,
1309 ) -> anyhow::Result<Option<ClientBackup>> {
1310 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1311 let api = DynGlobalApi::new(
1312 self.connectors.clone(),
1313 self.config
1315 .global
1316 .api_endpoints
1317 .iter()
1318 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1319 .collect(),
1320 self.api_secret.as_deref(),
1321 )?;
1322
1323 Client::download_backup_from_federation_static(
1324 &api,
1325 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1326 &self.inner.decoders(&self.config),
1327 )
1328 .await
1329 }
1330}