1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63 Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64 maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86 ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87 ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89 get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
104const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
108 &[ApiVersion { major: 0, minor: 0 }];
109
110#[derive(Default)]
112pub(crate) struct PrimaryModuleCandidates {
113 specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
115 wildcard: Vec<ModuleInstanceId>,
117}
118
119pub struct Client {
133 final_client: FinalClientIface,
134 config: tokio::sync::RwLock<ClientConfig>,
135 api_secret: Option<String>,
136 decoders: ModuleDecoderRegistry,
137 connectors: ConnectorRegistry,
138 db: Database,
139 federation_id: FederationId,
140 federation_config_meta: BTreeMap<String, String>,
141 primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
142 pub(crate) modules: ClientModuleRegistry,
143 module_inits: ClientModuleInitRegistry,
144 executor: Executor,
145 pub(crate) api: DynGlobalApi,
146 root_secret: DerivableSecret,
147 operation_log: OperationLog,
148 secp_ctx: Secp256k1<secp256k1::All>,
149 meta_service: Arc<MetaService>,
150
151 task_group: TaskGroup,
152
153 client_recovery_progress_receiver:
155 watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
156
157 log_ordering_wakeup_tx: watch::Sender<()>,
160 log_event_added_rx: watch::Receiver<()>,
162 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
163 request_hook: ApiRequestHook,
164 iroh_enable_dht: bool,
165 iroh_enable_next: bool,
166 #[allow(dead_code)]
171 user_bitcoind_rpc: Option<DynBitcoindRpc>,
172 pub(crate) user_bitcoind_rpc_no_chain_id:
177 Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181struct ListOperationsParams {
182 limit: Option<usize>,
183 last_seen: Option<ChronologicalOperationLogKey>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct GetOperationIdRequest {
188 operation_id: OperationId,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct GetBalanceChangesRequest {
193 #[serde(default = "AmountUnit::bitcoin")]
194 unit: AmountUnit,
195}
196
197impl Client {
198 pub async fn builder() -> anyhow::Result<ClientBuilder> {
201 Ok(ClientBuilder::new())
202 }
203
204 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
205 self.api.as_ref()
206 }
207
208 pub fn api_clone(&self) -> DynGlobalApi {
209 self.api.clone()
210 }
211
212 pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
215 self.api.connection_status_stream()
216 }
217
218 pub fn federation_reconnect(&self) {
226 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228 for peer_id in peers {
229 let api = self.api.clone();
230 self.task_group.spawn_cancellable(
231 format!("federation-reconnect-once-{peer_id}"),
232 async move {
233 if let Err(e) = api.get_peer_connection(peer_id).await {
234 debug!(
235 target: LOG_CLIENT_NET_API,
236 %peer_id,
237 err = %e.fmt_compact(),
238 "Failed to connect to peer"
239 );
240 }
241 },
242 );
243 }
244 }
245
246 pub fn spawn_federation_reconnect(&self) {
268 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
269
270 for peer_id in peers {
271 let api = self.api.clone();
272 self.task_group.spawn_cancellable(
273 format!("federation-reconnect-{peer_id}"),
274 async move {
275 loop {
276 match api.get_peer_connection(peer_id).await {
277 Ok(conn) => {
278 conn.await_disconnection().await;
279 }
280 Err(e) => {
281 debug!(
284 target: LOG_CLIENT_NET_API,
285 %peer_id,
286 err = %e.fmt_compact(),
287 "Failed to connect to peer, will retry"
288 );
289 }
290 }
291 }
292 },
293 );
294 }
295 }
296
297 pub fn task_group(&self) -> &TaskGroup {
299 &self.task_group
300 }
301
302 pub fn get_metrics() -> anyhow::Result<String> {
307 fedimint_metrics::get_metrics()
308 }
309
310 #[doc(hidden)]
312 pub fn executor(&self) -> &Executor {
313 &self.executor
314 }
315
316 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317 let mut dbtx = db.begin_transaction_nc().await;
318 dbtx.get_value(&ClientConfigKey).await
319 }
320
321 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322 let mut dbtx = db.begin_transaction_nc().await;
323 dbtx.get_value(&PendingClientConfigKey).await
324 }
325
326 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327 let mut dbtx = db.begin_transaction_nc().await;
328 dbtx.get_value(&ApiSecretKey).await
329 }
330
331 pub async fn store_encodable_client_secret<T: Encodable>(
332 db: &Database,
333 secret: T,
334 ) -> anyhow::Result<()> {
335 let mut dbtx = db.begin_transaction().await;
336
337 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339 bail!("Encoded client secret already exists, cannot overwrite")
340 }
341
342 let encoded_secret = T::consensus_encode_to_vec(&secret);
343 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344 .await;
345 dbtx.commit_tx().await;
346 Ok(())
347 }
348
349 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351 bail!("Encoded client secret not present in DB")
352 };
353
354 Ok(secret)
355 }
356 pub async fn load_decodable_client_secret_opt<T: Decodable>(
357 db: &Database,
358 ) -> anyhow::Result<Option<T>> {
359 let mut dbtx = db.begin_transaction_nc().await;
360
361 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363 Ok(match client_secret {
364 Some(client_secret) => Some(
365 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367 ),
368 None => None,
369 })
370 }
371
372 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374 Ok(secret) => secret,
375 _ => {
376 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377 Self::store_encodable_client_secret(db, secret)
378 .await
379 .expect("Storing client secret must work");
380 secret
381 }
382 };
383 Ok(client_secret)
384 }
385
386 pub async fn is_initialized(db: &Database) -> bool {
387 let mut dbtx = db.begin_transaction_nc().await;
388 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389 .await
390 .expect("Unrecoverable error occurred while reading and entry from the database")
391 .is_some()
392 }
393
394 pub fn start_executor(self: &Arc<Self>) {
395 debug!(
396 target: LOG_CLIENT,
397 "Starting fedimint client executor",
398 );
399 self.executor.start_executor(self.context_gen());
400 }
401
402 pub fn federation_id(&self) -> FederationId {
403 self.federation_id
404 }
405
406 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
407 let client_inner = Arc::downgrade(self);
408 Arc::new(move |module_instance, operation| {
409 ModuleGlobalClientContext {
410 client: client_inner
411 .clone()
412 .upgrade()
413 .expect("ModuleGlobalContextGen called after client was dropped"),
414 module_instance_id: module_instance,
415 operation,
416 }
417 .into()
418 })
419 }
420
421 pub async fn config(&self) -> ClientConfig {
422 self.config.read().await.clone()
423 }
424
425 pub fn api_secret(&self) -> &Option<String> {
427 &self.api_secret
428 }
429
430 pub async fn core_api_version(&self) -> ApiVersion {
436 self.db
439 .begin_transaction_nc()
440 .await
441 .get_value(&CachedApiVersionSetKey)
442 .await
443 .map(|cached: CachedApiVersionSet| cached.0.core)
444 .unwrap_or(ApiVersion { major: 0, minor: 0 })
445 }
446
447 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454 if let Some(chain_id) = self
456 .db
457 .begin_transaction_nc()
458 .await
459 .get_value(&ChainIdKey)
460 .await
461 {
462 return Ok(chain_id);
463 }
464
465 let chain_id = self.api.chain_id().await?;
467
468 let mut dbtx = self.db.begin_transaction().await;
470 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471 dbtx.commit_tx().await;
472
473 Ok(chain_id)
474 }
475
476 pub fn decoders(&self) -> &ModuleDecoderRegistry {
477 &self.decoders
478 }
479
480 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
482 self.try_get_module(instance)
483 .expect("Module instance not found")
484 }
485
486 fn try_get_module(
487 &self,
488 instance: ModuleInstanceId,
489 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490 Some(self.modules.get(instance)?.as_ref())
491 }
492
493 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
494 self.modules.get(instance).is_some()
495 }
496
497 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
503 let mut in_amounts = Amounts::ZERO;
505 let mut out_amounts = Amounts::ZERO;
506 let mut fee_amounts = Amounts::ZERO;
507
508 for input in builder.inputs() {
509 let module = self.get_module(input.input.module_instance_id());
510
511 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
512 "We only build transactions with input versions that are supported by the module",
513 );
514
515 in_amounts.checked_add_mut(&input.amounts);
516 fee_amounts.checked_add_mut(&item_fees);
517 }
518
519 for output in builder.outputs() {
520 let module = self.get_module(output.output.module_instance_id());
521
522 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
523 "We only build transactions with output versions that are supported by the module",
524 );
525
526 out_amounts.checked_add_mut(&output.amounts);
527 fee_amounts.checked_add_mut(&item_fees);
528 }
529
530 out_amounts.checked_add_mut(&fee_amounts);
531 (in_amounts, out_amounts)
532 }
533
534 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
535 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
536 }
537
538 pub fn get_config_meta(&self, key: &str) -> Option<String> {
540 self.federation_config_meta.get(key).cloned()
541 }
542
543 pub(crate) fn root_secret(&self) -> DerivableSecret {
544 self.root_secret.clone()
545 }
546
547 pub async fn add_state_machines(
548 &self,
549 dbtx: &mut DatabaseTransaction<'_>,
550 states: Vec<DynState>,
551 ) -> AddStateMachinesResult {
552 self.executor.add_state_machines_dbtx(dbtx, states).await
553 }
554
555 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557 let active_states = self.executor.get_active_states().await;
558 let mut active_operations = HashSet::with_capacity(active_states.len());
559 let mut dbtx = self.db().begin_transaction_nc().await;
560 for (state, _) in active_states {
561 let operation_id = state.operation_id();
562 if dbtx
563 .get_value(&OperationLogKey { operation_id })
564 .await
565 .is_some()
566 {
567 active_operations.insert(operation_id);
568 }
569 }
570 active_operations
571 }
572
573 pub fn operation_log(&self) -> &OperationLog {
574 &self.operation_log
575 }
576
577 pub fn meta_service(&self) -> &Arc<MetaService> {
579 &self.meta_service
580 }
581
582 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
584 let meta_service = self.meta_service();
585 let ts = meta_service
586 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
587 .await
588 .and_then(|v| v.value)?;
589 Some(UNIX_EPOCH + Duration::from_secs(ts))
590 }
591
592 async fn finalize_transaction(
594 &self,
595 dbtx: &mut DatabaseTransaction<'_>,
596 operation_id: OperationId,
597 mut partial_transaction: TransactionBuilder,
598 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601 let mut added_inputs_bundles = vec![];
602 let mut added_outputs_bundles = vec![];
603
604 for unit in in_amounts.units().union(&out_amounts.units()) {
615 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617 if input_amount == output_amount {
618 continue;
619 }
620
621 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623 };
624
625 let (added_input_bundle, added_output_bundle) = module
626 .create_final_inputs_and_outputs(
627 module_id,
628 dbtx,
629 operation_id,
630 *unit,
631 input_amount,
632 output_amount,
633 )
634 .await?;
635
636 added_inputs_bundles.push(added_input_bundle);
637 added_outputs_bundles.push(added_output_bundle);
638 }
639
640 let change_range = Range {
644 start: partial_transaction.outputs().count() as u64,
645 end: (partial_transaction.outputs().count() as u64
646 + added_outputs_bundles
647 .iter()
648 .map(|output| output.outputs().len() as u64)
649 .sum::<u64>()),
650 };
651
652 for added_inputs in added_inputs_bundles {
653 partial_transaction = partial_transaction.with_inputs(added_inputs);
654 }
655
656 for added_outputs in added_outputs_bundles {
657 partial_transaction = partial_transaction.with_outputs(added_outputs);
658 }
659
660 let (input_amounts, output_amounts) =
661 self.transaction_builder_get_balance(&partial_transaction);
662
663 for (unit, output_amount) in output_amounts {
664 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666 assert!(input_amount >= output_amount, "Transaction is underfunded");
667 }
668
669 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671 Ok((tx, states, change_range))
672 }
673
674 pub async fn finalize_and_submit_transaction<F, M>(
686 &self,
687 operation_id: OperationId,
688 operation_type: &str,
689 operation_meta_gen: F,
690 tx_builder: TransactionBuilder,
691 ) -> anyhow::Result<OutPointRange>
692 where
693 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
694 M: serde::Serialize + MaybeSend,
695 {
696 let operation_type = operation_type.to_owned();
697
698 let autocommit_res = self
699 .db
700 .autocommit(
701 |dbtx, _| {
702 let operation_type = operation_type.clone();
703 let tx_builder = tx_builder.clone();
704 let operation_meta_gen = operation_meta_gen.clone();
705 Box::pin(async move {
706 if Client::operation_exists_dbtx(dbtx, operation_id).await {
707 bail!("There already exists an operation with id {operation_id:?}")
708 }
709
710 let out_point_range = self
711 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
712 .await?;
713
714 self.operation_log()
715 .add_operation_log_entry_dbtx(
716 dbtx,
717 operation_id,
718 &operation_type,
719 operation_meta_gen(out_point_range),
720 )
721 .await;
722
723 Ok(out_point_range)
724 })
725 },
726 Some(100), )
728 .await;
729
730 match autocommit_res {
731 Ok(txid) => Ok(txid),
732 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
733 Err(AutocommitError::CommitFailed {
734 attempts,
735 last_error,
736 }) => panic!(
737 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
738 ),
739 }
740 }
741
742 async fn finalize_and_submit_transaction_inner(
743 &self,
744 dbtx: &mut DatabaseTransaction<'_>,
745 operation_id: OperationId,
746 tx_builder: TransactionBuilder,
747 ) -> anyhow::Result<OutPointRange> {
748 let (transaction, mut states, change_range) = self
749 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
750 .await?;
751
752 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
753 let inputs = transaction
754 .inputs
755 .iter()
756 .map(DynInput::module_instance_id)
757 .collect::<Vec<_>>();
758 let outputs = transaction
759 .outputs
760 .iter()
761 .map(DynOutput::module_instance_id)
762 .collect::<Vec<_>>();
763 warn!(
764 target: LOG_CLIENT_NET_API,
765 size=%transaction.consensus_encode_to_vec().len(),
766 ?inputs,
767 ?outputs,
768 "Transaction too large",
769 );
770 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
771 bail!(
772 "The generated transaction would be rejected by the federation for being too large."
773 );
774 }
775
776 let txid = transaction.tx_hash();
777
778 debug!(
779 target: LOG_CLIENT_NET_API,
780 %txid,
781 operation_id = %operation_id.fmt_short(),
782 ?transaction,
783 "Finalized and submitting transaction",
784 );
785
786 let tx_submission_sm = DynState::from_typed(
787 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
788 TxSubmissionStatesSM {
789 operation_id,
790 state: TxSubmissionStates::Created(transaction),
791 },
792 );
793 states.push(tx_submission_sm);
794
795 self.executor.add_state_machines_dbtx(dbtx, states).await?;
796
797 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
798 .await;
799
800 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
801 }
802
803 async fn transaction_update_stream(
804 &self,
805 operation_id: OperationId,
806 ) -> BoxStream<'static, TxSubmissionStatesSM> {
807 self.executor
808 .notifier()
809 .module_notifier::<TxSubmissionStatesSM>(
810 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
811 self.final_client.clone(),
812 )
813 .subscribe(operation_id)
814 .await
815 }
816
817 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
818 let mut dbtx = self.db().begin_transaction_nc().await;
819
820 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
821 }
822
823 pub async fn operation_exists_dbtx(
824 dbtx: &mut DatabaseTransaction<'_>,
825 operation_id: OperationId,
826 ) -> bool {
827 let active_state_exists = dbtx
828 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
829 .await
830 .next()
831 .await
832 .is_some();
833
834 let inactive_state_exists = dbtx
835 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
836 .await
837 .next()
838 .await
839 .is_some();
840
841 active_state_exists || inactive_state_exists
842 }
843
844 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
845 self.db
846 .begin_transaction_nc()
847 .await
848 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
849 .await
850 .next()
851 .await
852 .is_some()
853 }
854
855 pub async fn await_primary_bitcoin_module_output(
858 &self,
859 operation_id: OperationId,
860 out_point: OutPoint,
861 ) -> anyhow::Result<()> {
862 self.primary_module_for_unit(AmountUnit::BITCOIN)
863 .ok_or_else(|| anyhow!("No primary module available"))?
864 .1
865 .await_primary_module_output(operation_id, out_point)
866 .await
867 }
868
869 pub fn get_first_module<M: ClientModule>(
871 &'_ self,
872 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
873 let module_kind = M::kind();
874 let id = self
875 .get_first_instance(&module_kind)
876 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
877 let module: &M = self
878 .try_get_module(id)
879 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
880 .as_any()
881 .downcast_ref::<M>()
882 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
883 let (db, _) = self.db().with_prefix_module_id(id);
884 Ok(ClientModuleInstance {
885 id,
886 db,
887 api: self.api().with_module(id),
888 module,
889 })
890 }
891
892 pub fn get_module_client_dyn(
893 &self,
894 instance_id: ModuleInstanceId,
895 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
896 self.try_get_module(instance_id)
897 .ok_or(anyhow!("Unknown module instance {}", instance_id))
898 }
899
900 pub fn db(&self) -> &Database {
901 &self.db
902 }
903
904 pub fn endpoints(&self) -> &ConnectorRegistry {
905 &self.connectors
906 }
907
908 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
911 TransactionUpdates {
912 update_stream: self.transaction_update_stream(operation_id).await,
913 }
914 }
915
916 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
918 self.modules
919 .iter_modules()
920 .find(|(_, kind, _module)| *kind == module_kind)
921 .map(|(instance_id, _, _)| instance_id)
922 }
923
924 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
927 get_decoded_client_secret::<T>(self.db()).await
928 }
929
930 pub async fn await_primary_bitcoin_module_outputs(
933 &self,
934 operation_id: OperationId,
935 outputs: Vec<OutPoint>,
936 ) -> anyhow::Result<()> {
937 for out_point in outputs {
938 self.await_primary_bitcoin_module_output(operation_id, out_point)
939 .await?;
940 }
941
942 Ok(())
943 }
944
945 pub async fn get_config_json(&self) -> JsonClientConfig {
951 self.config().await.to_json()
952 }
953
954 #[doc(hidden)]
957 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
960 self.get_balance_for_unit(AmountUnit::BITCOIN).await
961 }
962
963 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
964 let (id, module) = self
965 .primary_module_for_unit(unit)
966 .ok_or_else(|| anyhow!("Primary module not available"))?;
967 Ok(module
968 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
969 .await)
970 }
971
972 pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
975 let primary_module_things =
976 if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
977 let balance_changes = primary_module.subscribe_balance_changes().await;
978 let initial_balance = self
979 .get_balance_for_unit(unit)
980 .await
981 .expect("Primary is present");
982
983 Some((
984 primary_module_id,
985 primary_module.clone(),
986 balance_changes,
987 initial_balance,
988 ))
989 } else {
990 None
991 };
992 let db = self.db().clone();
993
994 Box::pin(async_stream::stream! {
995 let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
996 pending().await
999 };
1000
1001
1002 yield initial_balance;
1003 let mut prev_balance = initial_balance;
1004 while let Some(()) = balance_changes.next().await {
1005 let mut dbtx = db.begin_transaction_nc().await;
1006 let balance = primary_module
1007 .get_balance(primary_module_id, &mut dbtx, unit)
1008 .await;
1009
1010 if balance != prev_balance {
1012 prev_balance = balance;
1013 yield balance;
1014 }
1015 }
1016 })
1017 }
1018
1019 async fn make_api_version_request(
1024 delay: Duration,
1025 peer_id: PeerId,
1026 api: &DynGlobalApi,
1027 ) -> (
1028 PeerId,
1029 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1030 ) {
1031 runtime::sleep(delay).await;
1032 (
1033 peer_id,
1034 api.request_single_peer::<SupportedApiVersionsSummary>(
1035 VERSION_ENDPOINT.to_owned(),
1036 ApiRequestErased::default(),
1037 peer_id,
1038 )
1039 .await,
1040 )
1041 }
1042
1043 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1049 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1050 }
1051
1052 pub async fn fetch_common_api_versions_from_all_peers(
1055 num_peers: NumPeers,
1056 api: DynGlobalApi,
1057 db: Database,
1058 num_responses_sender: watch::Sender<usize>,
1059 ) {
1060 let mut backoff = Self::create_api_version_backoff();
1061
1062 let mut requests = FuturesUnordered::new();
1065
1066 for peer_id in num_peers.peer_ids() {
1067 requests.push(Self::make_api_version_request(
1068 Duration::ZERO,
1069 peer_id,
1070 &api,
1071 ));
1072 }
1073
1074 let mut num_responses = 0;
1075
1076 while let Some((peer_id, response)) = requests.next().await {
1077 let retry = match response {
1078 Err(err) => {
1079 let has_previous_response = db
1080 .begin_transaction_nc()
1081 .await
1082 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1083 .await
1084 .is_some();
1085 debug!(
1086 target: LOG_CLIENT,
1087 %peer_id,
1088 err = %err.fmt_compact(),
1089 %has_previous_response,
1090 "Failed to refresh API versions of a peer"
1091 );
1092
1093 !has_previous_response
1094 }
1095 Ok(o) => {
1096 let mut dbtx = db.begin_transaction().await;
1099 dbtx.insert_entry(
1100 &PeerLastApiVersionsSummaryKey(peer_id),
1101 &PeerLastApiVersionsSummary(o),
1102 )
1103 .await;
1104 dbtx.commit_tx().await;
1105 false
1106 }
1107 };
1108
1109 if retry {
1110 requests.push(Self::make_api_version_request(
1111 backoff.next().expect("Keeps retrying"),
1112 peer_id,
1113 &api,
1114 ));
1115 } else {
1116 num_responses += 1;
1117 num_responses_sender.send_replace(num_responses);
1118 }
1119 }
1120 }
1121
1122 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
1126 num_peers: NumPeers,
1127 api: DynGlobalApi,
1128 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1129 let mut backoff = Self::create_api_version_backoff();
1130
1131 let mut requests = FuturesUnordered::new();
1134
1135 for peer_id in num_peers.peer_ids() {
1136 requests.push(Self::make_api_version_request(
1137 Duration::ZERO,
1138 peer_id,
1139 &api,
1140 ));
1141 }
1142
1143 let mut successful_responses = BTreeMap::new();
1144
1145 while successful_responses.len() < num_peers.threshold()
1146 && let Some((peer_id, response)) = requests.next().await
1147 {
1148 let retry = match response {
1149 Err(err) => {
1150 debug!(
1151 target: LOG_CLIENT,
1152 %peer_id,
1153 err = %err.fmt_compact(),
1154 "Failed to fetch API versions from peer"
1155 );
1156 true
1157 }
1158 Ok(response) => {
1159 successful_responses.insert(peer_id, response);
1160 false
1161 }
1162 };
1163
1164 if retry {
1165 requests.push(Self::make_api_version_request(
1166 backoff.next().expect("Keeps retrying"),
1167 peer_id,
1168 &api,
1169 ));
1170 }
1171 }
1172
1173 successful_responses
1174 }
1175
1176 pub async fn fetch_common_api_versions(
1178 config: &ClientConfig,
1179 api: &DynGlobalApi,
1180 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1181 debug!(
1182 target: LOG_CLIENT,
1183 "Fetching common api versions"
1184 );
1185
1186 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1187
1188 let peer_api_version_sets =
1189 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1190
1191 Ok(peer_api_version_sets)
1192 }
1193
1194 pub async fn write_api_version_cache(
1198 dbtx: &mut DatabaseTransaction<'_>,
1199 api_version_set: ApiVersionSet,
1200 ) {
1201 debug!(
1202 target: LOG_CLIENT,
1203 value = ?api_version_set,
1204 "Writing API version set to cache"
1205 );
1206
1207 dbtx.insert_entry(
1208 &CachedApiVersionSetKey,
1209 &CachedApiVersionSet(api_version_set),
1210 )
1211 .await;
1212 }
1213
1214 pub async fn store_prefetched_api_versions(
1219 db: &Database,
1220 config: &ClientConfig,
1221 client_module_init: &ClientModuleInitRegistry,
1222 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1223 ) {
1224 debug!(
1225 target: LOG_CLIENT,
1226 "Storing {} prefetched peer API version responses and calculating common version set",
1227 peer_api_versions.len()
1228 );
1229
1230 let mut dbtx = db.begin_transaction().await;
1231 let client_supported_versions =
1233 Self::supported_api_versions_summary_static(config, client_module_init);
1234 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1235 &client_supported_versions,
1236 peer_api_versions,
1237 ) {
1238 Ok(common_api_versions) => {
1239 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1241 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1242 }
1243 Err(err) => {
1244 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1245 }
1246 }
1247
1248 for (peer_id, peer_api_versions) in peer_api_versions {
1250 dbtx.insert_entry(
1251 &PeerLastApiVersionsSummaryKey(*peer_id),
1252 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1253 )
1254 .await;
1255 }
1256 dbtx.commit_tx().await;
1257 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1258 }
1259
1260 pub fn supported_api_versions_summary_static(
1262 config: &ClientConfig,
1263 client_module_init: &ClientModuleInitRegistry,
1264 ) -> SupportedApiVersionsSummary {
1265 SupportedApiVersionsSummary {
1266 core: SupportedCoreApiVersions {
1267 core_consensus: config.global.consensus_version,
1268 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1269 .expect("must not have conflicting versions"),
1270 },
1271 modules: config
1272 .modules
1273 .iter()
1274 .filter_map(|(&module_instance_id, module_config)| {
1275 client_module_init
1276 .get(module_config.kind())
1277 .map(|module_init| {
1278 (
1279 module_instance_id,
1280 SupportedModuleApiVersions {
1281 core_consensus: config.global.consensus_version,
1282 module_consensus: module_config.version,
1283 api: module_init.supported_api_versions(),
1284 },
1285 )
1286 })
1287 })
1288 .collect(),
1289 }
1290 }
1291
1292 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1293 Self::load_and_refresh_common_api_version_static(
1294 &self.config().await,
1295 &self.module_inits,
1296 self.connectors.clone(),
1297 &self.api,
1298 &self.db,
1299 &self.task_group,
1300 )
1301 .await
1302 }
1303
1304 async fn load_and_refresh_common_api_version_static(
1310 config: &ClientConfig,
1311 module_init: &ClientModuleInitRegistry,
1312 connectors: ConnectorRegistry,
1313 api: &DynGlobalApi,
1314 db: &Database,
1315 task_group: &TaskGroup,
1316 ) -> anyhow::Result<ApiVersionSet> {
1317 if let Some(v) = db
1318 .begin_transaction_nc()
1319 .await
1320 .get_value(&CachedApiVersionSetKey)
1321 .await
1322 {
1323 debug!(
1324 target: LOG_CLIENT,
1325 "Found existing cached common api versions"
1326 );
1327 let config = config.clone();
1328 let client_module_init = module_init.clone();
1329 let api = api.clone();
1330 let db = db.clone();
1331 let task_group = task_group.clone();
1332 task_group
1335 .clone()
1336 .spawn_cancellable("refresh_common_api_version_static", async move {
1337 connectors.wait_for_initialized_connections().await;
1338
1339 if let Err(error) = Self::refresh_common_api_version_static(
1340 &config,
1341 &client_module_init,
1342 &api,
1343 &db,
1344 task_group,
1345 false,
1346 )
1347 .await
1348 {
1349 warn!(
1350 target: LOG_CLIENT,
1351 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1352 );
1353 }
1354 });
1355
1356 return Ok(v.0);
1357 }
1358
1359 info!(
1360 target: LOG_CLIENT,
1361 "Fetching initial API versions "
1362 );
1363 Self::refresh_common_api_version_static(
1364 config,
1365 module_init,
1366 api,
1367 db,
1368 task_group.clone(),
1369 true,
1370 )
1371 .await
1372 }
1373
1374 async fn refresh_common_api_version_static(
1375 config: &ClientConfig,
1376 client_module_init: &ClientModuleInitRegistry,
1377 api: &DynGlobalApi,
1378 db: &Database,
1379 task_group: TaskGroup,
1380 block_until_ok: bool,
1381 ) -> anyhow::Result<ApiVersionSet> {
1382 debug!(
1383 target: LOG_CLIENT,
1384 "Refreshing common api versions"
1385 );
1386
1387 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1388 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1389
1390 task_group.spawn_cancellable("refresh peers api versions", {
1391 Client::fetch_common_api_versions_from_all_peers(
1392 num_peers,
1393 api.clone(),
1394 db.clone(),
1395 num_responses_sender,
1396 )
1397 });
1398
1399 let common_api_versions = loop {
1400 let _: Result<_, Elapsed> = runtime::timeout(
1408 Duration::from_secs(30),
1409 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1410 )
1411 .await;
1412
1413 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1414
1415 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1416 &Self::supported_api_versions_summary_static(config, client_module_init),
1417 &peer_api_version_sets,
1418 ) {
1419 Ok(o) => break o,
1420 Err(err) if block_until_ok => {
1421 warn!(
1422 target: LOG_CLIENT,
1423 err = %err.fmt_compact_anyhow(),
1424 "Failed to discover API version to use. Retrying..."
1425 );
1426 continue;
1427 }
1428 Err(e) => return Err(e),
1429 }
1430 };
1431
1432 debug!(
1433 target: LOG_CLIENT,
1434 value = ?common_api_versions,
1435 "Updating the cached common api versions"
1436 );
1437 let mut dbtx = db.begin_transaction().await;
1438 let _ = dbtx
1439 .insert_entry(
1440 &CachedApiVersionSetKey,
1441 &CachedApiVersionSet(common_api_versions.clone()),
1442 )
1443 .await;
1444
1445 dbtx.commit_tx().await;
1446
1447 Ok(common_api_versions)
1448 }
1449
1450 pub async fn get_metadata(&self) -> Metadata {
1452 self.db
1453 .begin_transaction_nc()
1454 .await
1455 .get_value(&ClientMetadataKey)
1456 .await
1457 .unwrap_or_else(|| {
1458 warn!(
1459 target: LOG_CLIENT,
1460 "Missing existing metadata. This key should have been set on Client init"
1461 );
1462 Metadata::empty()
1463 })
1464 }
1465
1466 pub async fn set_metadata(&self, metadata: &Metadata) {
1468 self.db
1469 .autocommit::<_, _, anyhow::Error>(
1470 |dbtx, _| {
1471 Box::pin(async {
1472 Self::set_metadata_dbtx(dbtx, metadata).await;
1473 Ok(())
1474 })
1475 },
1476 None,
1477 )
1478 .await
1479 .expect("Failed to autocommit metadata");
1480 }
1481
1482 pub fn has_pending_recoveries(&self) -> bool {
1483 !self
1484 .client_recovery_progress_receiver
1485 .borrow()
1486 .iter()
1487 .all(|(_id, progress)| progress.is_done())
1488 }
1489
1490 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1498 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1499 recovery_receiver
1500 .wait_for(|in_progress| {
1501 in_progress
1502 .iter()
1503 .all(|(_id, progress)| progress.is_done())
1504 })
1505 .await
1506 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1507
1508 Ok(())
1509 }
1510
1511 pub fn subscribe_to_recovery_progress(
1516 &self,
1517 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1518 WatchStream::new(self.client_recovery_progress_receiver.clone())
1519 .flat_map(futures::stream::iter)
1520 }
1521
1522 pub async fn wait_for_module_kind_recovery(
1523 &self,
1524 module_kind: ModuleKind,
1525 ) -> anyhow::Result<()> {
1526 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1527 let config = self.config().await;
1528 recovery_receiver
1529 .wait_for(|in_progress| {
1530 !in_progress
1531 .iter()
1532 .filter(|(module_instance_id, _progress)| {
1533 config.modules[module_instance_id].kind == module_kind
1534 })
1535 .any(|(_id, progress)| !progress.is_done())
1536 })
1537 .await
1538 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1539
1540 Ok(())
1541 }
1542
1543 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1544 loop {
1545 if self.executor.get_active_states().await.is_empty() {
1546 break;
1547 }
1548 sleep(Duration::from_millis(100)).await;
1549 }
1550 Ok(())
1551 }
1552
1553 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1555 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1556 }
1557
1558 fn spawn_module_recoveries_task(
1559 &self,
1560 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1561 module_recoveries: BTreeMap<
1562 ModuleInstanceId,
1563 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1564 >,
1565 module_recovery_progress_receivers: BTreeMap<
1566 ModuleInstanceId,
1567 watch::Receiver<RecoveryProgress>,
1568 >,
1569 ) {
1570 let db = self.db.clone();
1571 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1572 self.task_group
1573 .spawn("module recoveries", |_task_handle| async {
1574 Self::run_module_recoveries_task(
1575 db,
1576 log_ordering_wakeup_tx,
1577 recovery_sender,
1578 module_recoveries,
1579 module_recovery_progress_receivers,
1580 )
1581 .await;
1582 });
1583 }
1584
1585 async fn run_module_recoveries_task(
1586 db: Database,
1587 log_ordering_wakeup_tx: watch::Sender<()>,
1588 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1589 module_recoveries: BTreeMap<
1590 ModuleInstanceId,
1591 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1592 >,
1593 module_recovery_progress_receivers: BTreeMap<
1594 ModuleInstanceId,
1595 watch::Receiver<RecoveryProgress>,
1596 >,
1597 ) {
1598 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1599 let mut completed_stream = Vec::new();
1600 let progress_stream = futures::stream::FuturesUnordered::new();
1601
1602 for (module_instance_id, f) in module_recoveries {
1603 completed_stream.push(futures::stream::once(Box::pin(async move {
1604 match f.await {
1605 Ok(()) => (module_instance_id, None),
1606 Err(err) => {
1607 warn!(
1608 target: LOG_CLIENT,
1609 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1610 );
1611 futures::future::pending::<()>().await;
1615 unreachable!()
1616 }
1617 }
1618 })));
1619 }
1620
1621 for (module_instance_id, rx) in module_recovery_progress_receivers {
1622 progress_stream.push(
1623 tokio_stream::wrappers::WatchStream::new(rx)
1624 .fuse()
1625 .map(move |progress| (module_instance_id, Some(progress))),
1626 );
1627 }
1628
1629 let mut futures = futures::stream::select(
1630 futures::stream::select_all(progress_stream),
1631 futures::stream::select_all(completed_stream),
1632 );
1633
1634 while let Some((module_instance_id, progress)) = futures.next().await {
1635 let mut dbtx = db.begin_transaction().await;
1636
1637 let prev_progress = *recovery_sender
1638 .borrow()
1639 .get(&module_instance_id)
1640 .expect("existing progress must be present");
1641
1642 let progress = if prev_progress.is_done() {
1643 prev_progress
1645 } else if let Some(progress) = progress {
1646 progress
1647 } else {
1648 prev_progress.to_complete()
1649 };
1650
1651 if !prev_progress.is_done() && progress.is_done() {
1652 info!(
1653 target: LOG_CLIENT,
1654 module_instance_id,
1655 progress = format!("{}/{}", progress.complete, progress.total),
1656 "Recovery complete"
1657 );
1658 dbtx.log_event(
1659 log_ordering_wakeup_tx.clone(),
1660 None,
1661 ModuleRecoveryCompleted {
1662 module_id: module_instance_id,
1663 },
1664 )
1665 .await;
1666 } else {
1667 info!(
1668 target: LOG_CLIENT,
1669 module_instance_id,
1670 progress = format!("{}/{}", progress.complete, progress.total),
1671 "Recovery progress"
1672 );
1673 }
1674
1675 dbtx.insert_entry(
1676 &ClientModuleRecovery { module_instance_id },
1677 &ClientModuleRecoveryState { progress },
1678 )
1679 .await;
1680 dbtx.commit_tx().await;
1681
1682 recovery_sender.send_modify(|v| {
1683 v.insert(module_instance_id, progress);
1684 });
1685 }
1686 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1687 }
1688
1689 async fn load_peers_last_api_versions(
1690 db: &Database,
1691 num_peers: NumPeers,
1692 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1693 let mut peer_api_version_sets = BTreeMap::new();
1694
1695 let mut dbtx = db.begin_transaction_nc().await;
1696 for peer_id in num_peers.peer_ids() {
1697 if let Some(v) = dbtx
1698 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1699 .await
1700 {
1701 peer_api_version_sets.insert(peer_id, v.0);
1702 }
1703 }
1704 drop(dbtx);
1705 peer_api_version_sets
1706 }
1707
1708 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1711 self.db()
1712 .begin_transaction_nc()
1713 .await
1714 .find_by_prefix(&ApiAnnouncementPrefix)
1715 .await
1716 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1717 .collect()
1718 .await
1719 }
1720
1721 pub async fn get_guardian_metadata(
1723 &self,
1724 ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1725 self.db()
1726 .begin_transaction_nc()
1727 .await
1728 .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1729 .await
1730 .map(|(key, metadata)| (key.0, metadata))
1731 .collect()
1732 .await
1733 }
1734
1735 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1737 get_api_urls(&self.db, &self.config().await).await
1738 }
1739
1740 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1743 self.get_peer_urls()
1744 .await
1745 .into_iter()
1746 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1747 .map(|peer_url| {
1748 InviteCode::new(
1749 peer_url.clone(),
1750 peer,
1751 self.federation_id(),
1752 self.api_secret.clone(),
1753 )
1754 })
1755 }
1756
1757 pub async fn get_guardian_public_keys_blocking(
1761 &self,
1762 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1763 self.db
1764 .autocommit(
1765 |dbtx, _| {
1766 Box::pin(async move {
1767 let config = self.config().await;
1768
1769 let guardian_pub_keys = self
1770 .get_or_backfill_broadcast_public_keys(dbtx, config)
1771 .await;
1772
1773 Result::<_, ()>::Ok(guardian_pub_keys)
1774 })
1775 },
1776 None,
1777 )
1778 .await
1779 .expect("Will retry forever")
1780 }
1781
1782 async fn get_or_backfill_broadcast_public_keys(
1783 &self,
1784 dbtx: &mut DatabaseTransaction<'_>,
1785 config: ClientConfig,
1786 ) -> BTreeMap<PeerId, PublicKey> {
1787 match config.global.broadcast_public_keys {
1788 Some(guardian_pub_keys) => guardian_pub_keys,
1789 _ => {
1790 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1791
1792 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1793 *(self.config.write().await) = new_config;
1794 guardian_pub_keys
1795 }
1796 }
1797 }
1798
1799 async fn fetch_session_count(&self) -> FederationResult<u64> {
1800 self.api.session_count().await
1801 }
1802
1803 async fn fetch_and_update_config(
1804 &self,
1805 config: ClientConfig,
1806 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1807 let fetched_config = retry(
1808 "Fetching guardian public keys",
1809 backoff_util::background_backoff(),
1810 || async {
1811 Ok(self
1812 .api
1813 .request_current_consensus::<ClientConfig>(
1814 CLIENT_CONFIG_ENDPOINT.to_owned(),
1815 ApiRequestErased::default(),
1816 )
1817 .await?)
1818 },
1819 )
1820 .await
1821 .expect("Will never return on error");
1822
1823 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1824 warn!(
1825 target: LOG_CLIENT,
1826 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1827 );
1828 pending::<()>().await;
1829 unreachable!("Pending will never return");
1830 };
1831
1832 let new_config = ClientConfig {
1833 global: GlobalClientConfig {
1834 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1835 ..config.global
1836 },
1837 modules: config.modules,
1838 };
1839 (guardian_pub_keys, new_config)
1840 }
1841
1842 pub fn handle_global_rpc(
1843 &self,
1844 method: String,
1845 params: serde_json::Value,
1846 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1847 Box::pin(try_stream! {
1848 match method.as_str() {
1849 "get_balance" => {
1850 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1851 yield serde_json::to_value(balance)?;
1852 }
1853 "subscribe_balance_changes" => {
1854 let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1855 let mut stream = self.subscribe_balance_changes(req.unit).await;
1856 while let Some(balance) = stream.next().await {
1857 yield serde_json::to_value(balance)?;
1858 }
1859 }
1860 "get_config" => {
1861 let config = self.config().await;
1862 yield serde_json::to_value(config)?;
1863 }
1864 "get_federation_id" => {
1865 let federation_id = self.federation_id();
1866 yield serde_json::to_value(federation_id)?;
1867 }
1868 "get_invite_code" => {
1869 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1870 let invite_code = self.invite_code(req.peer).await;
1871 yield serde_json::to_value(invite_code)?;
1872 }
1873 "get_operation" => {
1874 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1875 let operation = self.operation_log().get_operation(req.operation_id).await;
1876 yield serde_json::to_value(operation)?;
1877 }
1878 "list_operations" => {
1879 let req: ListOperationsParams = serde_json::from_value(params)?;
1880 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1881 usize::MAX
1882 } else {
1883 req.limit.unwrap_or(usize::MAX)
1884 };
1885 let operations = self.operation_log()
1886 .paginate_operations_rev(limit, req.last_seen)
1887 .await;
1888 yield serde_json::to_value(operations)?;
1889 }
1890 "session_count" => {
1891 let count = self.fetch_session_count().await?;
1892 yield serde_json::to_value(count)?;
1893 }
1894 "has_pending_recoveries" => {
1895 let has_pending = self.has_pending_recoveries();
1896 yield serde_json::to_value(has_pending)?;
1897 }
1898 "wait_for_all_recoveries" => {
1899 self.wait_for_all_recoveries().await?;
1900 yield serde_json::Value::Null;
1901 }
1902 "subscribe_to_recovery_progress" => {
1903 let mut stream = self.subscribe_to_recovery_progress();
1904 while let Some((module_id, progress)) = stream.next().await {
1905 yield serde_json::json!({
1906 "module_id": module_id,
1907 "progress": progress
1908 });
1909 }
1910 }
1911 #[allow(deprecated)]
1912 "backup_to_federation" => {
1913 let metadata = if params.is_null() {
1914 Metadata::from_json_serialized(serde_json::json!({}))
1915 } else {
1916 Metadata::from_json_serialized(params)
1917 };
1918 self.backup_to_federation(metadata).await?;
1919 yield serde_json::Value::Null;
1920 }
1921 _ => {
1922 Err(anyhow::format_err!("Unknown method: {}", method))?;
1923 unreachable!()
1924 },
1925 }
1926 })
1927 }
1928
1929 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1930 where
1931 E: Event + Send,
1932 {
1933 let mut dbtx = self.db.begin_transaction().await;
1934 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1935 dbtx.commit_tx().await;
1936 }
1937
1938 pub async fn log_event_dbtx<E, Cap>(
1939 &self,
1940 dbtx: &mut DatabaseTransaction<'_, Cap>,
1941 module_id: Option<ModuleInstanceId>,
1942 event: E,
1943 ) where
1944 E: Event + Send,
1945 Cap: Send,
1946 {
1947 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1948 .await;
1949 }
1950
1951 pub async fn log_event_raw_dbtx<Cap>(
1952 &self,
1953 dbtx: &mut DatabaseTransaction<'_, Cap>,
1954 kind: EventKind,
1955 module: Option<(ModuleKind, ModuleInstanceId)>,
1956 payload: Vec<u8>,
1957 persist: EventPersistence,
1958 ) where
1959 Cap: Send,
1960 {
1961 let module_id = module.as_ref().map(|m| m.1);
1962 let module_kind = module.map(|m| m.0);
1963 dbtx.log_event_raw(
1964 self.log_ordering_wakeup_tx.clone(),
1965 kind,
1966 module_kind,
1967 module_id,
1968 payload,
1969 persist,
1970 )
1971 .await;
1972 }
1973
1974 pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1986 struct BuiltInApplicationEventLogTracker;
1987
1988 #[apply(async_trait_maybe_send!)]
1989 impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1990 async fn store(
1992 &mut self,
1993 dbtx: &mut DatabaseTransaction<NonCommittable>,
1994 pos: EventLogTrimableId,
1995 ) -> anyhow::Result<()> {
1996 dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1997 .await;
1998 Ok(())
1999 }
2000
2001 async fn load(
2003 &mut self,
2004 dbtx: &mut DatabaseTransaction<NonCommittable>,
2005 ) -> anyhow::Result<Option<EventLogTrimableId>> {
2006 Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
2007 }
2008 }
2009 Box::new(BuiltInApplicationEventLogTracker)
2010 }
2011
2012 pub async fn handle_historical_events<F, R>(
2020 &self,
2021 tracker: fedimint_eventlog::DynEventLogTracker,
2022 handler_fn: F,
2023 ) -> anyhow::Result<()>
2024 where
2025 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2026 R: Future<Output = anyhow::Result<()>>,
2027 {
2028 fedimint_eventlog::handle_events(
2029 self.db.clone(),
2030 tracker,
2031 self.log_event_added_rx.clone(),
2032 handler_fn,
2033 )
2034 .await
2035 }
2036
2037 pub async fn handle_events<F, R>(
2056 &self,
2057 tracker: fedimint_eventlog::DynEventLogTrimableTracker,
2058 handler_fn: F,
2059 ) -> anyhow::Result<()>
2060 where
2061 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2062 R: Future<Output = anyhow::Result<()>>,
2063 {
2064 fedimint_eventlog::handle_trimable_events(
2065 self.db.clone(),
2066 tracker,
2067 self.log_event_added_rx.clone(),
2068 handler_fn,
2069 )
2070 .await
2071 }
2072
2073 pub async fn get_event_log(
2074 &self,
2075 pos: Option<EventLogId>,
2076 limit: u64,
2077 ) -> Vec<PersistedLogEntry> {
2078 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2079 .await
2080 }
2081
2082 pub async fn get_event_log_trimable(
2083 &self,
2084 pos: Option<EventLogTrimableId>,
2085 limit: u64,
2086 ) -> Vec<PersistedLogEntry> {
2087 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2088 .await
2089 }
2090
2091 pub async fn get_event_log_dbtx<Cap>(
2092 &self,
2093 dbtx: &mut DatabaseTransaction<'_, Cap>,
2094 pos: Option<EventLogId>,
2095 limit: u64,
2096 ) -> Vec<PersistedLogEntry>
2097 where
2098 Cap: Send,
2099 {
2100 dbtx.get_event_log(pos, limit).await
2101 }
2102
2103 pub async fn get_event_log_trimable_dbtx<Cap>(
2104 &self,
2105 dbtx: &mut DatabaseTransaction<'_, Cap>,
2106 pos: Option<EventLogTrimableId>,
2107 limit: u64,
2108 ) -> Vec<PersistedLogEntry>
2109 where
2110 Cap: Send,
2111 {
2112 dbtx.get_event_log_trimable(pos, limit).await
2113 }
2114
2115 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
2117 self.log_event_added_transient_tx.subscribe()
2118 }
2119
2120 pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
2122 self.log_event_added_rx.clone()
2123 }
2124
2125 pub fn iroh_enable_dht(&self) -> bool {
2126 self.iroh_enable_dht
2127 }
2128
2129 pub(crate) async fn run_core_migrations(
2130 db_no_decoders: &Database,
2131 ) -> Result<(), anyhow::Error> {
2132 let mut dbtx = db_no_decoders.begin_transaction().await;
2133 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
2134 .await?;
2135 if is_running_in_test_env() {
2136 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
2137 }
2138 dbtx.commit_tx_result().await?;
2139 Ok(())
2140 }
2141
2142 fn primary_modules_for_unit(
2144 &self,
2145 unit: AmountUnit,
2146 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2147 self.primary_modules
2148 .iter()
2149 .flat_map(move |(_prio, candidates)| {
2150 candidates
2151 .specific
2152 .get(&unit)
2153 .into_iter()
2154 .flatten()
2155 .copied()
2156 .chain(candidates.wildcard.iter().copied())
2158 })
2159 .map(|id| (id, self.modules.get_expect(id)))
2160 }
2161
2162 pub fn primary_module_for_unit(
2166 &self,
2167 unit: AmountUnit,
2168 ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2169 self.primary_modules_for_unit(unit).next()
2170 }
2171
2172 pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2174 self.primary_module_for_unit(AmountUnit::BITCOIN)
2175 .expect("No primary module for Bitcoin")
2176 }
2177}
2178
// `ClientContextIface` implementation for `Client`.
//
// Most methods simply forward to an inherent `Client` item, written in the
// fully qualified `Client::name(self, ..)` form. NOTE(review): presumably the
// qualified form is used so that the inherent method is selected rather than
// the trait method of the same name; confirm before rewriting as `self.name()`.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to the inherent `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to the inherent `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }

    /// Forwards to the inherent `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to the inherent `Client::finalize_and_submit_transaction`,
    /// passing the boxed `operation_meta_gen` by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to the inherent `Client::finalize_and_submit_transaction_inner`.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to the inherent `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_bitcoin_module_outputs`; note that
    /// the inherent method's name differs from the trait method's name.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to the inherent `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    /// Forwards to the inherent `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to the inherent `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to the inherent `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to the inherent `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to the inherent `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    /// Forwards to the inherent `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to the inherent `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event whose payload is given as a JSON value.
    ///
    /// The payload is serialized to bytes and passed to
    /// `log_event_raw_dbtx`. The module is only attached to the event when
    /// `module_kind` is `Some`.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx.ensure_global()` fails or if the payload cannot be
    /// serialized.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    /// Streams the active states stored for `operation_id` and `module_id`.
    ///
    /// Entries are looked up with an `ActiveModuleOperationStateKeyPrefix`
    /// and each database key is unwrapped to its inner `ActiveStateKey`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }

    /// Streams the inactive states stored for `operation_id` and `module_id`.
    ///
    /// Entries are looked up with an `InactiveModuleOperationStateKeyPrefix`
    /// and each database key is unwrapped to its inner `InactiveStateKey`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2318
2319impl fmt::Debug for Client {
2321 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2322 write!(f, "Client")
2323 }
2324}
2325
2326pub fn client_decoders<'a>(
2327 registry: &ModuleInitRegistry<DynClientModuleInit>,
2328 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2329) -> ModuleDecoderRegistry {
2330 let mut modules = BTreeMap::new();
2331 for (id, kind) in module_kinds {
2332 let Some(init) = registry.get(kind) else {
2333 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2334 continue;
2335 };
2336
2337 modules.insert(
2338 id,
2339 (
2340 kind.clone(),
2341 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2342 ),
2343 );
2344 }
2345 ModuleDecoderRegistry::from(modules)
2346}