1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61 PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
69#[derive(Clone)]
91pub enum RootSecret {
92 StandardDoubleDerive(DerivableSecret),
97 Custom(DerivableSecret),
102}
103
104impl RootSecret {
105 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106 match self {
107 RootSecret::StandardDoubleDerive(derivable_secret) => {
108 get_default_client_secret(derivable_secret, &federation_id)
109 }
110 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111 }
112 }
113}
114
115pub struct ClientBuilder {
117 module_inits: ClientModuleInitRegistry,
118 admin_creds: Option<AdminCreds>,
119 meta_service: Arc<crate::meta::MetaService>,
120 connector: ConnectorType,
121 stopped: bool,
122 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
123 request_hook: ApiRequestHook,
124 iroh_enable_dht: bool,
125 iroh_enable_next: bool,
126}
127
128impl ClientBuilder {
129 pub(crate) fn new() -> Self {
130 trace!(
131 target: LOG_CLIENT,
132 version = %fedimint_build_code_version_env!(),
133 "Initializing fedimint client",
134 );
135 let meta_service = MetaService::new(LegacyMetaSource::default());
136 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
137 broadcast::channel(1024);
138
139 ClientBuilder {
140 module_inits: ModuleInitRegistry::new(),
141 connector: ConnectorType::default(),
142 admin_creds: None,
143 stopped: false,
144 meta_service,
145 log_event_added_transient_tx,
146 request_hook: Arc::new(|api| api),
147 iroh_enable_dht: true,
148 iroh_enable_next: true,
149 }
150 }
151
152 pub(crate) fn from_existing(client: &Client) -> Self {
153 ClientBuilder {
154 module_inits: client.module_inits.clone(),
155 admin_creds: None,
156 stopped: false,
157 meta_service: client.meta_service.clone(),
159 connector: client.connector,
160 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
161 request_hook: client.request_hook.clone(),
162 iroh_enable_dht: client.iroh_enable_dht,
163 iroh_enable_next: client.iroh_enable_next,
164 }
165 }
166
167 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
169 self.module_inits = module_inits;
170 }
171
172 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
174 self.module_inits.attach(module_init);
175 }
176
177 pub fn stopped(&mut self) {
178 self.stopped = true;
179 }
180 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
189 self.request_hook = hook;
190 self
191 }
192
193 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
194 self.meta_service = meta_service;
195 }
196
197 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
200 self.iroh_enable_dht = iroh_enable_dht;
201 self
202 }
203
204 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
207 self.iroh_enable_next = iroh_enable_next;
208 self
209 }
210
211 async fn migrate_module_dbs(
218 &self,
219 db: &Database,
220 client_config: &ClientConfig,
221 ) -> anyhow::Result<()> {
222 for (module_id, module_cfg) in &client_config.modules {
223 let kind = module_cfg.kind.clone();
224 let Some(init) = self.module_inits.get(&kind) else {
225 continue;
227 };
228
229 let mut dbtx = db.begin_transaction().await;
230 apply_migrations_client_module_dbtx(
231 &mut dbtx.to_ref_nc(),
232 kind.to_string(),
233 init.get_database_migrations(),
234 *module_id,
235 )
236 .await?;
237 if let Some(used_db_prefixes) = init.used_db_prefixes()
238 && is_running_in_test_env()
239 {
240 verify_module_db_integrity_dbtx(
241 &mut dbtx.to_ref_nc(),
242 *module_id,
243 kind,
244 &used_db_prefixes,
245 )
246 .await;
247 }
248 dbtx.commit_tx_result().await?;
249 }
250
251 Ok(())
252 }
253
254 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
255 let Some(config) = Client::get_config_from_db(db).await else {
256 bail!("Client database not initialized")
257 };
258
259 Ok(config)
260 }
261
262 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
263 self.admin_creds = Some(creds);
264 }
265
266 pub fn with_connector(&mut self, connector: ConnectorType) {
267 self.connector = connector;
268 }
269
/// Selects the Tor connector; only available with the `tor` feature.
#[cfg(feature = "tor")]
pub fn with_tor_connector(&mut self) {
    let tor = ConnectorType::tor();
    self.with_connector(tor);
}
274
275 #[allow(clippy::too_many_arguments)]
276 async fn init(
277 self,
278 connectors: ConnectorRegistry,
279 db_no_decoders: Database,
280 pre_root_secret: DerivableSecret,
281 config: ClientConfig,
282 api_secret: Option<String>,
283 init_mode: InitMode,
284 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
285 preview_prefetch_api_version_set: Option<
286 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
287 >,
288 ) -> anyhow::Result<ClientHandle> {
289 if Client::is_initialized(&db_no_decoders).await {
290 bail!("Client database already initialized")
291 }
292
293 Client::run_core_migrations(&db_no_decoders).await?;
294
295 {
298 debug!(target: LOG_CLIENT, "Initializing client database");
299 let mut dbtx = db_no_decoders.begin_transaction().await;
300 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
302 .await;
303 dbtx.insert_entry(
304 &ClientPreRootSecretHashKey,
305 &pre_root_secret.derive_pre_root_secret_hash(),
306 )
307 .await;
308
309 if let Some(api_secret) = api_secret.as_ref() {
310 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
311 }
312
313 let init_state = InitState::Pending(init_mode);
314 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
315
316 let metadata = init_state
317 .does_require_recovery()
318 .flatten()
319 .map_or(Metadata::empty(), |s| s.metadata);
320
321 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
322
323 dbtx.commit_tx_result().await?;
324 }
325
326 let stopped = self.stopped;
327 self.build(
328 connectors,
329 db_no_decoders,
330 pre_root_secret,
331 config,
332 api_secret,
333 stopped,
334 preview_prefetch_api_announcements,
335 preview_prefetch_api_version_set,
336 )
337 .await
338 }
339
340 pub async fn preview(
341 self,
342 connectors: ConnectorRegistry,
343 invite_code: &InviteCode,
344 ) -> anyhow::Result<ClientPreview> {
345 let (config, api) = self
346 .connector
347 .download_from_invite_code(&connectors, invite_code)
348 .await?;
349
350 let prefetch_api_announcements =
351 config
352 .global
353 .broadcast_public_keys
354 .clone()
355 .map(|guardian_pub_keys| {
356 Jit::new({
357 let api = api.clone();
358 move || async move {
359 fetch_api_announcements_from_at_least_num_of_peers(
363 1,
364 &api,
365 &guardian_pub_keys,
366 Duration::from_millis(20),
369 )
370 .await
371 }
372 })
373 });
374
375 self.preview_inner(
376 connectors,
377 config,
378 invite_code.api_secret(),
379 Some(api),
380 prefetch_api_announcements,
381 )
382 .await
383 }
384
385 pub async fn preview_with_existing_config(
390 self,
391 connectors: ConnectorRegistry,
392 config: ClientConfig,
393 api_secret: Option<String>,
394 ) -> anyhow::Result<ClientPreview> {
395 self.preview_inner(connectors, config, api_secret, None, None)
396 .await
397 }
398
399 async fn preview_inner(
400 self,
401 connectors: ConnectorRegistry,
402 config: ClientConfig,
403 api_secret: Option<String>,
404 prefetch_api: Option<DynGlobalApi>,
405 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
406 ) -> anyhow::Result<ClientPreview> {
407 let preview_prefetch_api_version_set = prefetch_api.map(|api| {
408 JitTry::new_try({
409 let config = config.clone();
410 || async move { Client::fetch_common_api_versions(&config, &api).await }
411 })
412 });
413
414 Ok(ClientPreview {
415 connectors,
416 inner: self,
417 config,
418 api_secret,
419 prefetch_api_announcements,
420 preview_prefetch_api_version_set,
421 })
422 }
423
424 pub async fn open(
425 self,
426 connectors: ConnectorRegistry,
427 db_no_decoders: Database,
428 pre_root_secret: RootSecret,
429 ) -> anyhow::Result<ClientHandle> {
430 Client::run_core_migrations(&db_no_decoders).await?;
431
432 Self::migrate_pending_config_if_present(&db_no_decoders).await;
434
435 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
436 bail!("Client database not initialized")
437 };
438
439 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
440
441 match db_no_decoders
442 .begin_transaction_nc()
443 .await
444 .get_value(&ClientPreRootSecretHashKey)
445 .await
446 {
447 Some(secret_hash) => {
448 ensure!(
449 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
450 "Secret hash does not match. Incorrect secret"
451 );
452 }
453 _ => {
454 debug!(target: LOG_CLIENT, "Backfilling secret hash");
455 let mut dbtx = db_no_decoders.begin_transaction().await;
457 dbtx.insert_entry(
458 &ClientPreRootSecretHashKey,
459 &pre_root_secret.derive_pre_root_secret_hash(),
460 )
461 .await;
462 dbtx.commit_tx().await;
463 }
464 }
465
466 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
467 let stopped = self.stopped;
468 let request_hook = self.request_hook.clone();
469
470 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
471 let client = self
472 .build_stopped(
473 connectors,
474 db_no_decoders,
475 pre_root_secret,
476 &config,
477 api_secret,
478 log_event_added_transient_tx,
479 request_hook,
480 None,
481 None,
482 )
483 .await?;
484 if !stopped {
485 client.as_inner().start_executor();
486 }
487 Ok(client)
488 }
489
490 #[allow(clippy::too_many_arguments)]
492 pub(crate) async fn build(
493 self,
494 connectors: ConnectorRegistry,
495 db_no_decoders: Database,
496 pre_root_secret: DerivableSecret,
497 config: ClientConfig,
498 api_secret: Option<String>,
499 stopped: bool,
500 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
501 preview_prefetch_api_version_set: Option<
502 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
503 >,
504 ) -> anyhow::Result<ClientHandle> {
505 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
506 let request_hook = self.request_hook.clone();
507 let client = self
508 .build_stopped(
509 connectors,
510 db_no_decoders,
511 pre_root_secret,
512 &config,
513 api_secret,
514 log_event_added_transient_tx,
515 request_hook,
516 preview_prefetch_api_announcements,
517 preview_prefetch_api_version_set,
518 )
519 .await?;
520 if !stopped {
521 client.as_inner().start_executor();
522 }
523
524 Ok(client)
525 }
526
527 #[allow(clippy::too_many_arguments)]
530 async fn build_stopped(
531 self,
532 connectors: ConnectorRegistry,
533 db_no_decoders: Database,
534 pre_root_secret: DerivableSecret,
535 config: &ClientConfig,
536 api_secret: Option<String>,
537 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
538 request_hook: ApiRequestHook,
539 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
540 preview_prefetch_api_version_set: Option<
541 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
542 >,
543 ) -> anyhow::Result<ClientHandle> {
544 debug!(
545 target: LOG_CLIENT,
546 version = %fedimint_build_code_version_env!(),
547 "Building fedimint client",
548 );
549 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
550 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
551
552 let decoders = self.decoders(config);
553 let config = Self::config_decoded(config, &decoders)?;
554 let fed_id = config.calculate_federation_id();
555 let db = db_no_decoders.with_decoders(decoders.clone());
556 let connector = self.connector;
557 let peer_urls = get_api_urls(&db, &config).await;
558 let api = match self.admin_creds.as_ref() {
559 Some(admin_creds) => FederationApi::new(
560 connectors.clone(),
561 peer_urls,
562 Some(admin_creds.peer_id),
563 Some(&admin_creds.auth.0),
564 )
565 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
566 .with_request_hook(&request_hook)
567 .with_cache()
568 .into(),
569 None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
570 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
571 .with_request_hook(&request_hook)
572 .with_cache()
573 .into(),
574 };
575
576 let task_group = TaskGroup::new();
577
578 self.migrate_module_dbs(&db, &config).await?;
581
582 let init_state = Self::load_init_state(&db).await;
583
584 let notifier = Notifier::new();
585
586 if let Some(p) = preview_prefetch_api_announcements {
587 let announcements = p.get().await;
591
592 store_api_announcements_updates_from_peers(&db, announcements).await?
593 }
594
595 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
596 match preview_prefetch_api_version_set.get_try().await {
597 Ok(peer_api_versions) => {
598 Client::store_prefetched_api_versions(
599 &db,
600 &config,
601 &self.module_inits,
602 peer_api_versions,
603 )
604 .await;
605 }
606 Err(err) => {
607 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
608 }
609 }
610 }
611
612 let common_api_versions = Client::load_and_refresh_common_api_version_static(
613 &config,
614 &self.module_inits,
615 &api,
616 &db,
617 &task_group,
618 )
619 .await
620 .inspect_err(|err| {
621 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
622 })
623 .unwrap_or(ApiVersionSet {
624 core: ApiVersion::new(0, 0),
625 modules: BTreeMap::new(),
627 });
628
629 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
630
631 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
633
634 let mut module_recoveries: BTreeMap<
635 ModuleInstanceId,
636 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
637 > = BTreeMap::new();
638 let mut module_recovery_progress_receivers: BTreeMap<
639 ModuleInstanceId,
640 watch::Receiver<RecoveryProgress>,
641 > = BTreeMap::new();
642
643 let final_client = FinalClientIface::default();
644
645 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
646
647 let modules = {
648 let mut modules = ClientModuleRegistry::default();
649 for (module_instance_id, module_config) in config.modules.clone() {
650 let kind = module_config.kind().clone();
651 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
652 debug!(
653 target: LOG_CLIENT,
654 kind=%kind,
655 instance_id=%module_instance_id,
656 "Module kind of instance not found in module gens, skipping");
657 continue;
658 };
659
660 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
661 else {
662 warn!(
663 target: LOG_CLIENT,
664 kind=%kind,
665 instance_id=%module_instance_id,
666 "Module kind of instance has incompatible api version, skipping"
667 );
668 continue;
669 };
670
671 let start_module_recover_fn =
674 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
675 let module_config = module_config.clone();
676 let num_peers = NumPeers::from(config.global.api_endpoints.len());
677 let db = db.clone();
678 let kind = kind.clone();
679 let notifier = notifier.clone();
680 let api = api.clone();
681 let root_secret = root_secret.clone();
682 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
683 let final_client = final_client.clone();
684 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
685 let task_group = task_group.clone();
686 let module_init = module_init.clone();
687 (
688 Box::pin(async move {
689 module_init
690 .recover(
691 final_client.clone(),
692 fed_id,
693 num_peers,
694 module_config.clone(),
695 db.clone(),
696 module_instance_id,
697 common_api_versions.core,
698 api_version,
699 root_secret.derive_module_secret(module_instance_id),
700 notifier.clone(),
701 api.clone(),
702 admin_auth,
703 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
704 progress_tx,
705 task_group,
706 )
707 .await
708 .inspect_err(|err| {
709 warn!(
710 target: LOG_CLIENT,
711 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
712 );
713 })
714 }),
715 progress_rx,
716 )
717 };
718
719 let recovery = match init_state.does_require_recovery() {
720 Some(snapshot) => {
721 match db
722 .begin_transaction_nc()
723 .await
724 .get_value(&ClientModuleRecovery { module_instance_id })
725 .await
726 {
727 Some(module_recovery_state) => {
728 if module_recovery_state.is_done() {
729 debug!(
730 id = %module_instance_id,
731 %kind, "Module recovery already complete"
732 );
733 None
734 } else {
735 debug!(
736 id = %module_instance_id,
737 %kind,
738 progress = %module_recovery_state.progress,
739 "Starting module recovery with an existing progress"
740 );
741 Some(start_module_recover_fn(
742 snapshot,
743 module_recovery_state.progress,
744 ))
745 }
746 }
747 _ => {
748 let progress = RecoveryProgress::none();
749 let mut dbtx = db.begin_transaction().await;
750 dbtx.log_event(
751 log_ordering_wakeup_tx.clone(),
752 None,
753 ModuleRecoveryStarted::new(module_instance_id),
754 )
755 .await;
756 dbtx.insert_entry(
757 &ClientModuleRecovery { module_instance_id },
758 &ClientModuleRecoveryState { progress },
759 )
760 .await;
761
762 dbtx.commit_tx().await;
763
764 debug!(
765 id = %module_instance_id,
766 %kind, "Starting new module recovery"
767 );
768 Some(start_module_recover_fn(snapshot, progress))
769 }
770 }
771 }
772 _ => None,
773 };
774
775 match recovery {
776 Some((recovery, recovery_progress_rx)) => {
777 module_recoveries.insert(module_instance_id, recovery);
778 module_recovery_progress_receivers
779 .insert(module_instance_id, recovery_progress_rx);
780 }
781 _ => {
782 let module = module_init
783 .init(
784 final_client.clone(),
785 fed_id,
786 config.global.api_endpoints.len(),
787 module_config,
788 db.clone(),
789 module_instance_id,
790 common_api_versions.core,
791 api_version,
792 root_secret.derive_module_secret(module_instance_id),
799 notifier.clone(),
800 api.clone(),
801 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
802 task_group.clone(),
803 connectors.clone(),
804 )
805 .await?;
806
807 modules.register_module(module_instance_id, kind, module);
808 }
809 }
810 }
811 modules
812 };
813
814 if init_state.is_pending() && module_recoveries.is_empty() {
815 let mut dbtx = db.begin_transaction().await;
816 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
817 .await;
818 dbtx.commit_tx().await;
819 }
820
821 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
822 BTreeMap::new();
823
824 for (module_id, _kind, module) in modules.iter_modules() {
825 match module.supports_being_primary() {
826 PrimaryModuleSupport::Any { priority } => {
827 primary_modules
828 .entry(priority)
829 .or_default()
830 .wildcard
831 .push(module_id);
832 }
833 PrimaryModuleSupport::Selected { priority, units } => {
834 for unit in units {
835 primary_modules
836 .entry(priority)
837 .or_default()
838 .specific
839 .entry(unit)
840 .or_default()
841 .push(module_id);
842 }
843 }
844 PrimaryModuleSupport::None => {}
845 }
846 }
847
848 let executor = {
849 let mut executor_builder = Executor::builder();
850 executor_builder
851 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
852
853 for (module_instance_id, _, module) in modules.iter_modules() {
854 executor_builder.with_module_dyn(module.context(module_instance_id));
855 }
856
857 for module_instance_id in module_recoveries.keys() {
858 executor_builder.with_valid_module_id(*module_instance_id);
859 }
860
861 executor_builder.build(
862 db.clone(),
863 notifier,
864 task_group.clone(),
865 log_ordering_wakeup_tx.clone(),
866 )
867 };
868
869 let recovery_receiver_init_val = module_recovery_progress_receivers
870 .iter()
871 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
872 .collect::<BTreeMap<_, _>>();
873 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
874 watch::channel(recovery_receiver_init_val);
875
876 let client_inner = Arc::new(Client {
877 final_client: final_client.clone(),
878 config: tokio::sync::RwLock::new(config.clone()),
879 api_secret,
880 decoders,
881 db: db.clone(),
882 connectors,
883 federation_id: fed_id,
884 federation_config_meta: config.global.meta,
885 primary_modules,
886 modules,
887 module_inits: self.module_inits.clone(),
888 log_ordering_wakeup_tx,
889 log_event_added_rx,
890 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
891 request_hook,
892 executor,
893 api,
894 secp_ctx: Secp256k1::new(),
895 root_secret,
896 task_group,
897 operation_log: OperationLog::new(db.clone()),
898 client_recovery_progress_receiver,
899 meta_service: self.meta_service,
900 connector,
901 iroh_enable_dht: self.iroh_enable_dht,
902 iroh_enable_next: self.iroh_enable_next,
903 });
904 client_inner
905 .task_group
906 .spawn_cancellable("MetaService::update_continuously", {
907 let client_inner = client_inner.clone();
908 async move {
909 client_inner
910 .meta_service
911 .update_continuously(&client_inner)
912 .await;
913 }
914 });
915
916 client_inner.task_group.spawn_cancellable(
917 "update-api-announcements",
918 run_api_announcement_refresh_task(client_inner.clone()),
919 );
920
921 client_inner.task_group.spawn_cancellable(
922 "event log ordering task",
923 run_event_log_ordering_task(
924 db.clone(),
925 log_ordering_wakeup_rx,
926 log_event_added_tx,
927 log_event_added_transient_tx,
928 ),
929 );
930 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
931
932 let client_arc = ClientHandle::new(client_inner);
933
934 for (_, _, module) in client_arc.modules.iter_modules() {
935 module.start().await;
936 }
937
938 final_client.set(client_iface.clone());
939
940 if !module_recoveries.is_empty() {
941 client_arc.spawn_module_recoveries_task(
942 client_recovery_progress_sender,
943 module_recoveries,
944 module_recovery_progress_receivers,
945 );
946 }
947
948 Ok(client_arc)
949 }
950
951 async fn load_init_state(db: &Database) -> InitState {
952 let mut dbtx = db.begin_transaction_nc().await;
953 dbtx.get_value(&ClientInitStateKey)
954 .await
955 .unwrap_or_else(|| {
956 warn!(
959 target: LOG_CLIENT,
960 "Client missing ClientRequiresRecovery: assuming complete"
961 );
962 db::InitState::Complete(db::InitModeComplete::Fresh)
963 })
964 }
965
966 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
967 let mut decoders = client_decoders(
968 &self.module_inits,
969 config
970 .modules
971 .iter()
972 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
973 );
974
975 decoders.register_module(
976 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
977 ModuleKind::from_static_str("tx_submission"),
978 tx_submission_sm_decoder(),
979 );
980
981 decoders
982 }
983
984 fn config_decoded(
985 config: &ClientConfig,
986 decoders: &ModuleDecoderRegistry,
987 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
988 config.clone().redecode_raw(decoders)
989 }
990
991 fn federation_root_secret(
995 pre_root_secret: &DerivableSecret,
996 config: &ClientConfig,
997 ) -> DerivableSecret {
998 pre_root_secret.federation_key(&config.global.calculate_federation_id())
999 }
1000
1001 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1003 self.log_event_added_transient_tx.subscribe()
1004 }
1005
1006 async fn migrate_pending_config_if_present(db: &Database) {
1010 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1011 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1012
1013 let mut dbtx = db.begin_transaction().await;
1014 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1016 .await;
1017 dbtx.remove_entry(&PendingClientConfigKey).await;
1019 dbtx.commit_tx().await;
1020
1021 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1022 }
1023 }
1024
1025 fn load_and_refresh_client_config_static(
1028 config: &ClientConfig,
1029 api: &DynGlobalApi,
1030 db: &Database,
1031 task_group: &TaskGroup,
1032 ) {
1033 let config = config.clone();
1034 let api = api.clone();
1035 let db = db.clone();
1036 let task_group = task_group.clone();
1037
1038 task_group.spawn_cancellable("refresh_client_config_static", async move {
1040 Self::refresh_client_config_static(&config, &api, &db).await;
1041 });
1042 }
1043
1044 async fn refresh_client_config_static(
1046 config: &ClientConfig,
1047 api: &DynGlobalApi,
1048 db: &Database,
1049 ) {
1050 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1051 warn!(
1052 target: LOG_CLIENT,
1053 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1054 );
1055 }
1056 }
1057
1058 fn validate_config_update(
1060 current_config: &ClientConfig,
1061 new_config: &ClientConfig,
1062 ) -> anyhow::Result<()> {
1063 if current_config.global != new_config.global {
1065 bail!("Global configuration changes are not allowed in config updates");
1066 }
1067
1068 for (module_id, current_module_config) in ¤t_config.modules {
1070 match new_config.modules.get(module_id) {
1071 Some(new_module_config) => {
1072 if current_module_config != new_module_config {
1073 bail!(
1074 "Module {} configuration changes are not allowed, only additions are permitted",
1075 module_id
1076 );
1077 }
1078 }
1079 None => {
1080 bail!(
1081 "Module {} was removed in new config, only additions are allowed",
1082 module_id
1083 );
1084 }
1085 }
1086 }
1087
1088 Ok(())
1089 }
1090
1091 async fn refresh_client_config_static_try(
1093 current_config: &ClientConfig,
1094 api: &DynGlobalApi,
1095 db: &Database,
1096 ) -> anyhow::Result<()> {
1097 debug!(target: LOG_CLIENT, "Refreshing client config");
1098
1099 let fetched_config = api
1101 .request_current_consensus::<ClientConfig>(
1102 CLIENT_CONFIG_ENDPOINT.to_owned(),
1103 ApiRequestErased::default(),
1104 )
1105 .await?;
1106
1107 Self::validate_config_update(current_config, &fetched_config)?;
1109
1110 if current_config != &fetched_config {
1112 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1113
1114 let mut dbtx = db.begin_transaction().await;
1115 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1116 .await;
1117 dbtx.commit_tx().await;
1118 } else {
1119 debug!(target: LOG_CLIENT, "No federation config changes detected");
1120 }
1121
1122 Ok(())
1123 }
1124}
1125
1126pub struct ClientPreview {
1131 inner: ClientBuilder,
1132 config: ClientConfig,
1133 connectors: ConnectorRegistry,
1134 api_secret: Option<String>,
1135 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1136 preview_prefetch_api_version_set:
1137 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1138}
1139
1140impl ClientPreview {
1141 pub fn config(&self) -> &ClientConfig {
1143 &self.config
1144 }
1145
1146 pub async fn join(
1225 self,
1226 db_no_decoders: Database,
1227 pre_root_secret: RootSecret,
1228 ) -> anyhow::Result<ClientHandle> {
1229 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1230
1231 let client = self
1232 .inner
1233 .init(
1234 self.connectors,
1235 db_no_decoders,
1236 pre_root_secret,
1237 self.config,
1238 self.api_secret,
1239 InitMode::Fresh,
1240 self.prefetch_api_announcements,
1241 self.preview_prefetch_api_version_set,
1242 )
1243 .await?;
1244
1245 Ok(client)
1246 }
1247
1248 pub async fn recover(
1260 self,
1261 db_no_decoders: Database,
1262 pre_root_secret: RootSecret,
1263 backup: Option<ClientBackup>,
1264 ) -> anyhow::Result<ClientHandle> {
1265 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1266
1267 let client = self
1268 .inner
1269 .init(
1270 self.connectors,
1271 db_no_decoders,
1272 pre_root_secret,
1273 self.config,
1274 self.api_secret,
1275 InitMode::Recover {
1276 snapshot: backup.clone(),
1277 },
1278 self.prefetch_api_announcements,
1279 self.preview_prefetch_api_version_set,
1280 )
1281 .await?;
1282
1283 Ok(client)
1284 }
1285
1286 pub async fn download_backup_from_federation(
1288 &self,
1289 pre_root_secret: RootSecret,
1290 ) -> anyhow::Result<Option<ClientBackup>> {
1291 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1292 let api = DynGlobalApi::new(
1293 self.connectors.clone(),
1294 self.config
1296 .global
1297 .api_endpoints
1298 .iter()
1299 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1300 .collect(),
1301 self.api_secret.as_deref(),
1302 )?;
1303
1304 Client::download_backup_from_federation_static(
1305 &api,
1306 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1307 &self.inner.decoders(&self.config),
1308 )
1309 .await
1310 }
1311}