1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, FederationApi, FederationApiExt as _};
14use fedimint_api_client::download_from_invite_code;
15use fedimint_client_module::api::ClientRawFederationApiExt as _;
16use fedimint_client_module::meta::LegacyMetaSource;
17use fedimint_client_module::module::init::ClientModuleInit;
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
21};
22use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
23use fedimint_client_module::transaction::{
24 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
25};
26use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
27use fedimint_connectors::ConnectorRegistry;
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::client::PrimaryModuleCandidates;
58use crate::db::{
59 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
60 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
61 PendingClientConfigKey, apply_migrations_client_module_dbtx,
62};
63use crate::meta::MetaService;
64use crate::module_init::ClientModuleInitRegistry;
65use crate::oplog::OperationLog;
66use crate::sm::executor::Executor;
67use crate::sm::notifier::Notifier;
68
69#[derive(Clone)]
91pub enum RootSecret {
92 StandardDoubleDerive(DerivableSecret),
97 Custom(DerivableSecret),
102}
103
104impl RootSecret {
105 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
106 match self {
107 RootSecret::StandardDoubleDerive(derivable_secret) => {
108 get_default_client_secret(derivable_secret, &federation_id)
109 }
110 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
111 }
112 }
113}
114
115pub struct ClientBuilder {
117 module_inits: ClientModuleInitRegistry,
118 admin_creds: Option<AdminCreds>,
119 meta_service: Arc<crate::meta::MetaService>,
120 stopped: bool,
121 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
122 request_hook: ApiRequestHook,
123 iroh_enable_dht: bool,
124 iroh_enable_next: bool,
125}
126
127impl ClientBuilder {
128 pub(crate) fn new() -> Self {
129 trace!(
130 target: LOG_CLIENT,
131 version = %fedimint_build_code_version_env!(),
132 "Initializing fedimint client",
133 );
134 let meta_service = MetaService::new(LegacyMetaSource::default());
135 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
136 broadcast::channel(1024);
137
138 ClientBuilder {
139 module_inits: ModuleInitRegistry::new(),
140 admin_creds: None,
141 stopped: false,
142 meta_service,
143 log_event_added_transient_tx,
144 request_hook: Arc::new(|api| api),
145 iroh_enable_dht: true,
146 iroh_enable_next: true,
147 }
148 }
149
150 pub(crate) fn from_existing(client: &Client) -> Self {
151 ClientBuilder {
152 module_inits: client.module_inits.clone(),
153 admin_creds: None,
154 stopped: false,
155 meta_service: client.meta_service.clone(),
157 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
158 request_hook: client.request_hook.clone(),
159 iroh_enable_dht: client.iroh_enable_dht,
160 iroh_enable_next: client.iroh_enable_next,
161 }
162 }
163
164 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
166 self.module_inits = module_inits;
167 }
168
169 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
171 self.module_inits.attach(module_init);
172 }
173
174 pub fn stopped(&mut self) {
175 self.stopped = true;
176 }
177 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
186 self.request_hook = hook;
187 self
188 }
189
190 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
191 self.meta_service = meta_service;
192 }
193
194 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
197 self.iroh_enable_dht = iroh_enable_dht;
198 self
199 }
200
201 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
204 self.iroh_enable_next = iroh_enable_next;
205 self
206 }
207
208 async fn migrate_module_dbs(
215 &self,
216 db: &Database,
217 client_config: &ClientConfig,
218 ) -> anyhow::Result<()> {
219 for (module_id, module_cfg) in &client_config.modules {
220 let kind = module_cfg.kind.clone();
221 let Some(init) = self.module_inits.get(&kind) else {
222 continue;
224 };
225
226 let mut dbtx = db.begin_transaction().await;
227 apply_migrations_client_module_dbtx(
228 &mut dbtx.to_ref_nc(),
229 kind.to_string(),
230 init.get_database_migrations(),
231 *module_id,
232 )
233 .await?;
234 if let Some(used_db_prefixes) = init.used_db_prefixes()
235 && is_running_in_test_env()
236 {
237 verify_module_db_integrity_dbtx(
238 &mut dbtx.to_ref_nc(),
239 *module_id,
240 kind,
241 &used_db_prefixes,
242 )
243 .await;
244 }
245 dbtx.commit_tx_result().await?;
246 }
247
248 Ok(())
249 }
250
251 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
252 let Some(config) = Client::get_config_from_db(db).await else {
253 bail!("Client database not initialized")
254 };
255
256 Ok(config)
257 }
258
259 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
260 self.admin_creds = Some(creds);
261 }
262
263 #[allow(clippy::too_many_arguments)]
264 async fn init(
265 self,
266 connectors: ConnectorRegistry,
267 db_no_decoders: Database,
268 pre_root_secret: DerivableSecret,
269 config: ClientConfig,
270 api_secret: Option<String>,
271 init_mode: InitMode,
272 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
273 preview_prefetch_api_version_set: Option<
274 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
275 >,
276 ) -> anyhow::Result<ClientHandle> {
277 if Client::is_initialized(&db_no_decoders).await {
278 bail!("Client database already initialized")
279 }
280
281 Client::run_core_migrations(&db_no_decoders).await?;
282
283 {
286 debug!(target: LOG_CLIENT, "Initializing client database");
287 let mut dbtx = db_no_decoders.begin_transaction().await;
288 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
290 .await;
291 dbtx.insert_entry(
292 &ClientPreRootSecretHashKey,
293 &pre_root_secret.derive_pre_root_secret_hash(),
294 )
295 .await;
296
297 if let Some(api_secret) = api_secret.as_ref() {
298 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
299 }
300
301 let init_state = InitState::Pending(init_mode);
302 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
303
304 let metadata = init_state
305 .does_require_recovery()
306 .flatten()
307 .map_or(Metadata::empty(), |s| s.metadata);
308
309 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
310
311 dbtx.commit_tx_result().await?;
312 }
313
314 let stopped = self.stopped;
315 self.build(
316 connectors,
317 db_no_decoders,
318 pre_root_secret,
319 config,
320 api_secret,
321 stopped,
322 preview_prefetch_api_announcements,
323 preview_prefetch_api_version_set,
324 )
325 .await
326 }
327
328 pub async fn preview(
329 self,
330 connectors: ConnectorRegistry,
331 invite_code: &InviteCode,
332 ) -> anyhow::Result<ClientPreview> {
333 let (config, api) = download_from_invite_code(&connectors, invite_code).await?;
334
335 let prefetch_api_announcements =
336 config
337 .global
338 .broadcast_public_keys
339 .clone()
340 .map(|guardian_pub_keys| {
341 Jit::new({
342 let api = api.clone();
343 move || async move {
344 fetch_api_announcements_from_at_least_num_of_peers(
348 1,
349 &api,
350 &guardian_pub_keys,
351 Duration::from_millis(20),
354 )
355 .await
356 }
357 })
358 });
359
360 self.preview_inner(
361 connectors,
362 config,
363 invite_code.api_secret(),
364 Some(api),
365 prefetch_api_announcements,
366 )
367 .await
368 }
369
370 pub async fn preview_with_existing_config(
375 self,
376 connectors: ConnectorRegistry,
377 config: ClientConfig,
378 api_secret: Option<String>,
379 ) -> anyhow::Result<ClientPreview> {
380 self.preview_inner(connectors, config, api_secret, None, None)
381 .await
382 }
383
384 async fn preview_inner(
385 self,
386 connectors: ConnectorRegistry,
387 config: ClientConfig,
388 api_secret: Option<String>,
389 prefetch_api: Option<DynGlobalApi>,
390 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
391 ) -> anyhow::Result<ClientPreview> {
392 let preview_prefetch_api_version_set = prefetch_api.map(|api| {
393 JitTry::new_try({
394 let config = config.clone();
395 || async move { Client::fetch_common_api_versions(&config, &api).await }
396 })
397 });
398
399 Ok(ClientPreview {
400 connectors,
401 inner: self,
402 config,
403 api_secret,
404 prefetch_api_announcements,
405 preview_prefetch_api_version_set,
406 })
407 }
408
409 pub async fn open(
410 self,
411 connectors: ConnectorRegistry,
412 db_no_decoders: Database,
413 pre_root_secret: RootSecret,
414 ) -> anyhow::Result<ClientHandle> {
415 Client::run_core_migrations(&db_no_decoders).await?;
416
417 Self::migrate_pending_config_if_present(&db_no_decoders).await;
419
420 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
421 bail!("Client database not initialized")
422 };
423
424 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
425
426 match db_no_decoders
427 .begin_transaction_nc()
428 .await
429 .get_value(&ClientPreRootSecretHashKey)
430 .await
431 {
432 Some(secret_hash) => {
433 ensure!(
434 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
435 "Secret hash does not match. Incorrect secret"
436 );
437 }
438 _ => {
439 debug!(target: LOG_CLIENT, "Backfilling secret hash");
440 let mut dbtx = db_no_decoders.begin_transaction().await;
442 dbtx.insert_entry(
443 &ClientPreRootSecretHashKey,
444 &pre_root_secret.derive_pre_root_secret_hash(),
445 )
446 .await;
447 dbtx.commit_tx().await;
448 }
449 }
450
451 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
452 let stopped = self.stopped;
453 let request_hook = self.request_hook.clone();
454
455 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
456 let client = self
457 .build_stopped(
458 connectors,
459 db_no_decoders,
460 pre_root_secret,
461 &config,
462 api_secret,
463 log_event_added_transient_tx,
464 request_hook,
465 None,
466 None,
467 )
468 .await?;
469 if !stopped {
470 client.as_inner().start_executor();
471 }
472 Ok(client)
473 }
474
475 #[allow(clippy::too_many_arguments)]
477 pub(crate) async fn build(
478 self,
479 connectors: ConnectorRegistry,
480 db_no_decoders: Database,
481 pre_root_secret: DerivableSecret,
482 config: ClientConfig,
483 api_secret: Option<String>,
484 stopped: bool,
485 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
486 preview_prefetch_api_version_set: Option<
487 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
488 >,
489 ) -> anyhow::Result<ClientHandle> {
490 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
491 let request_hook = self.request_hook.clone();
492 let client = self
493 .build_stopped(
494 connectors,
495 db_no_decoders,
496 pre_root_secret,
497 &config,
498 api_secret,
499 log_event_added_transient_tx,
500 request_hook,
501 preview_prefetch_api_announcements,
502 preview_prefetch_api_version_set,
503 )
504 .await?;
505 if !stopped {
506 client.as_inner().start_executor();
507 }
508
509 Ok(client)
510 }
511
512 #[allow(clippy::too_many_arguments)]
515 async fn build_stopped(
516 self,
517 connectors: ConnectorRegistry,
518 db_no_decoders: Database,
519 pre_root_secret: DerivableSecret,
520 config: &ClientConfig,
521 api_secret: Option<String>,
522 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
523 request_hook: ApiRequestHook,
524 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
525 preview_prefetch_api_version_set: Option<
526 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
527 >,
528 ) -> anyhow::Result<ClientHandle> {
529 debug!(
530 target: LOG_CLIENT,
531 version = %fedimint_build_code_version_env!(),
532 "Building fedimint client",
533 );
534 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
535 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
536
537 let decoders = self.decoders(config);
538 let config = Self::config_decoded(config, &decoders)?;
539 let fed_id = config.calculate_federation_id();
540 let db = db_no_decoders.with_decoders(decoders.clone());
541 let peer_urls = get_api_urls(&db, &config).await;
542 let api = match self.admin_creds.as_ref() {
543 Some(admin_creds) => FederationApi::new(
544 connectors.clone(),
545 peer_urls,
546 Some(admin_creds.peer_id),
547 Some(&admin_creds.auth.0),
548 )
549 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
550 .with_request_hook(&request_hook)
551 .with_cache()
552 .into(),
553 None => FederationApi::new(connectors.clone(), peer_urls, None, api_secret.as_deref())
554 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
555 .with_request_hook(&request_hook)
556 .with_cache()
557 .into(),
558 };
559
560 let task_group = TaskGroup::new();
561
562 self.migrate_module_dbs(&db, &config).await?;
565
566 let init_state = Self::load_init_state(&db).await;
567
568 let notifier = Notifier::new();
569
570 if let Some(p) = preview_prefetch_api_announcements {
571 let announcements = p.get().await;
575
576 store_api_announcements_updates_from_peers(&db, announcements).await?
577 }
578
579 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
580 match preview_prefetch_api_version_set.get_try().await {
581 Ok(peer_api_versions) => {
582 Client::store_prefetched_api_versions(
583 &db,
584 &config,
585 &self.module_inits,
586 peer_api_versions,
587 )
588 .await;
589 }
590 Err(err) => {
591 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
592 }
593 }
594 }
595
596 let common_api_versions = Client::load_and_refresh_common_api_version_static(
597 &config,
598 &self.module_inits,
599 connectors.clone(),
600 &api,
601 &db,
602 &task_group,
603 )
604 .await
605 .inspect_err(|err| {
606 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
607 })
608 .unwrap_or(ApiVersionSet {
609 core: ApiVersion::new(0, 0),
610 modules: BTreeMap::new(),
612 });
613
614 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
615
616 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
618
619 let mut module_recoveries: BTreeMap<
620 ModuleInstanceId,
621 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
622 > = BTreeMap::new();
623 let mut module_recovery_progress_receivers: BTreeMap<
624 ModuleInstanceId,
625 watch::Receiver<RecoveryProgress>,
626 > = BTreeMap::new();
627
628 let final_client = FinalClientIface::default();
629
630 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
631
632 let modules = {
633 let mut modules = ClientModuleRegistry::default();
634 for (module_instance_id, module_config) in config.modules.clone() {
635 let kind = module_config.kind().clone();
636 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
637 debug!(
638 target: LOG_CLIENT,
639 kind=%kind,
640 instance_id=%module_instance_id,
641 "Module kind of instance not found in module gens, skipping");
642 continue;
643 };
644
645 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
646 else {
647 warn!(
648 target: LOG_CLIENT,
649 kind=%kind,
650 instance_id=%module_instance_id,
651 "Module kind of instance has incompatible api version, skipping"
652 );
653 continue;
654 };
655
656 let start_module_recover_fn =
659 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
660 let module_config = module_config.clone();
661 let num_peers = NumPeers::from(config.global.api_endpoints.len());
662 let db = db.clone();
663 let kind = kind.clone();
664 let notifier = notifier.clone();
665 let api = api.clone();
666 let root_secret = root_secret.clone();
667 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
668 let final_client = final_client.clone();
669 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
670 let task_group = task_group.clone();
671 let module_init = module_init.clone();
672 (
673 Box::pin(async move {
674 module_init
675 .recover(
676 final_client.clone(),
677 fed_id,
678 num_peers,
679 module_config.clone(),
680 db.clone(),
681 module_instance_id,
682 common_api_versions.core,
683 api_version,
684 root_secret.derive_module_secret(module_instance_id),
685 notifier.clone(),
686 api.clone(),
687 admin_auth,
688 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
689 progress_tx,
690 task_group,
691 )
692 .await
693 .inspect_err(|err| {
694 warn!(
695 target: LOG_CLIENT,
696 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
697 );
698 })
699 }),
700 progress_rx,
701 )
702 };
703
704 let recovery = match init_state.does_require_recovery() {
705 Some(snapshot) => {
706 match db
707 .begin_transaction_nc()
708 .await
709 .get_value(&ClientModuleRecovery { module_instance_id })
710 .await
711 {
712 Some(module_recovery_state) => {
713 if module_recovery_state.is_done() {
714 debug!(
715 id = %module_instance_id,
716 %kind, "Module recovery already complete"
717 );
718 None
719 } else {
720 debug!(
721 id = %module_instance_id,
722 %kind,
723 progress = %module_recovery_state.progress,
724 "Starting module recovery with an existing progress"
725 );
726 Some(start_module_recover_fn(
727 snapshot,
728 module_recovery_state.progress,
729 ))
730 }
731 }
732 _ => {
733 let progress = RecoveryProgress::none();
734 let mut dbtx = db.begin_transaction().await;
735 dbtx.log_event(
736 log_ordering_wakeup_tx.clone(),
737 None,
738 ModuleRecoveryStarted::new(module_instance_id),
739 )
740 .await;
741 dbtx.insert_entry(
742 &ClientModuleRecovery { module_instance_id },
743 &ClientModuleRecoveryState { progress },
744 )
745 .await;
746
747 dbtx.commit_tx().await;
748
749 debug!(
750 id = %module_instance_id,
751 %kind, "Starting new module recovery"
752 );
753 Some(start_module_recover_fn(snapshot, progress))
754 }
755 }
756 }
757 _ => None,
758 };
759
760 match recovery {
761 Some((recovery, recovery_progress_rx)) => {
762 module_recoveries.insert(module_instance_id, recovery);
763 module_recovery_progress_receivers
764 .insert(module_instance_id, recovery_progress_rx);
765 }
766 _ => {
767 let module = module_init
768 .init(
769 final_client.clone(),
770 fed_id,
771 config.global.api_endpoints.len(),
772 module_config,
773 db.clone(),
774 module_instance_id,
775 common_api_versions.core,
776 api_version,
777 root_secret.derive_module_secret(module_instance_id),
784 notifier.clone(),
785 api.clone(),
786 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
787 task_group.clone(),
788 connectors.clone(),
789 )
790 .await?;
791
792 modules.register_module(module_instance_id, kind, module);
793 }
794 }
795 }
796 modules
797 };
798
799 if init_state.is_pending() && module_recoveries.is_empty() {
800 let mut dbtx = db.begin_transaction().await;
801 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
802 .await;
803 dbtx.commit_tx().await;
804 }
805
806 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
807 BTreeMap::new();
808
809 for (module_id, _kind, module) in modules.iter_modules() {
810 match module.supports_being_primary() {
811 PrimaryModuleSupport::Any { priority } => {
812 primary_modules
813 .entry(priority)
814 .or_default()
815 .wildcard
816 .push(module_id);
817 }
818 PrimaryModuleSupport::Selected { priority, units } => {
819 for unit in units {
820 primary_modules
821 .entry(priority)
822 .or_default()
823 .specific
824 .entry(unit)
825 .or_default()
826 .push(module_id);
827 }
828 }
829 PrimaryModuleSupport::None => {}
830 }
831 }
832
833 let executor = {
834 let mut executor_builder = Executor::builder();
835 executor_builder
836 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
837
838 for (module_instance_id, _, module) in modules.iter_modules() {
839 executor_builder.with_module_dyn(module.context(module_instance_id));
840 }
841
842 for module_instance_id in module_recoveries.keys() {
843 executor_builder.with_valid_module_id(*module_instance_id);
844 }
845
846 executor_builder.build(
847 db.clone(),
848 notifier,
849 task_group.clone(),
850 log_ordering_wakeup_tx.clone(),
851 )
852 };
853
854 let recovery_receiver_init_val = module_recovery_progress_receivers
855 .iter()
856 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
857 .collect::<BTreeMap<_, _>>();
858 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
859 watch::channel(recovery_receiver_init_val);
860
861 let client_inner = Arc::new(Client {
862 final_client: final_client.clone(),
863 config: tokio::sync::RwLock::new(config.clone()),
864 api_secret,
865 decoders,
866 db: db.clone(),
867 connectors,
868 federation_id: fed_id,
869 federation_config_meta: config.global.meta,
870 primary_modules,
871 modules,
872 module_inits: self.module_inits.clone(),
873 log_ordering_wakeup_tx,
874 log_event_added_rx,
875 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
876 request_hook,
877 executor,
878 api,
879 secp_ctx: Secp256k1::new(),
880 root_secret,
881 task_group,
882 operation_log: OperationLog::new(db.clone()),
883 client_recovery_progress_receiver,
884 meta_service: self.meta_service,
885 iroh_enable_dht: self.iroh_enable_dht,
886 iroh_enable_next: self.iroh_enable_next,
887 });
888 client_inner
889 .task_group
890 .spawn_cancellable("MetaService::update_continuously", {
891 let client_inner = client_inner.clone();
892 async move {
893 client_inner
894 .meta_service
895 .update_continuously(&client_inner)
896 .await;
897 }
898 });
899
900 client_inner
901 .task_group
902 .spawn_cancellable("update-api-announcements", {
903 let client_inner = client_inner.clone();
904 async move {
905 client_inner
906 .connectors
907 .wait_for_initialized_connections()
908 .await;
909 run_api_announcement_refresh_task(client_inner.clone()).await
910 }
911 });
912
913 client_inner
914 .task_group
915 .spawn_cancellable("event log ordering task", {
916 let client_inner = client_inner.clone();
917 async move {
918 client_inner
919 .connectors
920 .wait_for_initialized_connections()
921 .await;
922
923 run_event_log_ordering_task(
924 db.clone(),
925 log_ordering_wakeup_rx,
926 log_event_added_tx,
927 log_event_added_transient_tx,
928 )
929 .await
930 }
931 });
932 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
933
934 let client_arc = ClientHandle::new(client_inner);
935
936 for (_, _, module) in client_arc.modules.iter_modules() {
937 module.start().await;
938 }
939
940 final_client.set(client_iface.clone());
941
942 if !module_recoveries.is_empty() {
943 client_arc.spawn_module_recoveries_task(
944 client_recovery_progress_sender,
945 module_recoveries,
946 module_recovery_progress_receivers,
947 );
948 }
949
950 Ok(client_arc)
951 }
952
953 async fn load_init_state(db: &Database) -> InitState {
954 let mut dbtx = db.begin_transaction_nc().await;
955 dbtx.get_value(&ClientInitStateKey)
956 .await
957 .unwrap_or_else(|| {
958 warn!(
961 target: LOG_CLIENT,
962 "Client missing ClientRequiresRecovery: assuming complete"
963 );
964 db::InitState::Complete(db::InitModeComplete::Fresh)
965 })
966 }
967
968 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
969 let mut decoders = client_decoders(
970 &self.module_inits,
971 config
972 .modules
973 .iter()
974 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
975 );
976
977 decoders.register_module(
978 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
979 ModuleKind::from_static_str("tx_submission"),
980 tx_submission_sm_decoder(),
981 );
982
983 decoders
984 }
985
986 fn config_decoded(
987 config: &ClientConfig,
988 decoders: &ModuleDecoderRegistry,
989 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
990 config.clone().redecode_raw(decoders)
991 }
992
993 fn federation_root_secret(
997 pre_root_secret: &DerivableSecret,
998 config: &ClientConfig,
999 ) -> DerivableSecret {
1000 pre_root_secret.federation_key(&config.global.calculate_federation_id())
1001 }
1002
1003 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1005 self.log_event_added_transient_tx.subscribe()
1006 }
1007
1008 async fn migrate_pending_config_if_present(db: &Database) {
1012 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1013 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1014
1015 let mut dbtx = db.begin_transaction().await;
1016 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1018 .await;
1019 dbtx.remove_entry(&PendingClientConfigKey).await;
1021 dbtx.commit_tx().await;
1022
1023 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1024 }
1025 }
1026
1027 fn load_and_refresh_client_config_static(
1030 config: &ClientConfig,
1031 api: &DynGlobalApi,
1032 db: &Database,
1033 task_group: &TaskGroup,
1034 ) {
1035 let config = config.clone();
1036 let api = api.clone();
1037 let db = db.clone();
1038 let task_group = task_group.clone();
1039
1040 task_group.spawn_cancellable("refresh_client_config_static", async move {
1042 api.wait_for_initialized_connections().await;
1043 Self::refresh_client_config_static(&config, &api, &db).await;
1044 });
1045 }
1046
1047 async fn refresh_client_config_static(
1049 config: &ClientConfig,
1050 api: &DynGlobalApi,
1051 db: &Database,
1052 ) {
1053 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1054 warn!(
1055 target: LOG_CLIENT,
1056 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1057 );
1058 }
1059 }
1060
1061 fn validate_config_update(
1063 current_config: &ClientConfig,
1064 new_config: &ClientConfig,
1065 ) -> anyhow::Result<()> {
1066 if current_config.global != new_config.global {
1068 bail!("Global configuration changes are not allowed in config updates");
1069 }
1070
1071 for (module_id, current_module_config) in ¤t_config.modules {
1073 match new_config.modules.get(module_id) {
1074 Some(new_module_config) => {
1075 if current_module_config != new_module_config {
1076 bail!(
1077 "Module {} configuration changes are not allowed, only additions are permitted",
1078 module_id
1079 );
1080 }
1081 }
1082 None => {
1083 bail!(
1084 "Module {} was removed in new config, only additions are allowed",
1085 module_id
1086 );
1087 }
1088 }
1089 }
1090
1091 Ok(())
1092 }
1093
1094 async fn refresh_client_config_static_try(
1096 current_config: &ClientConfig,
1097 api: &DynGlobalApi,
1098 db: &Database,
1099 ) -> anyhow::Result<()> {
1100 debug!(target: LOG_CLIENT, "Refreshing client config");
1101
1102 let fetched_config = api
1104 .request_current_consensus::<ClientConfig>(
1105 CLIENT_CONFIG_ENDPOINT.to_owned(),
1106 ApiRequestErased::default(),
1107 )
1108 .await?;
1109
1110 Self::validate_config_update(current_config, &fetched_config)?;
1112
1113 if current_config != &fetched_config {
1115 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1116
1117 let mut dbtx = db.begin_transaction().await;
1118 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1119 .await;
1120 dbtx.commit_tx().await;
1121 } else {
1122 debug!(target: LOG_CLIENT, "No federation config changes detected");
1123 }
1124
1125 Ok(())
1126 }
1127}
1128
1129pub struct ClientPreview {
1134 inner: ClientBuilder,
1135 config: ClientConfig,
1136 connectors: ConnectorRegistry,
1137 api_secret: Option<String>,
1138 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1139 preview_prefetch_api_version_set:
1140 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1141}
1142
1143impl ClientPreview {
1144 pub fn config(&self) -> &ClientConfig {
1146 &self.config
1147 }
1148
1149 pub async fn join(
1228 self,
1229 db_no_decoders: Database,
1230 pre_root_secret: RootSecret,
1231 ) -> anyhow::Result<ClientHandle> {
1232 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1233
1234 let client = self
1235 .inner
1236 .init(
1237 self.connectors,
1238 db_no_decoders,
1239 pre_root_secret,
1240 self.config,
1241 self.api_secret,
1242 InitMode::Fresh,
1243 self.prefetch_api_announcements,
1244 self.preview_prefetch_api_version_set,
1245 )
1246 .await?;
1247
1248 Ok(client)
1249 }
1250
1251 pub async fn recover(
1263 self,
1264 db_no_decoders: Database,
1265 pre_root_secret: RootSecret,
1266 backup: Option<ClientBackup>,
1267 ) -> anyhow::Result<ClientHandle> {
1268 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1269
1270 let client = self
1271 .inner
1272 .init(
1273 self.connectors,
1274 db_no_decoders,
1275 pre_root_secret,
1276 self.config,
1277 self.api_secret,
1278 InitMode::Recover {
1279 snapshot: backup.clone(),
1280 },
1281 self.prefetch_api_announcements,
1282 self.preview_prefetch_api_version_set,
1283 )
1284 .await?;
1285
1286 Ok(client)
1287 }
1288
1289 pub async fn download_backup_from_federation(
1291 &self,
1292 pre_root_secret: RootSecret,
1293 ) -> anyhow::Result<Option<ClientBackup>> {
1294 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1295 let api = DynGlobalApi::new(
1296 self.connectors.clone(),
1297 self.config
1299 .global
1300 .api_endpoints
1301 .iter()
1302 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone()))
1303 .collect(),
1304 self.api_secret.as_deref(),
1305 )?;
1306
1307 Client::download_backup_from_federation_static(
1308 &api,
1309 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1310 &self.inner.decoders(&self.config),
1311 )
1312 .await
1313 }
1314}