1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::ConnectorType;
14use fedimint_api_client::api::{
15 ApiVersionSet, DynClientConnector, DynGlobalApi, FederationApiExt as _, ReconnectFederationApi,
16 make_admin_connector, make_connector,
17};
18use fedimint_client_module::api::ClientRawFederationApiExt as _;
19use fedimint_client_module::meta::LegacyMetaSource;
20use fedimint_client_module::module::init::ClientModuleInit;
21use fedimint_client_module::module::recovery::RecoveryProgress;
22use fedimint_client_module::module::{
23 ClientModuleRegistry, FinalClientIface, PrimaryModulePriority, PrimaryModuleSupport,
24};
25use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
26use fedimint_client_module::transaction::{
27 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
28};
29use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
30use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
31use fedimint_core::core::{ModuleInstanceId, ModuleKind};
32use fedimint_core::db::{
33 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
34};
35use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
36use fedimint_core::envs::is_running_in_test_env;
37use fedimint_core::invite_code::InviteCode;
38use fedimint_core::module::registry::ModuleDecoderRegistry;
39use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
40use fedimint_core::task::TaskGroup;
41use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
42use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
43use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
44use fedimint_derive_secret::DerivableSecret;
45use fedimint_eventlog::{
46 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
47};
48use fedimint_logging::LOG_CLIENT;
49use tokio::sync::{broadcast, watch};
50use tracing::{debug, trace, warn};
51
52use super::handle::ClientHandle;
53use super::{Client, client_decoders};
54use crate::api_announcements::{
55 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
56 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
57};
58use crate::backup::{ClientBackup, Metadata};
59use crate::client::PrimaryModuleCandidates;
60use crate::db::{
61 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
62 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
63 PendingClientConfigKey, apply_migrations_client_module_dbtx,
64};
65use crate::meta::MetaService;
66use crate::module_init::ClientModuleInitRegistry;
67use crate::oplog::OperationLog;
68use crate::sm::executor::Executor;
69use crate::sm::notifier::Notifier;
70
71#[derive(Clone)]
93pub enum RootSecret {
94 StandardDoubleDerive(DerivableSecret),
99 Custom(DerivableSecret),
104}
105
106impl RootSecret {
107 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
108 match self {
109 RootSecret::StandardDoubleDerive(derivable_secret) => {
110 get_default_client_secret(derivable_secret, &federation_id)
111 }
112 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
113 }
114 }
115}
116
117pub struct ClientBuilder {
119 module_inits: ClientModuleInitRegistry,
120 admin_creds: Option<AdminCreds>,
121 meta_service: Arc<crate::meta::MetaService>,
122 connector: ConnectorType,
123 stopped: bool,
124 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
125 request_hook: ApiRequestHook,
126 reuse_connector: Option<DynClientConnector>,
127 iroh_enable_dht: bool,
128 iroh_enable_next: bool,
129}
130
131impl ClientBuilder {
132 pub(crate) fn new() -> Self {
133 trace!(
134 target: LOG_CLIENT,
135 version = %fedimint_build_code_version_env!(),
136 "Initializing fedimint client",
137 );
138 let meta_service = MetaService::new(LegacyMetaSource::default());
139 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
140 broadcast::channel(1024);
141 ClientBuilder {
142 module_inits: ModuleInitRegistry::new(),
143 connector: ConnectorType::default(),
144 admin_creds: None,
145 stopped: false,
146 meta_service,
147 log_event_added_transient_tx,
148 request_hook: Arc::new(|api| api),
149 reuse_connector: None,
150 iroh_enable_dht: true,
151 iroh_enable_next: true,
152 }
153 }
154
155 pub(crate) fn from_existing(client: &Client) -> Self {
156 ClientBuilder {
157 module_inits: client.module_inits.clone(),
158 admin_creds: None,
159 stopped: false,
160 meta_service: client.meta_service.clone(),
162 connector: client.connector,
163 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
164 request_hook: client.request_hook.clone(),
165 reuse_connector: Some(client.api.connector().clone()),
166 iroh_enable_dht: client.iroh_enable_dht,
167 iroh_enable_next: client.iroh_enable_next,
168 }
169 }
170
171 pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
173 self.module_inits = module_inits;
174 }
175
176 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
178 self.module_inits.attach(module_init);
179 }
180
181 pub fn stopped(&mut self) {
182 self.stopped = true;
183 }
184
185 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
194 self.request_hook = hook;
195 self
196 }
197
198 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
199 self.meta_service = meta_service;
200 }
201
202 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
205 self.iroh_enable_dht = iroh_enable_dht;
206 self
207 }
208
209 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
212 self.iroh_enable_next = iroh_enable_next;
213 self
214 }
215
216 async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
223 if let Ok(client_config) = self.load_existing_config(db).await {
227 for (module_id, module_cfg) in client_config.modules {
228 let kind = module_cfg.kind.clone();
229 let Some(init) = self.module_inits.get(&kind) else {
230 continue;
232 };
233
234 let mut dbtx = db.begin_transaction().await;
235 apply_migrations_client_module_dbtx(
236 &mut dbtx.to_ref_nc(),
237 kind.to_string(),
238 init.get_database_migrations(),
239 module_id,
240 )
241 .await?;
242 if let Some(used_db_prefixes) = init.used_db_prefixes()
243 && is_running_in_test_env()
244 {
245 verify_module_db_integrity_dbtx(
246 &mut dbtx.to_ref_nc(),
247 module_id,
248 kind,
249 &used_db_prefixes,
250 )
251 .await;
252 }
253 dbtx.commit_tx_result().await?;
254 }
255 }
256
257 Ok(())
258 }
259
260 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
261 let Some(config) = Client::get_config_from_db(db).await else {
262 bail!("Client database not initialized")
263 };
264
265 Ok(config)
266 }
267
268 pub fn set_admin_creds(&mut self, creds: AdminCreds) {
269 self.admin_creds = Some(creds);
270 }
271
272 pub fn with_connector(&mut self, connector: ConnectorType) {
273 self.connector = connector;
274 }
275
276 #[cfg(feature = "tor")]
277 pub fn with_tor_connector(&mut self) {
278 self.with_connector(ConnectorType::tor());
279 }
280
281 #[allow(clippy::too_many_arguments)]
282 async fn init(
283 self,
284 db_no_decoders: Database,
285 pre_root_secret: DerivableSecret,
286 config: ClientConfig,
287 api_secret: Option<String>,
288 init_mode: InitMode,
289 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
290 preview_prefetch_api_version_set: Option<
291 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
292 >,
293 ) -> anyhow::Result<ClientHandle> {
294 if Client::is_initialized(&db_no_decoders).await {
295 bail!("Client database already initialized")
296 }
297
298 Client::run_core_migrations(&db_no_decoders).await?;
299
300 {
303 debug!(target: LOG_CLIENT, "Initializing client database");
304 let mut dbtx = db_no_decoders.begin_transaction().await;
305 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
307 .await;
308 dbtx.insert_entry(
309 &ClientPreRootSecretHashKey,
310 &pre_root_secret.derive_pre_root_secret_hash(),
311 )
312 .await;
313
314 if let Some(api_secret) = api_secret.as_ref() {
315 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
316 }
317
318 let init_state = InitState::Pending(init_mode);
319 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
320
321 let metadata = init_state
322 .does_require_recovery()
323 .flatten()
324 .map_or(Metadata::empty(), |s| s.metadata);
325
326 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
327
328 dbtx.commit_tx_result().await?;
329 }
330
331 let stopped = self.stopped;
332 self.build(
333 db_no_decoders,
334 pre_root_secret,
335 config,
336 api_secret,
337 stopped,
338 preview_prefetch_api_announcements,
339 preview_prefetch_api_version_set,
340 )
341 .await
342 }
343
344 pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
345 let (config, api) = self
346 .connector
347 .download_from_invite_code(invite_code, self.iroh_enable_dht, self.iroh_enable_next)
348 .await?;
349
350 let prefetch_api_announcements =
351 config
352 .global
353 .broadcast_public_keys
354 .clone()
355 .map(|guardian_pub_keys| {
356 Jit::new({
357 let api = api.clone();
358 move || async move {
359 fetch_api_announcements_from_at_least_num_of_peers(
363 1,
364 &api,
365 &guardian_pub_keys,
366 Duration::from_millis(20),
369 )
370 .await
371 }
372 })
373 });
374
375 self.preview_inner(
376 config,
377 invite_code.api_secret(),
378 Some(api),
379 prefetch_api_announcements,
380 )
381 .await
382 }
383
384 pub async fn preview_with_existing_config(
389 self,
390 config: ClientConfig,
391 api_secret: Option<String>,
392 reuse_api: Option<DynGlobalApi>,
393 ) -> anyhow::Result<ClientPreview> {
394 self.preview_inner(config, api_secret, reuse_api, None)
395 .await
396 }
397
398 async fn preview_inner(
399 mut self,
400 config: ClientConfig,
401 api_secret: Option<String>,
402 reuse_api: Option<DynGlobalApi>,
403 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
404 ) -> anyhow::Result<ClientPreview> {
405 let preview_prefetch_api_version_set = if let Some(api) = reuse_api {
406 self.reuse_connector = Some(api.connector().clone());
407
408 Some(JitTry::new_try({
409 let config = config.clone();
410 || async move { Client::fetch_common_api_versions(&config, &api).await }
411 }))
412 } else {
413 None
414 };
415 Ok(ClientPreview {
416 inner: self,
417 config,
418 api_secret,
419 prefetch_api_announcements,
420 preview_prefetch_api_version_set,
421 })
422 }
423
424 pub async fn open(
425 self,
426 db_no_decoders: Database,
427 pre_root_secret: RootSecret,
428 ) -> anyhow::Result<ClientHandle> {
429 Client::run_core_migrations(&db_no_decoders).await?;
430
431 Self::migrate_pending_config_if_present(&db_no_decoders).await;
433
434 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
435 bail!("Client database not initialized")
436 };
437
438 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
439
440 match db_no_decoders
441 .begin_transaction_nc()
442 .await
443 .get_value(&ClientPreRootSecretHashKey)
444 .await
445 {
446 Some(secret_hash) => {
447 ensure!(
448 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
449 "Secret hash does not match. Incorrect secret"
450 );
451 }
452 _ => {
453 debug!(target: LOG_CLIENT, "Backfilling secret hash");
454 let mut dbtx = db_no_decoders.begin_transaction().await;
456 dbtx.insert_entry(
457 &ClientPreRootSecretHashKey,
458 &pre_root_secret.derive_pre_root_secret_hash(),
459 )
460 .await;
461 dbtx.commit_tx().await;
462 }
463 }
464
465 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
466 let stopped = self.stopped;
467 let request_hook = self.request_hook.clone();
468
469 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
470 let client = self
471 .build_stopped(
472 db_no_decoders,
473 pre_root_secret,
474 &config,
475 api_secret,
476 log_event_added_transient_tx,
477 request_hook,
478 None,
479 None,
480 )
481 .await?;
482 if !stopped {
483 client.as_inner().start_executor();
484 }
485 Ok(client)
486 }
487
488 #[allow(clippy::too_many_arguments)]
490 pub(crate) async fn build(
491 self,
492 db_no_decoders: Database,
493 pre_root_secret: DerivableSecret,
494 config: ClientConfig,
495 api_secret: Option<String>,
496 stopped: bool,
497 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
498 preview_prefetch_api_version_set: Option<
499 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
500 >,
501 ) -> anyhow::Result<ClientHandle> {
502 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
503 let request_hook = self.request_hook.clone();
504 let client = self
505 .build_stopped(
506 db_no_decoders,
507 pre_root_secret,
508 &config,
509 api_secret,
510 log_event_added_transient_tx,
511 request_hook,
512 preview_prefetch_api_announcements,
513 preview_prefetch_api_version_set,
514 )
515 .await?;
516 if !stopped {
517 client.as_inner().start_executor();
518 }
519
520 Ok(client)
521 }
522
523 #[allow(clippy::too_many_arguments)]
526 async fn build_stopped(
527 self,
528 db_no_decoders: Database,
529 pre_root_secret: DerivableSecret,
530 config: &ClientConfig,
531 api_secret: Option<String>,
532 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
533 request_hook: ApiRequestHook,
534 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
535 preview_prefetch_api_version_set: Option<
536 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
537 >,
538 ) -> anyhow::Result<ClientHandle> {
539 debug!(
540 target: LOG_CLIENT,
541 version = %fedimint_build_code_version_env!(),
542 "Building fedimint client",
543 );
544 let (log_event_added_tx, log_event_added_rx) = watch::channel(());
545 let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
546
547 let decoders = self.decoders(config);
548 let config = Self::config_decoded(config, &decoders)?;
549 let fed_id = config.calculate_federation_id();
550 let db = db_no_decoders.with_decoders(decoders.clone());
551 let connector = self.connector;
552 let peer_urls = get_api_urls(&db, &config).await;
553 let api = match self.admin_creds.as_ref() {
554 Some(admin_creds) => {
555 let connector = make_admin_connector(
556 admin_creds.peer_id,
557 peer_urls
558 .into_iter()
559 .find_map(|(peer, api_url)| {
560 (admin_creds.peer_id == peer).then_some(api_url)
561 })
562 .context("Admin creds should match a peer")?,
563 &api_secret,
564 self.iroh_enable_dht,
565 self.iroh_enable_next,
566 )
567 .await?;
568 ReconnectFederationApi::new_admin(connector, admin_creds.peer_id)
569 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
570 .with_request_hook(&request_hook)
571 .with_cache()
572 .into()
573 }
574 None => {
575 let connector = if let Some(connector) = self.reuse_connector.clone()
576 && connector.peers().len() == peer_urls.len()
577 {
578 connector
579 } else {
580 make_connector(
581 peer_urls,
582 &api_secret,
583 self.iroh_enable_dht,
584 self.iroh_enable_next,
585 )
586 .await?
587 };
588 ReconnectFederationApi::new(connector, None)
589 .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
590 .with_request_hook(&request_hook)
591 .with_cache()
592 .into()
593 }
594 };
595
596 let task_group = TaskGroup::new();
597
598 self.migrate_module_dbs(&db).await?;
601
602 let init_state = Self::load_init_state(&db).await;
603
604 let notifier = Notifier::new();
605
606 if let Some(p) = preview_prefetch_api_announcements {
607 let announcements = p.get().await;
611
612 store_api_announcements_updates_from_peers(&db, announcements).await?
613 }
614
615 if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
616 match preview_prefetch_api_version_set.get_try().await {
617 Ok(peer_api_versions) => {
618 Client::store_prefetched_api_versions(
619 &db,
620 &config,
621 &self.module_inits,
622 peer_api_versions,
623 )
624 .await;
625 }
626 Err(err) => {
627 debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
628 }
629 }
630 }
631
632 let common_api_versions = Client::load_and_refresh_common_api_version_static(
633 &config,
634 &self.module_inits,
635 &api,
636 &db,
637 &task_group,
638 )
639 .await
640 .inspect_err(|err| {
641 warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
642 })
643 .unwrap_or(ApiVersionSet {
644 core: ApiVersion::new(0, 0),
645 modules: BTreeMap::new(),
647 });
648
649 debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");
650
651 Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);
653
654 let mut module_recoveries: BTreeMap<
655 ModuleInstanceId,
656 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
657 > = BTreeMap::new();
658 let mut module_recovery_progress_receivers: BTreeMap<
659 ModuleInstanceId,
660 watch::Receiver<RecoveryProgress>,
661 > = BTreeMap::new();
662
663 let final_client = FinalClientIface::default();
664
665 let root_secret = Self::federation_root_secret(&pre_root_secret, &config);
666
667 let modules = {
668 let mut modules = ClientModuleRegistry::default();
669 for (module_instance_id, module_config) in config.modules.clone() {
670 let kind = module_config.kind().clone();
671 let Some(module_init) = self.module_inits.get(&kind).cloned() else {
672 debug!(
673 target: LOG_CLIENT,
674 kind=%kind,
675 instance_id=%module_instance_id,
676 "Module kind of instance not found in module gens, skipping");
677 continue;
678 };
679
680 let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
681 else {
682 warn!(
683 target: LOG_CLIENT,
684 kind=%kind,
685 instance_id=%module_instance_id,
686 "Module kind of instance has incompatible api version, skipping"
687 );
688 continue;
689 };
690
691 let start_module_recover_fn =
694 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
695 let module_config = module_config.clone();
696 let num_peers = NumPeers::from(config.global.api_endpoints.len());
697 let db = db.clone();
698 let kind = kind.clone();
699 let notifier = notifier.clone();
700 let api = api.clone();
701 let root_secret = root_secret.clone();
702 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
703 let final_client = final_client.clone();
704 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
705 let task_group = task_group.clone();
706 let module_init = module_init.clone();
707 (
708 Box::pin(async move {
709 module_init
710 .recover(
711 final_client.clone(),
712 fed_id,
713 num_peers,
714 module_config.clone(),
715 db.clone(),
716 module_instance_id,
717 common_api_versions.core,
718 api_version,
719 root_secret.derive_module_secret(module_instance_id),
720 notifier.clone(),
721 api.clone(),
722 admin_auth,
723 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
724 progress_tx,
725 task_group,
726 )
727 .await
728 .inspect_err(|err| {
729 warn!(
730 target: LOG_CLIENT,
731 module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
732 );
733 })
734 }),
735 progress_rx,
736 )
737 };
738
739 let recovery = match init_state.does_require_recovery() {
740 Some(snapshot) => {
741 match db
742 .begin_transaction_nc()
743 .await
744 .get_value(&ClientModuleRecovery { module_instance_id })
745 .await
746 {
747 Some(module_recovery_state) => {
748 if module_recovery_state.is_done() {
749 debug!(
750 id = %module_instance_id,
751 %kind, "Module recovery already complete"
752 );
753 None
754 } else {
755 debug!(
756 id = %module_instance_id,
757 %kind,
758 progress = %module_recovery_state.progress,
759 "Starting module recovery with an existing progress"
760 );
761 Some(start_module_recover_fn(
762 snapshot,
763 module_recovery_state.progress,
764 ))
765 }
766 }
767 _ => {
768 let progress = RecoveryProgress::none();
769 let mut dbtx = db.begin_transaction().await;
770 dbtx.log_event(
771 log_ordering_wakeup_tx.clone(),
772 None,
773 ModuleRecoveryStarted::new(module_instance_id),
774 )
775 .await;
776 dbtx.insert_entry(
777 &ClientModuleRecovery { module_instance_id },
778 &ClientModuleRecoveryState { progress },
779 )
780 .await;
781
782 dbtx.commit_tx().await;
783
784 debug!(
785 id = %module_instance_id,
786 %kind, "Starting new module recovery"
787 );
788 Some(start_module_recover_fn(snapshot, progress))
789 }
790 }
791 }
792 _ => None,
793 };
794
795 match recovery {
796 Some((recovery, recovery_progress_rx)) => {
797 module_recoveries.insert(module_instance_id, recovery);
798 module_recovery_progress_receivers
799 .insert(module_instance_id, recovery_progress_rx);
800 }
801 _ => {
802 let module = module_init
803 .init(
804 final_client.clone(),
805 fed_id,
806 config.global.api_endpoints.len(),
807 module_config,
808 db.clone(),
809 module_instance_id,
810 common_api_versions.core,
811 api_version,
812 root_secret.derive_module_secret(module_instance_id),
819 notifier.clone(),
820 api.clone(),
821 self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
822 task_group.clone(),
823 )
824 .await?;
825
826 modules.register_module(module_instance_id, kind, module);
827 }
828 }
829 }
830 modules
831 };
832
833 if init_state.is_pending() && module_recoveries.is_empty() {
834 let mut dbtx = db.begin_transaction().await;
835 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
836 .await;
837 dbtx.commit_tx().await;
838 }
839
840 let mut primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates> =
841 BTreeMap::new();
842
843 for (module_id, _kind, module) in modules.iter_modules() {
844 match module.supports_being_primary() {
845 PrimaryModuleSupport::Any { priority } => {
846 primary_modules
847 .entry(priority)
848 .or_default()
849 .wildcard
850 .push(module_id);
851 }
852 PrimaryModuleSupport::Selected { priority, units } => {
853 for unit in units {
854 primary_modules
855 .entry(priority)
856 .or_default()
857 .specific
858 .entry(unit)
859 .or_default()
860 .push(module_id);
861 }
862 }
863 PrimaryModuleSupport::None => {}
864 }
865 }
866
867 let executor = {
868 let mut executor_builder = Executor::builder();
869 executor_builder
870 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
871
872 for (module_instance_id, _, module) in modules.iter_modules() {
873 executor_builder.with_module_dyn(module.context(module_instance_id));
874 }
875
876 for module_instance_id in module_recoveries.keys() {
877 executor_builder.with_valid_module_id(*module_instance_id);
878 }
879
880 executor_builder.build(
881 db.clone(),
882 notifier,
883 task_group.clone(),
884 log_ordering_wakeup_tx.clone(),
885 )
886 };
887
888 let recovery_receiver_init_val = module_recovery_progress_receivers
889 .iter()
890 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
891 .collect::<BTreeMap<_, _>>();
892 let (client_recovery_progress_sender, client_recovery_progress_receiver) =
893 watch::channel(recovery_receiver_init_val);
894
895 let client_inner = Arc::new(Client {
896 final_client: final_client.clone(),
897 config: tokio::sync::RwLock::new(config.clone()),
898 api_secret,
899 decoders,
900 db: db.clone(),
901 federation_id: fed_id,
902 federation_config_meta: config.global.meta,
903 primary_modules,
904 modules,
905 module_inits: self.module_inits.clone(),
906 log_ordering_wakeup_tx,
907 log_event_added_rx,
908 log_event_added_transient_tx: log_event_added_transient_tx.clone(),
909 request_hook,
910 executor,
911 api,
912 secp_ctx: Secp256k1::new(),
913 root_secret,
914 task_group,
915 operation_log: OperationLog::new(db.clone()),
916 client_recovery_progress_receiver,
917 meta_service: self.meta_service,
918 connector,
919 iroh_enable_dht: self.iroh_enable_dht,
920 iroh_enable_next: self.iroh_enable_next,
921 });
922 client_inner
923 .task_group
924 .spawn_cancellable("MetaService::update_continuously", {
925 let client_inner = client_inner.clone();
926 async move {
927 client_inner
928 .meta_service
929 .update_continuously(&client_inner)
930 .await;
931 }
932 });
933
934 client_inner.task_group.spawn_cancellable(
935 "update-api-announcements",
936 run_api_announcement_refresh_task(client_inner.clone()),
937 );
938
939 client_inner.task_group.spawn_cancellable(
940 "event log ordering task",
941 run_event_log_ordering_task(
942 db.clone(),
943 log_ordering_wakeup_rx,
944 log_event_added_tx,
945 log_event_added_transient_tx,
946 ),
947 );
948 let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);
949
950 let client_arc = ClientHandle::new(client_inner);
951
952 for (_, _, module) in client_arc.modules.iter_modules() {
953 module.start().await;
954 }
955
956 final_client.set(client_iface.clone());
957
958 if !module_recoveries.is_empty() {
959 client_arc.spawn_module_recoveries_task(
960 client_recovery_progress_sender,
961 module_recoveries,
962 module_recovery_progress_receivers,
963 );
964 }
965
966 Ok(client_arc)
967 }
968
969 async fn load_init_state(db: &Database) -> InitState {
970 let mut dbtx = db.begin_transaction_nc().await;
971 dbtx.get_value(&ClientInitStateKey)
972 .await
973 .unwrap_or_else(|| {
974 warn!(
977 target: LOG_CLIENT,
978 "Client missing ClientRequiresRecovery: assuming complete"
979 );
980 db::InitState::Complete(db::InitModeComplete::Fresh)
981 })
982 }
983
984 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
985 let mut decoders = client_decoders(
986 &self.module_inits,
987 config
988 .modules
989 .iter()
990 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
991 );
992
993 decoders.register_module(
994 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
995 ModuleKind::from_static_str("tx_submission"),
996 tx_submission_sm_decoder(),
997 );
998
999 decoders
1000 }
1001
1002 fn config_decoded(
1003 config: &ClientConfig,
1004 decoders: &ModuleDecoderRegistry,
1005 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
1006 config.clone().redecode_raw(decoders)
1007 }
1008
1009 fn federation_root_secret(
1013 pre_root_secret: &DerivableSecret,
1014 config: &ClientConfig,
1015 ) -> DerivableSecret {
1016 pre_root_secret.federation_key(&config.global.calculate_federation_id())
1017 }
1018
1019 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1021 self.log_event_added_transient_tx.subscribe()
1022 }
1023
1024 async fn migrate_pending_config_if_present(db: &Database) {
1028 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1029 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1030
1031 let mut dbtx = db.begin_transaction().await;
1032 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1034 .await;
1035 dbtx.remove_entry(&PendingClientConfigKey).await;
1037 dbtx.commit_tx().await;
1038
1039 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1040 }
1041 }
1042
1043 fn load_and_refresh_client_config_static(
1046 config: &ClientConfig,
1047 api: &DynGlobalApi,
1048 db: &Database,
1049 task_group: &TaskGroup,
1050 ) {
1051 let config = config.clone();
1052 let api = api.clone();
1053 let db = db.clone();
1054 let task_group = task_group.clone();
1055
1056 task_group.spawn_cancellable("refresh_client_config_static", async move {
1058 Self::refresh_client_config_static(&config, &api, &db).await;
1059 });
1060 }
1061
1062 async fn refresh_client_config_static(
1064 config: &ClientConfig,
1065 api: &DynGlobalApi,
1066 db: &Database,
1067 ) {
1068 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1069 warn!(
1070 target: LOG_CLIENT,
1071 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1072 );
1073 }
1074 }
1075
1076 fn validate_config_update(
1078 current_config: &ClientConfig,
1079 new_config: &ClientConfig,
1080 ) -> anyhow::Result<()> {
1081 if current_config.global != new_config.global {
1083 bail!("Global configuration changes are not allowed in config updates");
1084 }
1085
1086 for (module_id, current_module_config) in ¤t_config.modules {
1088 match new_config.modules.get(module_id) {
1089 Some(new_module_config) => {
1090 if current_module_config != new_module_config {
1091 bail!(
1092 "Module {} configuration changes are not allowed, only additions are permitted",
1093 module_id
1094 );
1095 }
1096 }
1097 None => {
1098 bail!(
1099 "Module {} was removed in new config, only additions are allowed",
1100 module_id
1101 );
1102 }
1103 }
1104 }
1105
1106 Ok(())
1107 }
1108
1109 async fn refresh_client_config_static_try(
1111 current_config: &ClientConfig,
1112 api: &DynGlobalApi,
1113 db: &Database,
1114 ) -> anyhow::Result<()> {
1115 debug!(target: LOG_CLIENT, "Refreshing client config");
1116
1117 let fetched_config = api
1119 .request_current_consensus::<ClientConfig>(
1120 CLIENT_CONFIG_ENDPOINT.to_owned(),
1121 ApiRequestErased::default(),
1122 )
1123 .await?;
1124
1125 Self::validate_config_update(current_config, &fetched_config)?;
1127
1128 if current_config != &fetched_config {
1130 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1131
1132 let mut dbtx = db.begin_transaction().await;
1133 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1134 .await;
1135 dbtx.commit_tx().await;
1136 } else {
1137 debug!(target: LOG_CLIENT, "No federation config changes detected");
1138 }
1139
1140 Ok(())
1141 }
1142}
1143
1144pub struct ClientPreview {
1145 inner: ClientBuilder,
1146 config: ClientConfig,
1147 api_secret: Option<String>,
1148 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
1149 preview_prefetch_api_version_set:
1150 Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
1151}
1152
1153impl ClientPreview {
1154 pub fn config(&self) -> &ClientConfig {
1156 &self.config
1157 }
1158
1159 pub async fn join(
1236 self,
1237 db_no_decoders: Database,
1238 pre_root_secret: RootSecret,
1239 ) -> anyhow::Result<ClientHandle> {
1240 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1241
1242 let client = self
1243 .inner
1244 .init(
1245 db_no_decoders,
1246 pre_root_secret,
1247 self.config,
1248 self.api_secret,
1249 InitMode::Fresh,
1250 self.prefetch_api_announcements,
1251 self.preview_prefetch_api_version_set,
1252 )
1253 .await?;
1254
1255 Ok(client)
1256 }
1257
1258 pub async fn recover(
1270 self,
1271 db_no_decoders: Database,
1272 pre_root_secret: RootSecret,
1273 backup: Option<ClientBackup>,
1274 ) -> anyhow::Result<ClientHandle> {
1275 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1276
1277 let client = self
1278 .inner
1279 .init(
1280 db_no_decoders,
1281 pre_root_secret,
1282 self.config,
1283 self.api_secret,
1284 InitMode::Recover {
1285 snapshot: backup.clone(),
1286 },
1287 self.prefetch_api_announcements,
1288 self.preview_prefetch_api_version_set,
1289 )
1290 .await?;
1291
1292 Ok(client)
1293 }
1294
1295 pub async fn download_backup_from_federation(
1297 &self,
1298 pre_root_secret: RootSecret,
1299 ) -> anyhow::Result<Option<ClientBackup>> {
1300 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1301 let api = DynGlobalApi::from_endpoints(
1302 self.config
1304 .global
1305 .api_endpoints
1306 .iter()
1307 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
1308 &self.api_secret,
1309 self.inner.iroh_enable_dht,
1310 self.inner.iroh_enable_next,
1311 )
1312 .await?;
1313
1314 Client::download_backup_from_federation_static(
1315 &api,
1316 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1317 &self.inner.decoders(&self.config),
1318 )
1319 .await
1320 }
1321}