1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63 Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64 maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86 ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87 ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89 get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
/// Core API versions this client advertises during API version negotiation.
///
/// Currently only major 0, minor 0.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];

/// Candidate primary modules at one priority level.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    // Candidates keyed by a specific amount unit (selection logic lives in
    // `primary_module_for_unit`, which is not visible in this chunk).
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    // Candidates not tied to any particular unit.
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Main client handle for a single federation.
///
/// Bundles the federation config, database, API connection, module registry,
/// state machine executor and background task group. Typically shared via
/// `Arc` (see [`Client::start_executor`] / `context_gen`).
pub struct Client {
    // Late-bound handle modules use to reach back into this client.
    final_client: FinalClientIface,
    config: tokio::sync::RwLock<ClientConfig>,
    // Optional secret forwarded with federation API requests.
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    db: Database,
    federation_id: FederationId,
    // Static `meta` key/value pairs from the federation config
    // (see `get_config_meta`).
    federation_config_meta: BTreeMap<String, String>,
    // Primary-module candidates ordered by priority; resolved by
    // `primary_module_for_unit` (not visible in this chunk).
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    // Root secret all module secrets are derived from.
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,

    task_group: TaskGroup,

    // Per-module recovery progress, observable while recovery runs.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // Event log plumbing: wake the ordering task / observe added entries.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
    #[allow(dead_code)]
    user_bitcoind_rpc: Option<DynBitcoindRpc>,
    pub(crate) user_bitcoind_rpc_no_chain_id:
        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
}
179
/// Parameters for paginated operation-log listing.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    // Maximum number of entries to return (presumably unlimited when `None`
    // — confirm in the request handler, which is not visible here).
    limit: Option<usize>,
    // Pagination cursor: last key the caller has already seen, if any.
    last_seen: Option<ChronologicalOperationLogKey>,
}

/// Request identifying a single operation by id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}

/// Request for balance changes of a given amount unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    // Deserialization falls back to `AmountUnit::bitcoin()` when omitted.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
196
197impl Client {
    /// Creates a [`ClientBuilder`] for configuring and building a `Client`.
    ///
    /// Currently always succeeds; `async` + `Result` are kept for interface
    /// stability.
    pub async fn builder() -> anyhow::Result<ClientBuilder> {
        Ok(ClientBuilder::new())
    }

    /// Borrowed handle to the global federation API.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }

    /// Owned clone of the global federation API handle.
    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }

    /// Stream of per-peer connection status maps as reported by the API
    /// client (presumably `true` = connected — confirm in the API impl).
    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
        self.api.connection_status_stream()
    }
217
218 pub fn federation_reconnect(&self) {
226 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228 for peer_id in peers {
229 let api = self.api.clone();
230 self.task_group.spawn_cancellable(
231 format!("federation-reconnect-once-{peer_id}"),
232 async move {
233 if let Err(e) = api.get_peer_connection(peer_id).await {
234 debug!(
235 target: LOG_CLIENT_NET_API,
236 %peer_id,
237 err = %e.fmt_compact(),
238 "Failed to connect to peer"
239 );
240 }
241 },
242 );
243 }
244 }
245
    /// Spawns one background task per federation peer that keeps the
    /// connection to that peer alive for the lifetime of the client: connect,
    /// wait for disconnection, reconnect.
    pub fn spawn_federation_reconnect(&self) {
        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();

        for peer_id in peers {
            let api = self.api.clone();
            self.task_group.spawn_cancellable(
                format!("federation-reconnect-{peer_id}"),
                async move {
                    loop {
                        match api.get_peer_connection(peer_id).await {
                            Ok(conn) => {
                                // Connected: park until the connection drops,
                                // then loop around and reconnect.
                                conn.await_disconnection().await;
                            }
                            Err(e) => {
                                // NOTE(review): no delay before the retry — if
                                // `get_peer_connection` fails fast this loop
                                // spins; confirm the call has internal backoff.
                                debug!(
                                    target: LOG_CLIENT_NET_API,
                                    %peer_id,
                                    err = %e.fmt_compact(),
                                    "Failed to connect to peer, will retry"
                                );
                            }
                        }
                    }
                },
            );
        }
    }
296
    /// Task group running this client's background tasks; shutting it down
    /// stops them.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }

    /// Renders process-wide metrics (delegates to
    /// `fedimint_metrics::get_metrics`).
    pub fn get_metrics() -> anyhow::Result<String> {
        fedimint_metrics::get_metrics()
    }

    /// State machine executor; hidden — intended for internal/diagnostic use.
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
315
316 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317 let mut dbtx = db.begin_transaction_nc().await;
318 dbtx.get_value(&ClientConfigKey).await
319 }
320
321 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322 let mut dbtx = db.begin_transaction_nc().await;
323 dbtx.get_value(&PendingClientConfigKey).await
324 }
325
326 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327 let mut dbtx = db.begin_transaction_nc().await;
328 dbtx.get_value(&ApiSecretKey).await
329 }
330
331 pub async fn store_encodable_client_secret<T: Encodable>(
332 db: &Database,
333 secret: T,
334 ) -> anyhow::Result<()> {
335 let mut dbtx = db.begin_transaction().await;
336
337 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339 bail!("Encoded client secret already exists, cannot overwrite")
340 }
341
342 let encoded_secret = T::consensus_encode_to_vec(&secret);
343 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344 .await;
345 dbtx.commit_tx().await;
346 Ok(())
347 }
348
349 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351 bail!("Encoded client secret not present in DB")
352 };
353
354 Ok(secret)
355 }
356 pub async fn load_decodable_client_secret_opt<T: Decodable>(
357 db: &Database,
358 ) -> anyhow::Result<Option<T>> {
359 let mut dbtx = db.begin_transaction_nc().await;
360
361 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363 Ok(match client_secret {
364 Some(client_secret) => Some(
365 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367 ),
368 None => None,
369 })
370 }
371
372 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374 Ok(secret) => secret,
375 _ => {
376 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377 Self::store_encodable_client_secret(db, secret)
378 .await
379 .expect("Storing client secret must work");
380 secret
381 }
382 };
383 Ok(client_secret)
384 }
385
386 pub async fn is_initialized(db: &Database) -> bool {
387 let mut dbtx = db.begin_transaction_nc().await;
388 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389 .await
390 .expect("Unrecoverable error occurred while reading and entry from the database")
391 .is_some()
392 }
393
    /// Starts the state machine executor.
    ///
    /// Takes `&Arc<Self>` because the executor context holds a (weak)
    /// reference back to the client (see `context_gen`).
    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor",
        );
        self.executor.start_executor(self.context_gen());
    }

    /// Id of the federation this client is connected to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
405
406 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
407 let client_inner = Arc::downgrade(self);
408 Arc::new(move |module_instance, operation| {
409 ModuleGlobalClientContext {
410 client: client_inner
411 .clone()
412 .upgrade()
413 .expect("ModuleGlobalContextGen called after client was dropped"),
414 module_instance_id: module_instance,
415 operation,
416 }
417 .into()
418 })
419 }
420
    /// Returns a clone of the current client config.
    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }

    /// Optional API secret forwarded with federation API requests.
    // Returns `&Option<String>` (not `Option<&str>`) to keep the existing
    // public interface stable.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
429
430 pub async fn core_api_version(&self) -> ApiVersion {
436 self.db
439 .begin_transaction_nc()
440 .await
441 .get_value(&CachedApiVersionSetKey)
442 .await
443 .map(|cached: CachedApiVersionSet| cached.0.core)
444 .unwrap_or(ApiVersion { major: 0, minor: 0 })
445 }
446
447 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454 if let Some(chain_id) = self
456 .db
457 .begin_transaction_nc()
458 .await
459 .get_value(&ChainIdKey)
460 .await
461 {
462 return Ok(chain_id);
463 }
464
465 let chain_id = self.api.chain_id().await?;
467
468 let mut dbtx = self.db.begin_transaction().await;
470 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471 dbtx.commit_tx().await;
472
473 Ok(chain_id)
474 }
475
476 pub fn decoders(&self) -> &ModuleDecoderRegistry {
477 &self.decoders
478 }
479
480 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
482 self.try_get_module(instance)
483 .expect("Module instance not found")
484 }
485
486 fn try_get_module(
487 &self,
488 instance: ModuleInstanceId,
489 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490 Some(self.modules.get(instance)?.as_ref())
491 }
492
493 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
494 self.modules.get(instance).is_some()
495 }
496
497 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
503 let mut in_amounts = Amounts::ZERO;
505 let mut out_amounts = Amounts::ZERO;
506 let mut fee_amounts = Amounts::ZERO;
507
508 for input in builder.inputs() {
509 let module = self.get_module(input.input.module_instance_id());
510
511 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
512 "We only build transactions with input versions that are supported by the module",
513 );
514
515 in_amounts.checked_add_mut(&input.amounts);
516 fee_amounts.checked_add_mut(&item_fees);
517 }
518
519 for output in builder.outputs() {
520 let module = self.get_module(output.output.module_instance_id());
521
522 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
523 "We only build transactions with output versions that are supported by the module",
524 );
525
526 out_amounts.checked_add_mut(&output.amounts);
527 fee_amounts.checked_add_mut(&item_fees);
528 }
529
530 out_amounts.checked_add_mut(&fee_amounts);
531 (in_amounts, out_amounts)
532 }
533
    /// Returns a fake LN public key derived from the federation id, plus a
    /// marker value (always 0).
    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }

    /// Looks up a value in the federation config's static `meta` map.
    pub fn get_config_meta(&self, key: &str) -> Option<String> {
        self.federation_config_meta.get(key).cloned()
    }

    /// Client root secret all module secrets are derived from.
    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
546
    /// Adds new state machines to the executor within the caller's database
    /// transaction (delegates to the executor).
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }
554
555 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557 let active_states = self.executor.get_active_states().await;
558 let mut active_operations = HashSet::with_capacity(active_states.len());
559 let mut dbtx = self.db().begin_transaction_nc().await;
560 for (state, _) in active_states {
561 let operation_id = state.operation_id();
562 if dbtx
563 .get_value(&OperationLogKey { operation_id })
564 .await
565 .is_some()
566 {
567 active_operations.insert(operation_id);
568 }
569 }
570 active_operations
571 }
572
    /// Log of operations this client has performed.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }

    /// Service giving access to federation meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
581
582 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
584 let meta_service = self.meta_service();
585 let ts = meta_service
586 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
587 .await
588 .and_then(|v| v.value)?;
589 Some(UNIX_EPOCH + Duration::from_secs(ts))
590 }
591
592 async fn finalize_transaction(
594 &self,
595 dbtx: &mut DatabaseTransaction<'_>,
596 operation_id: OperationId,
597 mut partial_transaction: TransactionBuilder,
598 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601 let mut added_inputs_bundles = vec![];
602 let mut added_outputs_bundles = vec![];
603
604 for unit in in_amounts.units().union(&out_amounts.units()) {
615 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617 if input_amount == output_amount {
618 continue;
619 }
620
621 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623 };
624
625 let (added_input_bundle, added_output_bundle) = module
626 .create_final_inputs_and_outputs(
627 module_id,
628 dbtx,
629 operation_id,
630 *unit,
631 input_amount,
632 output_amount,
633 )
634 .await?;
635
636 added_inputs_bundles.push(added_input_bundle);
637 added_outputs_bundles.push(added_output_bundle);
638 }
639
640 let change_range = Range {
644 start: partial_transaction.outputs().count() as u64,
645 end: (partial_transaction.outputs().count() as u64
646 + added_outputs_bundles
647 .iter()
648 .map(|output| output.outputs().len() as u64)
649 .sum::<u64>()),
650 };
651
652 for added_inputs in added_inputs_bundles {
653 partial_transaction = partial_transaction.with_inputs(added_inputs);
654 }
655
656 for added_outputs in added_outputs_bundles {
657 partial_transaction = partial_transaction.with_outputs(added_outputs);
658 }
659
660 let (input_amounts, output_amounts) =
661 self.transaction_builder_get_balance(&partial_transaction);
662
663 for (unit, output_amount) in output_amounts {
664 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666 assert!(input_amount >= output_amount, "Transaction is underfunded");
667 }
668
669 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671 Ok((tx, states, change_range))
672 }
673
    /// Finalizes, funds and submits a transaction, creating its operation
    /// log entry in the same database commit.
    ///
    /// `operation_meta_gen` produces the operation meta from the final
    /// [`OutPointRange`] of the submitted transaction. Fails if an operation
    /// with `operation_id` already exists.
    ///
    /// # Panics
    /// Panics if committing the database transaction still fails after the
    /// configured number of autocommit attempts.
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    // The closure may run several times on commit conflicts,
                    // so everything it captures is cloned per attempt.
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100), // maximum number of autocommit attempts
            )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }
741
742 async fn finalize_and_submit_transaction_inner(
743 &self,
744 dbtx: &mut DatabaseTransaction<'_>,
745 operation_id: OperationId,
746 tx_builder: TransactionBuilder,
747 ) -> anyhow::Result<OutPointRange> {
748 let (transaction, mut states, change_range) = self
749 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
750 .await?;
751
752 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
753 let inputs = transaction
754 .inputs
755 .iter()
756 .map(DynInput::module_instance_id)
757 .collect::<Vec<_>>();
758 let outputs = transaction
759 .outputs
760 .iter()
761 .map(DynOutput::module_instance_id)
762 .collect::<Vec<_>>();
763 warn!(
764 target: LOG_CLIENT_NET_API,
765 size=%transaction.consensus_encode_to_vec().len(),
766 ?inputs,
767 ?outputs,
768 "Transaction too large",
769 );
770 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
771 bail!(
772 "The generated transaction would be rejected by the federation for being too large."
773 );
774 }
775
776 let txid = transaction.tx_hash();
777
778 debug!(
779 target: LOG_CLIENT_NET_API,
780 %txid,
781 operation_id = %operation_id.fmt_short(),
782 ?transaction,
783 "Finalized and submitting transaction",
784 );
785
786 let tx_submission_sm = DynState::from_typed(
787 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
788 TxSubmissionStatesSM {
789 operation_id,
790 state: TxSubmissionStates::Created(transaction),
791 },
792 );
793 states.push(tx_submission_sm);
794
795 self.executor.add_state_machines_dbtx(dbtx, states).await?;
796
797 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
798 .await;
799
800 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
801 }
802
    /// Subscribes to state transitions of the transaction submission state
    /// machine belonging to the given operation.
    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(
                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
                self.final_client.clone(),
            )
            .subscribe(operation_id)
            .await
    }
816
817 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
818 let mut dbtx = self.db().begin_transaction_nc().await;
819
820 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
821 }
822
823 pub async fn operation_exists_dbtx(
824 dbtx: &mut DatabaseTransaction<'_>,
825 operation_id: OperationId,
826 ) -> bool {
827 let active_state_exists = dbtx
828 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
829 .await
830 .next()
831 .await
832 .is_some();
833
834 let inactive_state_exists = dbtx
835 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
836 .await
837 .next()
838 .await
839 .is_some();
840
841 active_state_exists || inactive_state_exists
842 }
843
844 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
845 self.db
846 .begin_transaction_nc()
847 .await
848 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
849 .await
850 .next()
851 .await
852 .is_some()
853 }
854
    /// Waits until the primary bitcoin module considers the given output
    /// finalized; errors from the module are propagated.
    ///
    /// # Errors
    /// Fails if no primary module is registered for
    /// [`AmountUnit::BITCOIN`].
    pub async fn await_primary_bitcoin_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .ok_or_else(|| anyhow!("No primary module available"))?
            .1
            .await_primary_module_output(operation_id, out_point)
            .await
    }
868
    /// Returns a typed handle to the first module instance of kind
    /// `M::kind()`, together with its isolated database and module API.
    ///
    /// # Errors
    /// Fails if no module of that kind exists, or the instance cannot be
    /// downcast to `M`.
    pub fn get_first_module<M: ClientModule>(
        &'_ self,
    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
        let module_kind = M::kind();
        let id = self
            .get_first_instance(&module_kind)
            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
        // Downcast the dyn module to the concrete client module type.
        let module: &M = self
            .try_get_module(id)
            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
            .as_any()
            .downcast_ref::<M>()
            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
        // The module gets a database namespaced by its instance id.
        let (db, _) = self.db().with_prefix_module_id(id);
        Ok(ClientModuleInstance {
            id,
            db,
            api: self.api().with_module(id),
            module,
        })
    }
891
892 pub fn get_module_client_dyn(
893 &self,
894 instance_id: ModuleInstanceId,
895 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
896 self.try_get_module(instance_id)
897 .ok_or(anyhow!("Unknown module instance {}", instance_id))
898 }
899
    /// Client database handle.
    pub fn db(&self) -> &Database {
        &self.db
    }

    /// Registry of transport connectors used to reach the federation.
    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }

    /// Returns a [`TransactionUpdates`] tracking the submission status of
    /// the transaction belonging to `operation_id`.
    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        TransactionUpdates {
            update_stream: self.transaction_update_stream(operation_id).await,
        }
    }
915
916 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
918 self.modules
919 .iter_modules()
920 .find(|(_, kind, _module)| *kind == module_kind)
921 .map(|(instance_id, _, _)| instance_id)
922 }
923
    /// Loads and decodes the stored client secret as type `T`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }

    /// Awaits each given output of the primary bitcoin module in order; see
    /// [`Self::await_primary_bitcoin_module_output`].
    pub async fn await_primary_bitcoin_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_bitcoin_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }

    /// Client config rendered as JSON.
    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }
953
954 #[doc(hidden)]
957 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
960 self.get_balance_for_unit(AmountUnit::BITCOIN).await
961 }
962
963 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
964 let (id, module) = self
965 .primary_module_for_unit(unit)
966 .ok_or_else(|| anyhow!("Primary module not available"))?;
967 Ok(module
968 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
969 .await)
970 }
971
    /// Streams the balance of the primary module for `unit`: yields the
    /// current balance first, then every subsequent change (deduplicated).
    ///
    /// If no primary module exists for `unit`, the stream never yields.
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        // Gather everything the 'static stream will need while we can still
        // borrow `self`.
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // No primary module: park forever rather than ending the stream.
                pending().await
            };


            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                    .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Only forward actual changes, suppressing repeat values.
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
1018
1019 async fn make_api_version_request(
1024 delay: Duration,
1025 peer_id: PeerId,
1026 api: &DynGlobalApi,
1027 ) -> (
1028 PeerId,
1029 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1030 ) {
1031 runtime::sleep(delay).await;
1032 (
1033 peer_id,
1034 api.request_single_peer::<SupportedApiVersionsSummary>(
1035 VERSION_ENDPOINT.to_owned(),
1036 ApiRequestErased::default(),
1037 peer_id,
1038 )
1039 .await,
1040 )
1041 }
1042
1043 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1049 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1050 }
1051
    /// Queries all peers for their supported API versions and caches each
    /// response in the database, retrying failed peers with backoff.
    ///
    /// Runs until every peer has either responded now or already has a
    /// cached response; `num_responses_sender` is bumped per settled peer so
    /// callers can wait for a threshold.
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        // Initial round of requests, without delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    // Only keep retrying peers we have no cached response for.
                    !has_previous_response
                }
                Ok(o) => {
                    // Persist the fresh response immediately.
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
1121
    /// Queries peers for their supported API versions until a threshold of
    /// them has responded, retrying failures with backoff.
    ///
    /// Returns the successful responses collected when the threshold was
    /// reached (or all outstanding requests completed first).
    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        // Initial round of requests, without delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        // Stop as soon as a threshold of peers has answered.
        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }
1175
1176 pub async fn fetch_common_api_versions(
1178 config: &ClientConfig,
1179 api: &DynGlobalApi,
1180 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1181 debug!(
1182 target: LOG_CLIENT,
1183 "Fetching common api versions"
1184 );
1185
1186 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1187
1188 let peer_api_version_sets =
1189 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1190
1191 Ok(peer_api_version_sets)
1192 }
1193
1194 pub async fn write_api_version_cache(
1198 dbtx: &mut DatabaseTransaction<'_>,
1199 api_version_set: ApiVersionSet,
1200 ) {
1201 debug!(
1202 target: LOG_CLIENT,
1203 value = ?api_version_set,
1204 "Writing API version set to cache"
1205 );
1206
1207 dbtx.insert_entry(
1208 &CachedApiVersionSetKey,
1209 &CachedApiVersionSet(api_version_set),
1210 )
1211 .await;
1212 }
1213
    /// Stores peer API version responses that were fetched elsewhere
    /// (e.g. prefetched while joining) and derives + caches the common
    /// version set from them.
    ///
    /// Failure to compute the common set is logged but not fatal; the
    /// individual peer responses are stored either way.
    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        // Persist each peer's raw response for later refreshes.
        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }
1259
1260 pub fn supported_api_versions_summary_static(
1262 config: &ClientConfig,
1263 client_module_init: &ClientModuleInitRegistry,
1264 ) -> SupportedApiVersionsSummary {
1265 SupportedApiVersionsSummary {
1266 core: SupportedCoreApiVersions {
1267 core_consensus: config.global.consensus_version,
1268 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1269 .expect("must not have conflicting versions"),
1270 },
1271 modules: config
1272 .modules
1273 .iter()
1274 .filter_map(|(&module_instance_id, module_config)| {
1275 client_module_init
1276 .get(module_config.kind())
1277 .map(|module_init| {
1278 (
1279 module_instance_id,
1280 SupportedModuleApiVersions {
1281 core_consensus: config.global.consensus_version,
1282 module_consensus: module_config.version,
1283 api: module_init.supported_api_versions(),
1284 },
1285 )
1286 })
1287 })
1288 .collect(),
1289 }
1290 }
1291
    /// Return the cached common API version set (triggering a background
    /// refresh), or discover it from peers if no cache exists yet.
    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }
1303
    /// Load the common API version set from the db cache if present (spawning
    /// a background task to refresh it), otherwise block until it can be
    /// discovered from the federation peers.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            // Cache hit: return immediately, but refresh the cache in the
            // background so it does not go stale.
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    // Don't race peer queries against connection setup.
                    connectors.wait_for_initialized_connections().await;

                    // `block_until_ok = false`: a failed background refresh
                    // only logs; the cached value was already returned.
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        // No cache yet: block until discovery succeeds (`block_until_ok = true`).
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1373
    /// Re-discover the common API version set by querying peers and update
    /// the db cache with the result.
    ///
    /// If `block_until_ok` is set, keeps retrying in (up to) 30s rounds until
    /// a common version set can be computed; otherwise returns the first
    /// discovery error.
    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        // The spawned task stores each peer's response in the db and reports
        // how many peers have responded so far through this watch channel.
        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            // Wait (bounded by 30s) for a threshold of peers to respond; on
            // timeout, attempt discovery anyway with whatever is stored.
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        // Any previously cached value is intentionally overwritten.
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }
1449
1450 pub async fn get_metadata(&self) -> Metadata {
1452 self.db
1453 .begin_transaction_nc()
1454 .await
1455 .get_value(&ClientMetadataKey)
1456 .await
1457 .unwrap_or_else(|| {
1458 warn!(
1459 target: LOG_CLIENT,
1460 "Missing existing metadata. This key should have been set on Client init"
1461 );
1462 Metadata::empty()
1463 })
1464 }
1465
    /// Overwrite the client metadata, retrying the db write via `autocommit`.
    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                // No retry limit: keep retrying on commit conflicts.
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }
1481
1482 pub fn has_pending_recoveries(&self) -> bool {
1483 !self
1484 .client_recovery_progress_receiver
1485 .borrow()
1486 .iter()
1487 .all(|(_id, progress)| progress.is_done())
1488 }
1489
    /// Wait until every module recovery has completed.
    ///
    /// Errors if the recovery progress sender is dropped before all modules
    /// report completion.
    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        recovery_receiver
            .wait_for(|in_progress| {
                in_progress
                    .iter()
                    .all(|(_id, progress)| progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;

        Ok(())
    }
1510
    /// Stream of `(module_instance_id, progress)` recovery updates, flattened
    /// from the watch channel's per-module progress maps.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1521
1522 pub async fn wait_for_module_kind_recovery(
1523 &self,
1524 module_kind: ModuleKind,
1525 ) -> anyhow::Result<()> {
1526 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1527 let config = self.config().await;
1528 recovery_receiver
1529 .wait_for(|in_progress| {
1530 !in_progress
1531 .iter()
1532 .filter(|(module_instance_id, _progress)| {
1533 config.modules[module_instance_id].kind == module_kind
1534 })
1535 .any(|(_id, progress)| !progress.is_done())
1536 })
1537 .await
1538 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1539
1540 Ok(())
1541 }
1542
1543 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1544 loop {
1545 if self.executor.get_active_states().await.is_empty() {
1546 break;
1547 }
1548 sleep(Duration::from_millis(100)).await;
1549 }
1550 Ok(())
1551 }
1552
    /// Write the client metadata within the caller's transaction.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1557
    /// Spawn the background task that drives all module recovery futures and
    /// forwards their progress to `recovery_sender`.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        // Snapshot module kinds up front so the task can use them in log
        // messages without borrowing `self`.
        let module_kinds: BTreeMap<ModuleInstanceId, String> = self
            .modules
            .iter_modules_id_kind()
            .map(|(id, kind)| (id, kind.to_string()))
            .collect();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                    module_kinds,
                )
                .await;
            });
    }
1590
1591 async fn run_module_recoveries_task(
1592 db: Database,
1593 log_ordering_wakeup_tx: watch::Sender<()>,
1594 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1595 module_recoveries: BTreeMap<
1596 ModuleInstanceId,
1597 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1598 >,
1599 module_recovery_progress_receivers: BTreeMap<
1600 ModuleInstanceId,
1601 watch::Receiver<RecoveryProgress>,
1602 >,
1603 module_kinds: BTreeMap<ModuleInstanceId, String>,
1604 ) {
1605 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1606 let mut completed_stream = Vec::new();
1607 let progress_stream = futures::stream::FuturesUnordered::new();
1608
1609 for (module_instance_id, f) in module_recoveries {
1610 completed_stream.push(futures::stream::once(Box::pin(async move {
1611 match f.await {
1612 Ok(()) => (module_instance_id, None),
1613 Err(err) => {
1614 warn!(
1615 target: LOG_CLIENT,
1616 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1617 );
1618 futures::future::pending::<()>().await;
1622 unreachable!()
1623 }
1624 }
1625 })));
1626 }
1627
1628 for (module_instance_id, rx) in module_recovery_progress_receivers {
1629 progress_stream.push(
1630 tokio_stream::wrappers::WatchStream::new(rx)
1631 .fuse()
1632 .map(move |progress| (module_instance_id, Some(progress))),
1633 );
1634 }
1635
1636 let mut futures = futures::stream::select(
1637 futures::stream::select_all(progress_stream),
1638 futures::stream::select_all(completed_stream),
1639 );
1640
1641 while let Some((module_instance_id, progress)) = futures.next().await {
1642 let mut dbtx = db.begin_transaction().await;
1643
1644 let prev_progress = *recovery_sender
1645 .borrow()
1646 .get(&module_instance_id)
1647 .expect("existing progress must be present");
1648
1649 let progress = if prev_progress.is_done() {
1650 prev_progress
1652 } else if let Some(progress) = progress {
1653 progress
1654 } else {
1655 prev_progress.to_complete()
1656 };
1657
1658 if !prev_progress.is_done() && progress.is_done() {
1659 info!(
1660 target: LOG_CLIENT,
1661 module_instance_id,
1662 progress = format!("{}/{}", progress.complete, progress.total),
1663 "Recovery complete"
1664 );
1665 dbtx.log_event(
1666 log_ordering_wakeup_tx.clone(),
1667 None,
1668 ModuleRecoveryCompleted {
1669 module_id: module_instance_id,
1670 },
1671 )
1672 .await;
1673 } else {
1674 info!(
1675 target: LOG_CLIENT,
1676 module_instance_id,
1677 kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1678 progress = format!("{}/{}", progress.complete, progress.total),
1679 "Recovery progress"
1680 );
1681 }
1682
1683 dbtx.insert_entry(
1684 &ClientModuleRecovery { module_instance_id },
1685 &ClientModuleRecoveryState { progress },
1686 )
1687 .await;
1688 dbtx.commit_tx().await;
1689
1690 recovery_sender.send_modify(|v| {
1691 v.insert(module_instance_id, progress);
1692 });
1693 }
1694 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1695 }
1696
1697 async fn load_peers_last_api_versions(
1698 db: &Database,
1699 num_peers: NumPeers,
1700 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1701 let mut peer_api_version_sets = BTreeMap::new();
1702
1703 let mut dbtx = db.begin_transaction_nc().await;
1704 for peer_id in num_peers.peer_ids() {
1705 if let Some(v) = dbtx
1706 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1707 .await
1708 {
1709 peer_api_version_sets.insert(peer_id, v.0);
1710 }
1711 }
1712 drop(dbtx);
1713 peer_api_version_sets
1714 }
1715
    /// Return all signed API endpoint announcements currently stored in the
    /// db, keyed by peer.
    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }
1728
    /// Return all signed guardian metadata entries currently stored in the
    /// db, keyed by peer.
    pub async fn get_guardian_metadata(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
            .await
            .map(|(key, metadata)| (key.0, metadata))
            .collect()
            .await
    }
1742
    /// Resolve the current API URL for each peer from the db and config.
    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }
1747
1748 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1751 self.get_peer_urls()
1752 .await
1753 .into_iter()
1754 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1755 .map(|peer_url| {
1756 InviteCode::new(
1757 peer_url.clone(),
1758 peer,
1759 self.federation_id(),
1760 self.api_secret.clone(),
1761 )
1762 })
1763 }
1764
    /// Return the guardian broadcast public keys, backfilling them from the
    /// federation if the stored config predates them.
    ///
    /// Blocks (retries forever) until the keys are available.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // Closure is infallible; autocommit only retries on
                        // db commit conflicts.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }
1789
1790 async fn get_or_backfill_broadcast_public_keys(
1791 &self,
1792 dbtx: &mut DatabaseTransaction<'_>,
1793 config: ClientConfig,
1794 ) -> BTreeMap<PeerId, PublicKey> {
1795 match config.global.broadcast_public_keys {
1796 Some(guardian_pub_keys) => guardian_pub_keys,
1797 _ => {
1798 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1799
1800 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1801 *(self.config.write().await) = new_config;
1802 guardian_pub_keys
1803 }
1804 }
1805 }
1806
    /// Query the federation for the current consensus session count.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1810
    /// Fetch a fresh client config from the federation (retrying with
    /// background backoff until it succeeds) and merge its broadcast public
    /// keys into the given config.
    ///
    /// Never returns if the fetched config still lacks broadcast public keys
    /// (server older than 0.4): logs a warning and awaits forever.
    async fn fetch_and_update_config(
        &self,
        config: ClientConfig,
    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
        let fetched_config = retry(
            "Fetching guardian public keys",
            backoff_util::background_backoff(),
            || async {
                Ok(self
                    .api
                    .request_current_consensus::<ClientConfig>(
                        CLIENT_CONFIG_ENDPOINT.to_owned(),
                        ApiRequestErased::default(),
                    )
                    .await?)
            },
        )
        .await
        .expect("Will never return on error");

        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
            warn!(
                target: LOG_CLIENT,
                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
            );
            // Intentionally block forever rather than returning a config
            // without broadcast keys.
            pending::<()>().await;
            unreachable!("Pending will never return");
        };

        // Keep the caller's module configs; only the global section gains
        // the broadcast keys.
        let new_config = ClientConfig {
            global: GlobalClientConfig {
                broadcast_public_keys: Some(guardian_pub_keys.clone()),
                ..config.global
            },
            modules: config.modules,
        };
        (guardian_pub_keys, new_config)
    }
1849
    /// Dispatch a client-global RPC by method name, yielding JSON values as a
    /// stream.
    ///
    /// Subscription methods (`subscribe_balance_changes`,
    /// `subscribe_to_recovery_progress`) yield multiple items; the rest yield
    /// a single value. Unknown methods produce a stream error.
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
                    let mut stream = self.subscribe_balance_changes(req.unit).await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    // No limit and no cursor means "return everything".
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                #[allow(deprecated)]
                "backup_to_federation" => {
                    // Null params mean "backup with empty metadata".
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }
1936
1937 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1938 where
1939 E: Event + Send,
1940 {
1941 let mut dbtx = self.db.begin_transaction().await;
1942 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1943 dbtx.commit_tx().await;
1944 }
1945
    /// Log a typed event inside the caller's transaction.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1958
1959 pub async fn log_event_raw_dbtx<Cap>(
1960 &self,
1961 dbtx: &mut DatabaseTransaction<'_, Cap>,
1962 kind: EventKind,
1963 module: Option<(ModuleKind, ModuleInstanceId)>,
1964 payload: Vec<u8>,
1965 persist: EventPersistence,
1966 ) where
1967 Cap: Send,
1968 {
1969 let module_id = module.as_ref().map(|m| m.1);
1970 let module_kind = module.map(|m| m.0);
1971 dbtx.log_event_raw(
1972 self.log_ordering_wakeup_tx.clone(),
1973 kind,
1974 module_kind,
1975 module_id,
1976 payload,
1977 persist,
1978 )
1979 .await;
1980 }
1981
    /// Default event-log tracker that persists its position under
    /// `DefaultApplicationEventLogKey`.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            // Persist the last-processed event log position.
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            // Load the previously stored position, if any.
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
2019
    /// Run `handler_fn` over event-log entries (historical and new), using
    /// `tracker` to remember the position between runs.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2044
    /// Run `handler_fn` over the trimable event log, using `tracker` to
    /// remember the position between runs.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2080
    /// Read up to `limit` persisted event log entries starting at `pos`
    /// (or the beginning if `None`).
    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2089
    /// Read up to `limit` trimable event log entries starting at `pos`
    /// (or the beginning if `None`).
    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2098
    /// Read event log entries within the caller's transaction.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2110
    /// Read trimable event log entries within the caller's transaction.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2122
    /// Subscribe to transient (non-persisted) event log entries.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2127
    /// Watch channel notified whenever a new event log entry is added.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2132
    /// Whether DHT discovery is enabled for iroh connectivity.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2136
    /// Apply core client db migrations (and, in test environments, verify db
    /// integrity) using a database handle without module decoders.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2149
    /// Iterate primary-module candidates that can handle `unit`, in the
    /// order of the priority map; within each priority, unit-specific
    /// candidates come before wildcard ones.
    fn primary_modules_for_unit(
        &self,
        unit: AmountUnit,
    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
        self.primary_modules
            .iter()
            .flat_map(move |(_prio, candidates)| {
                candidates
                    .specific
                    .get(&unit)
                    .into_iter()
                    .flatten()
                    .copied()
                    .chain(candidates.wildcard.iter().copied())
            })
            .map(|id| (id, self.modules.get_expect(id)))
    }
2169
    /// Best primary-module candidate for `unit`, if any is registered.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2179
    /// Primary module for Bitcoin amounts.
    ///
    /// # Panics
    /// Panics if no primary module is registered for Bitcoin.
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2185}
2186
// Module-facing client context: nearly all methods delegate to the inherent
// `Client` implementations of the same name.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // Inherent method takes the meta generator by reference.
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        // Delegates to the Bitcoin-denominated variant.
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    // Serializes the JSON payload and forwards to the raw event logger.
    // Panics if called with a non-global dbtx.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            // Module id is only attached when a kind is given.
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    // Stream all active state machine entries for one operation + module.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    // Stream all inactive (finished) state machine entries for one
    // operation + module.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2326
2327impl fmt::Debug for Client {
2329 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2330 write!(f, "Client")
2331 }
2332}
2333
2334pub fn client_decoders<'a>(
2335 registry: &ModuleInitRegistry<DynClientModuleInit>,
2336 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2337) -> ModuleDecoderRegistry {
2338 let mut modules = BTreeMap::new();
2339 for (id, kind) in module_kinds {
2340 let Some(init) = registry.get(kind) else {
2341 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2342 continue;
2343 };
2344
2345 modules.insert(
2346 id,
2347 (
2348 kind.clone(),
2349 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2350 ),
2351 );
2352 }
2353 ModuleDecoderRegistry::from(modules)
2354}