1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63 Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64 maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86 ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87 ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89 get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
// Client submodules: construction (`builder`), event log plumbing
// (`event_log`), the module-facing global context (`global_ctx`), and the
// public client handle (`handle`).
pub(crate) mod builder;
pub(crate) mod event_log;
pub(crate) mod global_ctx;
pub(crate) mod handle;
103
/// Core API versions this client advertises during API version negotiation
/// with the federation (see [`Client::supported_api_versions_summary_static`]).
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
109
/// Candidate primary modules, split by how they declare amount-unit support.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    // Module instances registered for one specific unit, keyed by that unit
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    // Module instances that can serve as primary for any unit
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Handle to a fully initialized fedimint client.
///
/// Ties together the federation API, the client database, the registered
/// client modules and the state machine executor.
pub struct Client {
    // Self-reference handed out to modules; see `FinalClientIface`
    final_client: FinalClientIface,
    // Current federation config; behind a lock because it can be replaced at
    // runtime (a pending config is also tracked in the DB)
    config: tokio::sync::RwLock<ClientConfig>,
    // Optional secret sent along with API requests, if the federation requires one
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    db: Database,
    federation_id: FederationId,
    // `meta` section of the federation config
    federation_config_meta: BTreeMap<String, String>,
    // Primary-module candidates ordered by priority; used to balance
    // transactions and answer balance queries per amount unit
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    // Drives all client-side state machines
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,

    task_group: TaskGroup,

    // Per-module recovery progress, updated while recovery runs
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // Wakes the event-log ordering task when new unordered entries exist
    log_ordering_wakeup_tx: watch::Sender<()>,
    // Notified after entries were added to the (ordered) event log
    log_event_added_rx: watch::Receiver<()>,
    // Broadcast of transient (non-persisted) event log entries
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
    #[allow(dead_code)]
    // User-supplied bitcoind RPC override, if any — TODO(review): confirm
    // intended consumer, currently unused here
    user_bitcoind_rpc: Option<DynBitcoindRpc>,
    pub(crate) user_bitcoind_rpc_no_chain_id:
        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
}
179
/// Parameters for paginated operation-log listing.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    // Maximum number of entries to return; `None` = no limit
    limit: Option<usize>,
    // Pagination cursor: last entry seen by the caller
    last_seen: Option<ChronologicalOperationLogKey>,
}
185
/// Request selecting a single operation by its id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
190
/// Request for balance change notifications for one amount unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    // Defaults to bitcoin when omitted by the caller
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
196
197impl Client {
198 pub async fn builder() -> anyhow::Result<ClientBuilder> {
201 Ok(ClientBuilder::new())
202 }
203
204 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
205 self.api.as_ref()
206 }
207
208 pub fn api_clone(&self) -> DynGlobalApi {
209 self.api.clone()
210 }
211
212 pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
215 self.api.connection_status_stream()
216 }
217
218 pub fn federation_reconnect(&self) {
226 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228 for peer_id in peers {
229 let api = self.api.clone();
230 self.task_group.spawn_cancellable(
231 format!("federation-reconnect-once-{peer_id}"),
232 async move {
233 if let Err(e) = api.get_peer_connection(peer_id).await {
234 debug!(
235 target: LOG_CLIENT_NET_API,
236 %peer_id,
237 err = %e.fmt_compact(),
238 "Failed to connect to peer"
239 );
240 }
241 },
242 );
243 }
244 }
245
246 pub fn spawn_federation_reconnect(&self) {
268 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
269
270 for peer_id in peers {
271 let api = self.api.clone();
272 self.task_group.spawn_cancellable(
273 format!("federation-reconnect-{peer_id}"),
274 async move {
275 loop {
276 match api.get_peer_connection(peer_id).await {
277 Ok(conn) => {
278 conn.await_disconnection().await;
279 }
280 Err(e) => {
281 debug!(
284 target: LOG_CLIENT_NET_API,
285 %peer_id,
286 err = %e.fmt_compact(),
287 "Failed to connect to peer, will retry"
288 );
289 }
290 }
291 }
292 },
293 );
294 }
295 }
296
297 pub fn task_group(&self) -> &TaskGroup {
299 &self.task_group
300 }
301
302 pub fn get_metrics() -> anyhow::Result<String> {
307 fedimint_metrics::get_metrics()
308 }
309
    #[doc(hidden)]
    /// Internal accessor for the state machine executor; hidden from docs,
    /// intended for debugging/testing tooling.
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
315
316 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317 let mut dbtx = db.begin_transaction_nc().await;
318 dbtx.get_value(&ClientConfigKey).await
319 }
320
321 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322 let mut dbtx = db.begin_transaction_nc().await;
323 dbtx.get_value(&PendingClientConfigKey).await
324 }
325
326 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327 let mut dbtx = db.begin_transaction_nc().await;
328 dbtx.get_value(&ApiSecretKey).await
329 }
330
331 pub async fn store_encodable_client_secret<T: Encodable>(
332 db: &Database,
333 secret: T,
334 ) -> anyhow::Result<()> {
335 let mut dbtx = db.begin_transaction().await;
336
337 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339 bail!("Encoded client secret already exists, cannot overwrite")
340 }
341
342 let encoded_secret = T::consensus_encode_to_vec(&secret);
343 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344 .await;
345 dbtx.commit_tx().await;
346 Ok(())
347 }
348
349 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351 bail!("Encoded client secret not present in DB")
352 };
353
354 Ok(secret)
355 }
356 pub async fn load_decodable_client_secret_opt<T: Decodable>(
357 db: &Database,
358 ) -> anyhow::Result<Option<T>> {
359 let mut dbtx = db.begin_transaction_nc().await;
360
361 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363 Ok(match client_secret {
364 Some(client_secret) => Some(
365 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367 ),
368 None => None,
369 })
370 }
371
372 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374 Ok(secret) => secret,
375 _ => {
376 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377 Self::store_encodable_client_secret(db, secret)
378 .await
379 .expect("Storing client secret must work");
380 secret
381 }
382 };
383 Ok(client_secret)
384 }
385
386 pub async fn is_initialized(db: &Database) -> bool {
387 let mut dbtx = db.begin_transaction_nc().await;
388 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389 .await
390 .expect("Unrecoverable error occurred while reading and entry from the database")
391 .is_some()
392 }
393
394 pub fn start_executor(self: &Arc<Self>) {
395 debug!(
396 target: LOG_CLIENT,
397 "Starting fedimint client executor",
398 );
399 self.executor.start_executor(self.context_gen());
400 }
401
402 pub fn federation_id(&self) -> FederationId {
403 self.federation_id
404 }
405
    /// Build the context generator handed to the executor; it produces a
    /// per-(module instance, operation) global context on demand.
    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
        // Hold only a weak reference so the generator does not keep the
        // client alive after it is dropped.
        let client_inner = Arc::downgrade(self);
        Arc::new(move |module_instance, operation| {
            ModuleGlobalClientContext {
                client: client_inner
                    .clone()
                    .upgrade()
                    .expect("ModuleGlobalContextGen called after client was dropped"),
                module_instance_id: module_instance,
                operation,
            }
            .into()
        })
    }
420
421 pub async fn config(&self) -> ClientConfig {
422 self.config.read().await.clone()
423 }
424
    /// Optional API secret used for requests to the federation, if configured.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
429
430 pub async fn core_api_version(&self) -> ApiVersion {
436 self.db
439 .begin_transaction_nc()
440 .await
441 .get_value(&CachedApiVersionSetKey)
442 .await
443 .map(|cached: CachedApiVersionSet| cached.0.core)
444 .unwrap_or(ApiVersion { major: 0, minor: 0 })
445 }
446
447 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454 if let Some(chain_id) = self
456 .db
457 .begin_transaction_nc()
458 .await
459 .get_value(&ChainIdKey)
460 .await
461 {
462 return Ok(chain_id);
463 }
464
465 let chain_id = self.api.chain_id().await?;
467
468 let mut dbtx = self.db.begin_transaction().await;
470 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471 dbtx.commit_tx().await;
472
473 Ok(chain_id)
474 }
475
    /// Registry of decoders for all modules known to this client.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
479
480 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
482 self.try_get_module(instance)
483 .expect("Module instance not found")
484 }
485
486 fn try_get_module(
487 &self,
488 instance: ModuleInstanceId,
489 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490 Some(self.modules.get(instance)?.as_ref())
491 }
492
493 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
494 self.modules.get(instance).is_some()
495 }
496
    /// Compute the per-unit totals of a partial transaction.
    ///
    /// Returns `(input_amounts, output_amounts)`, where fees (both input and
    /// output fees) are folded into the output side, since they must be
    /// covered by inputs.
    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
        let mut in_amounts = Amounts::ZERO;
        let mut out_amounts = Amounts::ZERO;
        let mut fee_amounts = Amounts::ZERO;

        for input in builder.inputs() {
            let module = self.get_module(input.input.module_instance_id());

            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
                "We only build transactions with input versions that are supported by the module",
            );

            in_amounts.checked_add_mut(&input.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        for output in builder.outputs() {
            let module = self.get_module(output.output.module_instance_id());

            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
                "We only build transactions with output versions that are supported by the module",
            );

            out_amounts.checked_add_mut(&output.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        // Fees count towards what the inputs have to cover
        out_amounts.checked_add_mut(&fee_amounts);
        (in_amounts, out_amounts)
    }
533
534 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
535 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
536 }
537
538 pub fn get_config_meta(&self, key: &str) -> Option<String> {
540 self.federation_config_meta.get(key).cloned()
541 }
542
543 pub(crate) fn root_secret(&self) -> DerivableSecret {
544 self.root_secret.clone()
545 }
546
547 pub async fn add_state_machines(
548 &self,
549 dbtx: &mut DatabaseTransaction<'_>,
550 states: Vec<DynState>,
551 ) -> AddStateMachinesResult {
552 self.executor.add_state_machines_dbtx(dbtx, states).await
553 }
554
555 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557 let active_states = self.executor.get_active_states().await;
558 let mut active_operations = HashSet::with_capacity(active_states.len());
559 let mut dbtx = self.db().begin_transaction_nc().await;
560 for (state, _) in active_states {
561 let operation_id = state.operation_id();
562 if dbtx
563 .get_value(&OperationLogKey { operation_id })
564 .await
565 .is_some()
566 {
567 active_operations.insert(operation_id);
568 }
569 }
570 active_operations
571 }
572
    /// Accessor for the client's operation log.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
576
    /// Accessor for the federation meta service.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
581
582 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
584 let meta_service = self.meta_service();
585 let ts = meta_service
586 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
587 .await
588 .and_then(|v| v.value)?;
589 Some(UNIX_EPOCH + Duration::from_secs(ts))
590 }
591
592 async fn finalize_transaction(
594 &self,
595 dbtx: &mut DatabaseTransaction<'_>,
596 operation_id: OperationId,
597 mut partial_transaction: TransactionBuilder,
598 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601 let mut added_inputs_bundles = vec![];
602 let mut added_outputs_bundles = vec![];
603
604 for unit in in_amounts.units().union(&out_amounts.units()) {
615 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617 if input_amount == output_amount {
618 continue;
619 }
620
621 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623 };
624
625 let (added_input_bundle, added_output_bundle) = module
626 .create_final_inputs_and_outputs(
627 module_id,
628 dbtx,
629 operation_id,
630 *unit,
631 input_amount,
632 output_amount,
633 )
634 .await?;
635
636 added_inputs_bundles.push(added_input_bundle);
637 added_outputs_bundles.push(added_output_bundle);
638 }
639
640 let change_range = Range {
644 start: partial_transaction.outputs().count() as u64,
645 end: (partial_transaction.outputs().count() as u64
646 + added_outputs_bundles
647 .iter()
648 .map(|output| output.outputs().len() as u64)
649 .sum::<u64>()),
650 };
651
652 for added_inputs in added_inputs_bundles {
653 partial_transaction = partial_transaction.with_inputs(added_inputs);
654 }
655
656 for added_outputs in added_outputs_bundles {
657 partial_transaction = partial_transaction.with_outputs(added_outputs);
658 }
659
660 let (input_amounts, output_amounts) =
661 self.transaction_builder_get_balance(&partial_transaction);
662
663 for (unit, output_amount) in output_amounts {
664 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666 assert!(input_amount >= output_amount, "Transaction is underfunded");
667 }
668
669 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671 Ok((tx, states, change_range))
672 }
673
    /// Finalize and submit a transaction in its own autocommitted database
    /// transaction, creating an operation-log entry of type `operation_type`
    /// with metadata produced by `operation_meta_gen`.
    ///
    /// Retries the whole closure on commit conflicts (up to 100 attempts) and
    /// panics if the commit still fails — at that point the DB is considered
    /// broken.
    ///
    /// # Errors
    /// Propagates errors from finalization, e.g. when an operation with the
    /// same id already exists or the transaction cannot be balanced.
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    // The closure may run multiple times, so clone its inputs
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        self.finalize_and_submit_transaction_dbtx(
                            dbtx,
                            operation_id,
                            &operation_type,
                            operation_meta_gen,
                            tx_builder,
                        )
                        .await
                    })
                },
                Some(100), )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }
731
    /// Like [`Client::finalize_and_submit_transaction`], but runs inside a
    /// caller-provided database transaction.
    ///
    /// # Errors
    /// Fails when an operation with `operation_id` already exists, or when
    /// finalization/submission of the transaction fails.
    pub async fn finalize_and_submit_transaction_dbtx<F, M>(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: FnOnce(OutPointRange) -> M + MaybeSend,
        M: serde::Serialize + MaybeSend,
    {
        // Operation ids must be unique; reject duplicates up front
        if Client::operation_exists_dbtx(dbtx, operation_id).await {
            bail!("There already exists an operation with id {operation_id:?}")
        }

        let out_point_range = self
            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
            .await?;

        // Record the operation in the log so it can be listed/resumed later
        self.operation_log()
            .add_operation_log_entry_dbtx(
                dbtx,
                operation_id,
                operation_type,
                operation_meta_gen(out_point_range),
            )
            .await;

        Ok(out_point_range)
    }
765
    /// Finalize the transaction, enforce the maximum size, register the state
    /// machines (including the transaction-submission state machine) and log
    /// a [`TxCreatedEvent`].
    ///
    /// # Errors
    /// Fails when finalization fails, the built transaction exceeds
    /// [`Transaction::MAX_TX_SIZE`], or the state machines cannot be added.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        let (transaction, mut states, change_range) = self
            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
            .await?;

        // Reject oversized transactions locally rather than letting the
        // federation reject them after submission
        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
            let inputs = transaction
                .inputs
                .iter()
                .map(DynInput::module_instance_id)
                .collect::<Vec<_>>();
            let outputs = transaction
                .outputs
                .iter()
                .map(DynOutput::module_instance_id)
                .collect::<Vec<_>>();
            warn!(
                target: LOG_CLIENT_NET_API,
                size=%transaction.consensus_encode_to_vec().len(),
                ?inputs,
                ?outputs,
                "Transaction too large",
            );
            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
            bail!(
                "The generated transaction would be rejected by the federation for being too large."
            );
        }

        let txid = transaction.tx_hash();

        debug!(
            target: LOG_CLIENT_NET_API,
            %txid,
            operation_id = %operation_id.fmt_short(),
            ?transaction,
            "Finalized and submitting transaction",
        );

        // The submission state machine drives the actual broadcast/acceptance
        let tx_submission_sm = DynState::from_typed(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            TxSubmissionStatesSM {
                operation_id,
                state: TxSubmissionStates::Created(transaction),
            },
        );
        states.push(tx_submission_sm);

        self.executor.add_state_machines_dbtx(dbtx, states).await?;

        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
            .await;

        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
    }
826
827 async fn transaction_update_stream(
828 &self,
829 operation_id: OperationId,
830 ) -> BoxStream<'static, TxSubmissionStatesSM> {
831 self.executor
832 .notifier()
833 .module_notifier::<TxSubmissionStatesSM>(
834 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
835 self.final_client.clone(),
836 )
837 .subscribe(operation_id)
838 .await
839 }
840
841 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
842 let mut dbtx = self.db().begin_transaction_nc().await;
843
844 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
845 }
846
847 pub async fn operation_exists_dbtx(
848 dbtx: &mut DatabaseTransaction<'_>,
849 operation_id: OperationId,
850 ) -> bool {
851 let active_state_exists = dbtx
852 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
853 .await
854 .next()
855 .await
856 .is_some();
857
858 let inactive_state_exists = dbtx
859 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
860 .await
861 .next()
862 .await
863 .is_some();
864
865 active_state_exists || inactive_state_exists
866 }
867
868 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
869 self.db
870 .begin_transaction_nc()
871 .await
872 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
873 .await
874 .next()
875 .await
876 .is_some()
877 }
878
879 pub async fn await_primary_bitcoin_module_output(
882 &self,
883 operation_id: OperationId,
884 out_point: OutPoint,
885 ) -> anyhow::Result<()> {
886 self.primary_module_for_unit(AmountUnit::BITCOIN)
887 .ok_or_else(|| anyhow!("No primary module available"))?
888 .1
889 .await_primary_module_output(operation_id, out_point)
890 .await
891 }
892
    /// Typed handle to the first registered module instance of kind `M`.
    ///
    /// # Errors
    /// Fails when no module of that kind is registered, or when the instance
    /// cannot be downcast to `M`.
    pub fn get_first_module<M: ClientModule>(
        &'_ self,
    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
        let module_kind = M::kind();
        let id = self
            .get_first_instance(&module_kind)
            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
        let module: &M = self
            .try_get_module(id)
            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
            .as_any()
            .downcast_ref::<M>()
            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
        // Module gets its own DB namespace, scoped by instance id
        let (db, _) = self.db().with_prefix_module_id(id);
        Ok(ClientModuleInstance {
            id,
            db,
            api: self.api().with_module(id),
            module,
        })
    }
915
916 pub fn get_module_client_dyn(
917 &self,
918 instance_id: ModuleInstanceId,
919 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
920 self.try_get_module(instance_id)
921 .ok_or(anyhow!("Unknown module instance {}", instance_id))
922 }
923
    /// Accessor for the client database.
    pub fn db(&self) -> &Database {
        &self.db
    }
927
    /// Accessor for the connector registry.
    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }
931
932 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
935 TransactionUpdates {
936 update_stream: self.transaction_update_stream(operation_id).await,
937 }
938 }
939
940 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
942 self.modules
943 .iter_modules()
944 .find(|(_, kind, _module)| *kind == module_kind)
945 .map(|(instance_id, _, _)| instance_id)
946 }
947
948 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
951 get_decoded_client_secret::<T>(self.db()).await
952 }
953
954 pub async fn await_primary_bitcoin_module_outputs(
957 &self,
958 operation_id: OperationId,
959 outputs: Vec<OutPoint>,
960 ) -> anyhow::Result<()> {
961 for out_point in outputs {
962 self.await_primary_bitcoin_module_output(operation_id, out_point)
963 .await?;
964 }
965
966 Ok(())
967 }
968
969 pub async fn get_config_json(&self) -> JsonClientConfig {
975 self.config().await.to_json()
976 }
977
    #[doc(hidden)]
    /// Convenience wrapper for [`Client::get_balance_for_unit`] with the
    /// bitcoin unit.
    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
        self.get_balance_for_unit(AmountUnit::BITCOIN).await
    }
986
987 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
988 let (id, module) = self
989 .primary_module_for_unit(unit)
990 .ok_or_else(|| anyhow!("Primary module not available"))?;
991 Ok(module
992 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
993 .await)
994 }
995
    /// Stream of balance values for the given unit, starting with the current
    /// balance and yielding a new value whenever it changes.
    ///
    /// If no primary module exists for the unit, the returned stream never
    /// yields (it stays pending forever).
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // No primary module: never yield, never terminate
                pending().await
            };


            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                    .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Deduplicate: only emit when the balance actually changed
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
1042
1043 async fn make_api_version_request(
1048 delay: Duration,
1049 peer_id: PeerId,
1050 api: &DynGlobalApi,
1051 ) -> (
1052 PeerId,
1053 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1054 ) {
1055 runtime::sleep(delay).await;
1056 (
1057 peer_id,
1058 api.request_single_peer::<SupportedApiVersionsSummary>(
1059 VERSION_ENDPOINT.to_owned(),
1060 ApiRequestErased::default(),
1061 peer_id,
1062 )
1063 .await,
1064 )
1065 }
1066
1067 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1073 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1074 }
1075
    /// Refresh the stored API version summary of every peer.
    ///
    /// Queries all peers concurrently; each successful response is persisted
    /// under [`PeerLastApiVersionsSummaryKey`] and counted via
    /// `num_responses_sender`. A failed peer is retried (with backoff) only
    /// while we have no previously stored response for it — otherwise the
    /// stale stored value is kept and the peer is counted as done.
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        // Fire the initial round of requests without delay
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    // Only keep retrying when we have nothing stored at all
                    !has_previous_response
                }
                Ok(o) => {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
1145
    /// Query peers for their supported API versions until a threshold of
    /// peers has answered successfully.
    ///
    /// Failed requests are retried with backoff; the loop stops as soon as
    /// `num_peers.threshold()` distinct peers responded.
    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        // Fire the initial round of requests without delay
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }
1199
1200 pub async fn fetch_common_api_versions(
1202 config: &ClientConfig,
1203 api: &DynGlobalApi,
1204 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1205 debug!(
1206 target: LOG_CLIENT,
1207 "Fetching common api versions"
1208 );
1209
1210 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1211
1212 let peer_api_version_sets =
1213 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1214
1215 Ok(peer_api_version_sets)
1216 }
1217
1218 pub async fn write_api_version_cache(
1222 dbtx: &mut DatabaseTransaction<'_>,
1223 api_version_set: ApiVersionSet,
1224 ) {
1225 debug!(
1226 target: LOG_CLIENT,
1227 value = ?api_version_set,
1228 "Writing API version set to cache"
1229 );
1230
1231 dbtx.insert_entry(
1232 &CachedApiVersionSetKey,
1233 &CachedApiVersionSet(api_version_set),
1234 )
1235 .await;
1236 }
1237
    /// Persist prefetched peer API version responses and, when possible, the
    /// derived common API version set.
    ///
    /// Failure to compute a common set is logged and tolerated — the
    /// individual peer responses are stored either way so a later
    /// negotiation can use them.
    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        // Intersect what this client supports with what the peers reported
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        // Store each peer's raw response for future refreshes
        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }
1283
1284 pub fn supported_api_versions_summary_static(
1286 config: &ClientConfig,
1287 client_module_init: &ClientModuleInitRegistry,
1288 ) -> SupportedApiVersionsSummary {
1289 SupportedApiVersionsSummary {
1290 core: SupportedCoreApiVersions {
1291 core_consensus: config.global.consensus_version,
1292 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1293 .expect("must not have conflicting versions"),
1294 },
1295 modules: config
1296 .modules
1297 .iter()
1298 .filter_map(|(&module_instance_id, module_config)| {
1299 client_module_init
1300 .get(module_config.kind())
1301 .map(|module_init| {
1302 (
1303 module_instance_id,
1304 SupportedModuleApiVersions {
1305 core_consensus: config.global.consensus_version,
1306 module_consensus: module_config.version,
1307 api: module_init.supported_api_versions(),
1308 },
1309 )
1310 })
1311 })
1312 .collect(),
1313 }
1314 }
1315
1316 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1317 Self::load_and_refresh_common_api_version_static(
1318 &self.config().await,
1319 &self.module_inits,
1320 self.connectors.clone(),
1321 &self.api,
1322 &self.db,
1323 &self.task_group,
1324 )
1325 .await
1326 }
1327
    /// Return the cached common API version set if one exists, kicking off a
    /// best-effort background refresh; otherwise block on fetching and
    /// negotiating the set for the first time.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            // Clone everything the background refresh task needs to own,
            // since it may outlive this call.
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    connectors.wait_for_initialized_connections().await;

                    // `block_until_ok = false`: a failed background refresh
                    // is only logged; the cached value is returned below
                    // either way.
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        // No cache yet: block until a version set has been negotiated
        // (`block_until_ok = true` retries discovery until it succeeds).
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1397
    /// Fetch peers' supported API versions and (re)compute the common set.
    ///
    /// Spawns a background task that queries all peers and persists their
    /// responses, then waits (up to 30s per attempt) for a threshold of
    /// responses before running discovery on whatever is persisted. With
    /// `block_until_ok` the loop retries discovery forever; otherwise the
    /// first discovery failure is returned to the caller.
    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        // Watch channel counting how many peers have responded so far.
        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            // The timeout result is deliberately discarded: after 30s we
            // proceed with whatever peer responses were already persisted.
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }
1473
1474 pub async fn get_metadata(&self) -> Metadata {
1476 self.db
1477 .begin_transaction_nc()
1478 .await
1479 .get_value(&ClientMetadataKey)
1480 .await
1481 .unwrap_or_else(|| {
1482 warn!(
1483 target: LOG_CLIENT,
1484 "Missing existing metadata. This key should have been set on Client init"
1485 );
1486 Metadata::empty()
1487 })
1488 }
1489
1490 pub async fn set_metadata(&self, metadata: &Metadata) {
1492 self.db
1493 .autocommit::<_, _, anyhow::Error>(
1494 |dbtx, _| {
1495 Box::pin(async {
1496 Self::set_metadata_dbtx(dbtx, metadata).await;
1497 Ok(())
1498 })
1499 },
1500 None,
1501 )
1502 .await
1503 .expect("Failed to autocommit metadata");
1504 }
1505
1506 pub fn has_pending_recoveries(&self) -> bool {
1507 !self
1508 .client_recovery_progress_receiver
1509 .borrow()
1510 .iter()
1511 .all(|(_id, progress)| progress.is_done())
1512 }
1513
1514 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1522 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1523 recovery_receiver
1524 .wait_for(|in_progress| {
1525 in_progress
1526 .iter()
1527 .all(|(_id, progress)| progress.is_done())
1528 })
1529 .await
1530 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1531
1532 Ok(())
1533 }
1534
1535 pub fn subscribe_to_recovery_progress(
1540 &self,
1541 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1542 WatchStream::new(self.client_recovery_progress_receiver.clone())
1543 .flat_map(futures::stream::iter)
1544 }
1545
1546 pub async fn wait_for_module_kind_recovery(
1547 &self,
1548 module_kind: ModuleKind,
1549 ) -> anyhow::Result<()> {
1550 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1551 let config = self.config().await;
1552 recovery_receiver
1553 .wait_for(|in_progress| {
1554 !in_progress
1555 .iter()
1556 .filter(|(module_instance_id, _progress)| {
1557 config.modules[module_instance_id].kind == module_kind
1558 })
1559 .any(|(_id, progress)| !progress.is_done())
1560 })
1561 .await
1562 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1563
1564 Ok(())
1565 }
1566
1567 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1568 loop {
1569 if self.executor.get_active_states().await.is_empty() {
1570 break;
1571 }
1572 sleep(Duration::from_millis(100)).await;
1573 }
1574 Ok(())
1575 }
1576
    /// Write the client [`Metadata`] under [`ClientMetadataKey`] in the
    /// caller's transaction.
    // NOTE(review): `insert_new_entry` presumably asserts the key is not yet
    // present — confirm against the db layer before calling on updates.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1581
1582 fn spawn_module_recoveries_task(
1583 &self,
1584 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1585 module_recoveries: BTreeMap<
1586 ModuleInstanceId,
1587 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1588 >,
1589 module_recovery_progress_receivers: BTreeMap<
1590 ModuleInstanceId,
1591 watch::Receiver<RecoveryProgress>,
1592 >,
1593 ) {
1594 let db = self.db.clone();
1595 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1596 let module_kinds: BTreeMap<ModuleInstanceId, String> = self
1597 .modules
1598 .iter_modules_id_kind()
1599 .map(|(id, kind)| (id, kind.to_string()))
1600 .collect();
1601 self.task_group
1602 .spawn("module recoveries", |_task_handle| async {
1603 Self::run_module_recoveries_task(
1604 db,
1605 log_ordering_wakeup_tx,
1606 recovery_sender,
1607 module_recoveries,
1608 module_recovery_progress_receivers,
1609 module_kinds,
1610 )
1611 .await;
1612 });
1613 }
1614
1615 async fn run_module_recoveries_task(
1616 db: Database,
1617 log_ordering_wakeup_tx: watch::Sender<()>,
1618 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1619 module_recoveries: BTreeMap<
1620 ModuleInstanceId,
1621 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1622 >,
1623 module_recovery_progress_receivers: BTreeMap<
1624 ModuleInstanceId,
1625 watch::Receiver<RecoveryProgress>,
1626 >,
1627 module_kinds: BTreeMap<ModuleInstanceId, String>,
1628 ) {
1629 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1630 let mut completed_stream = Vec::new();
1631 let progress_stream = futures::stream::FuturesUnordered::new();
1632
1633 for (module_instance_id, f) in module_recoveries {
1634 completed_stream.push(futures::stream::once(Box::pin(async move {
1635 match f.await {
1636 Ok(()) => (module_instance_id, None),
1637 Err(err) => {
1638 warn!(
1639 target: LOG_CLIENT,
1640 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1641 );
1642 futures::future::pending::<()>().await;
1646 unreachable!()
1647 }
1648 }
1649 })));
1650 }
1651
1652 for (module_instance_id, rx) in module_recovery_progress_receivers {
1653 progress_stream.push(
1654 tokio_stream::wrappers::WatchStream::new(rx)
1655 .fuse()
1656 .map(move |progress| (module_instance_id, Some(progress))),
1657 );
1658 }
1659
1660 let mut futures = futures::stream::select(
1661 futures::stream::select_all(progress_stream),
1662 futures::stream::select_all(completed_stream),
1663 );
1664
1665 while let Some((module_instance_id, progress)) = futures.next().await {
1666 let mut dbtx = db.begin_transaction().await;
1667
1668 let prev_progress = *recovery_sender
1669 .borrow()
1670 .get(&module_instance_id)
1671 .expect("existing progress must be present");
1672
1673 let progress = if prev_progress.is_done() {
1674 prev_progress
1676 } else if let Some(progress) = progress {
1677 progress
1678 } else {
1679 prev_progress.to_complete()
1680 };
1681
1682 if !prev_progress.is_done() && progress.is_done() {
1683 info!(
1684 target: LOG_CLIENT,
1685 module_instance_id,
1686 progress = format!("{}/{}", progress.complete, progress.total),
1687 "Recovery complete"
1688 );
1689 dbtx.log_event(
1690 log_ordering_wakeup_tx.clone(),
1691 None,
1692 ModuleRecoveryCompleted {
1693 module_id: module_instance_id,
1694 },
1695 )
1696 .await;
1697 } else {
1698 info!(
1699 target: LOG_CLIENT,
1700 module_instance_id,
1701 kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1702 progress = format!("{}/{}", progress.complete, progress.total),
1703 "Recovery progress"
1704 );
1705 }
1706
1707 dbtx.insert_entry(
1708 &ClientModuleRecovery { module_instance_id },
1709 &ClientModuleRecoveryState { progress },
1710 )
1711 .await;
1712 dbtx.commit_tx().await;
1713
1714 recovery_sender.send_modify(|v| {
1715 v.insert(module_instance_id, progress);
1716 });
1717 }
1718 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1719 }
1720
1721 async fn load_peers_last_api_versions(
1722 db: &Database,
1723 num_peers: NumPeers,
1724 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1725 let mut peer_api_version_sets = BTreeMap::new();
1726
1727 let mut dbtx = db.begin_transaction_nc().await;
1728 for peer_id in num_peers.peer_ids() {
1729 if let Some(v) = dbtx
1730 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1731 .await
1732 {
1733 peer_api_version_sets.insert(peer_id, v.0);
1734 }
1735 }
1736 drop(dbtx);
1737 peer_api_version_sets
1738 }
1739
1740 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1743 self.db()
1744 .begin_transaction_nc()
1745 .await
1746 .find_by_prefix(&ApiAnnouncementPrefix)
1747 .await
1748 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1749 .collect()
1750 .await
1751 }
1752
1753 pub async fn get_guardian_metadata(
1755 &self,
1756 ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1757 self.db()
1758 .begin_transaction_nc()
1759 .await
1760 .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1761 .await
1762 .map(|(key, metadata)| (key.0, metadata))
1763 .collect()
1764 .await
1765 }
1766
1767 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1769 get_api_urls(&self.db, &self.config().await).await
1770 }
1771
1772 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1775 self.get_peer_urls()
1776 .await
1777 .into_iter()
1778 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1779 .map(|peer_url| {
1780 InviteCode::new(
1781 peer_url.clone(),
1782 peer,
1783 self.federation_id(),
1784 self.api_secret.clone(),
1785 )
1786 })
1787 }
1788
    /// Return the guardians' broadcast public keys, backfilling them from the
    /// federation (retrying forever) when the stored config predates them.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // Error type is `()`: this closure never fails, so
                        // autocommit only ever retries on commit conflicts.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }
1813
1814 async fn get_or_backfill_broadcast_public_keys(
1815 &self,
1816 dbtx: &mut DatabaseTransaction<'_>,
1817 config: ClientConfig,
1818 ) -> BTreeMap<PeerId, PublicKey> {
1819 match config.global.broadcast_public_keys {
1820 Some(guardian_pub_keys) => guardian_pub_keys,
1821 _ => {
1822 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1823
1824 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1825 *(self.config.write().await) = new_config;
1826 guardian_pub_keys
1827 }
1828 }
1829 }
1830
    /// Query the federation for the current consensus session count.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1834
1835 async fn fetch_and_update_config(
1836 &self,
1837 config: ClientConfig,
1838 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1839 let fetched_config = retry(
1840 "Fetching guardian public keys",
1841 backoff_util::background_backoff(),
1842 || async {
1843 Ok(self
1844 .api
1845 .request_current_consensus::<ClientConfig>(
1846 CLIENT_CONFIG_ENDPOINT.to_owned(),
1847 ApiRequestErased::default(),
1848 )
1849 .await?)
1850 },
1851 )
1852 .await
1853 .expect("Will never return on error");
1854
1855 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1856 warn!(
1857 target: LOG_CLIENT,
1858 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1859 );
1860 pending::<()>().await;
1861 unreachable!("Pending will never return");
1862 };
1863
1864 let new_config = ClientConfig {
1865 global: GlobalClientConfig {
1866 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1867 ..config.global
1868 },
1869 modules: config.modules,
1870 };
1871 (guardian_pub_keys, new_config)
1872 }
1873
    /// Dispatch a client-global RPC `method` with JSON `params`, yielding
    /// JSON results as a stream: request/response methods yield exactly one
    /// value, subscription methods yield until their source stream ends.
    /// Unknown methods terminate the stream with an error.
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
                    let mut stream = self.subscribe_balance_changes(req.unit).await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    // No explicit limit and no pagination cursor means
                    // "return everything".
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                #[allow(deprecated)]
                "backup_to_federation" => {
                    // Null params mean "no metadata" rather than an error.
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    // `?` on an `Err` ends the try_stream with that error.
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }
1960
1961 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1962 where
1963 E: Event + Send,
1964 {
1965 let mut dbtx = self.db.begin_transaction().await;
1966 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1967 dbtx.commit_tx().await;
1968 }
1969
    /// Log `event` to the event log inside the caller's transaction, waking
    /// the event-ordering task via `log_ordering_wakeup_tx`.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1982
1983 pub async fn log_event_raw_dbtx<Cap>(
1984 &self,
1985 dbtx: &mut DatabaseTransaction<'_, Cap>,
1986 kind: EventKind,
1987 module: Option<(ModuleKind, ModuleInstanceId)>,
1988 payload: Vec<u8>,
1989 persist: EventPersistence,
1990 ) where
1991 Cap: Send,
1992 {
1993 let module_id = module.as_ref().map(|m| m.1);
1994 let module_kind = module.map(|m| m.0);
1995 dbtx.log_event_raw(
1996 self.log_ordering_wakeup_tx.clone(),
1997 kind,
1998 module_kind,
1999 module_id,
2000 payload,
2001 persist,
2002 )
2003 .await;
2004 }
2005
    /// Tracker persisting the event-log read position under
    /// [`DefaultApplicationEventLogKey`], for use by the built-in
    /// application-level event handler.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        // Stateless: all state lives in the db under a fixed key.
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            // Persist the last-processed event log position.
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            // Load the last-persisted position; `None` on first run.
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
2043
    /// Run `handler_fn` over event-log entries, resuming from (and updating)
    /// the position stored by `tracker`; new entries are picked up via the
    /// `log_event_added_rx` notification channel.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2068
    /// Like [`Self::handle_historical_events`], but over the trimable event
    /// log, using a [`fedimint_eventlog::DynEventLogTrimableTracker`].
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2104
    /// Read up to `limit` event-log entries starting at `pos` (from the
    /// beginning when `None`), in a fresh non-committable transaction.
    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2113
    /// Trimable-log counterpart of [`Self::get_event_log`].
    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2122
    /// Same as [`Self::get_event_log`], but on a caller-provided transaction.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2134
    /// Same as [`Self::get_event_log_trimable`], but on a caller-provided
    /// transaction.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2146
    /// Subscribe to the broadcast channel of transient event log entries.
    // NOTE(review): "transient" per the channel name — presumably entries
    // that are not persisted to the db; confirm against the event log module.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2151
    /// Watch channel that is notified whenever a new event log entry is
    /// added.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2156
    /// Whether iroh DHT support is enabled for this client (set at build).
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2160
2161 pub(crate) async fn run_core_migrations(
2162 db_no_decoders: &Database,
2163 ) -> Result<(), anyhow::Error> {
2164 let mut dbtx = db_no_decoders.begin_transaction().await;
2165 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
2166 .await?;
2167 if is_running_in_test_env() {
2168 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
2169 }
2170 dbtx.commit_tx_result().await?;
2171 Ok(())
2172 }
2173
2174 fn primary_modules_for_unit(
2176 &self,
2177 unit: AmountUnit,
2178 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2179 self.primary_modules
2180 .iter()
2181 .flat_map(move |(_prio, candidates)| {
2182 candidates
2183 .specific
2184 .get(&unit)
2185 .into_iter()
2186 .flatten()
2187 .copied()
2188 .chain(candidates.wildcard.iter().copied())
2190 })
2191 .map(|id| (id, self.modules.get_expect(id)))
2192 }
2193
    /// Highest-priority primary module able to handle `unit`, if any.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2203
    /// The primary module used for Bitcoin-denominated amounts.
    ///
    /// # Panics
    /// If no primary module handles [`AmountUnit::BITCOIN`].
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2209}
2210
// Bridge impl: exposes `Client` to client modules via the object-safe
// `ClientContextIface`. Almost every method delegates directly to the
// inherent `Client` method of the same name.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_dbtx(
            self,
            dbtx,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    // NOTE(review): the trait method name is unit-agnostic, but this
    // delegates to the bitcoin-specific awaiter — confirm intended.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    // Serializes `payload` to JSON bytes and records it in the event log;
    // requires a global (non-module-isolated) transaction.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    // Stream of (key, meta) pairs for all active states of one operation in
    // one module instance.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    // Same as above, but for states that have already finished (inactive).
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2369
2370impl fmt::Debug for Client {
2372 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2373 write!(f, "Client")
2374 }
2375}
2376
2377pub fn client_decoders<'a>(
2378 registry: &ModuleInitRegistry<DynClientModuleInit>,
2379 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2380) -> ModuleDecoderRegistry {
2381 let mut modules = BTreeMap::new();
2382 for (id, kind) in module_kinds {
2383 let Some(init) = registry.get(kind) else {
2384 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2385 continue;
2386 };
2387
2388 modules.insert(
2389 id,
2390 (
2391 kind.clone(),
2392 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2393 ),
2394 );
2395 }
2396 ModuleDecoderRegistry::from(modules)
2397}