1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63 Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64 maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86 ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87 ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89 get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
104const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
108 &[ApiVersion { major: 0, minor: 0 }];
109
/// Module instances that are candidates for acting as a primary module.
///
/// Stored per [`PrimaryModulePriority`] in `Client::primary_modules`.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Candidate module instances keyed by the amount unit they are listed for.
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Candidate module instances not tied to one unit.
    // NOTE(review): presumably these can serve any unit — confirm against the lookup code.
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Main client state: configuration, database, federation API handle, module
/// registry, state machine executor and related services.
pub struct Client {
    /// Handle cloned into module notifiers (see `transaction_update_stream`).
    final_client: FinalClientIface,
    /// Current client config behind an async lock; `config()` returns a clone.
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional API secret, exposed via `api_secret()`.
    api_secret: Option<String>,
    /// Module decoders, exposed via `decoders()`.
    decoders: ModuleDecoderRegistry,
    /// Connector registry, exposed via `endpoints()`.
    connectors: ConnectorRegistry,
    /// Client database, exposed via `db()`.
    db: Database,
    /// Id of the federation, exposed via `federation_id()`.
    federation_id: FederationId,
    /// String key/value metadata looked up by `get_config_meta()`.
    federation_config_meta: BTreeMap<String, String>,
    /// Primary module candidates grouped by priority.
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    /// Registry of module instances, looked up by `ModuleInstanceId`.
    pub(crate) modules: ClientModuleRegistry,
    /// Module init registry; used when computing supported API versions.
    module_inits: ClientModuleInitRegistry,
    /// State machine executor, exposed via `executor()`.
    executor: Executor,
    /// Federation API handle, exposed via `api()` / `api_clone()`.
    pub(crate) api: DynGlobalApi,
    /// Root secret; `root_secret()` hands out clones.
    root_secret: DerivableSecret,
    /// Operation log, exposed via `operation_log()`.
    operation_log: OperationLog,
    /// secp256k1 context used when building transactions and deriving the
    /// key returned by `get_internal_payment_markers()`.
    secp_ctx: Secp256k1<secp256k1::All>,
    /// Meta service, exposed via `meta_service()`.
    meta_service: Arc<MetaService>,

    /// Task group, exposed via `task_group()`.
    task_group: TaskGroup,

    /// Receiver for the per-module recovery progress map.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // NOTE(review): the following three channels relate to event logging; their
    // exact protocol is not visible in this part of the file.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Hook applied to API requests.
    // NOTE(review): where it is applied is not visible here — confirm.
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
    /// Optional user-supplied bitcoind RPC; currently allowed to be unused.
    #[allow(dead_code)]
    user_bitcoind_rpc: Option<DynBitcoindRpc>,
    /// Optional user-supplied factory for a bitcoind RPC without a chain id.
    pub(crate) user_bitcoind_rpc_no_chain_id:
        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
}
179
/// Serializable parameters for listing operations.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    /// Optional maximum number of entries.
    // NOTE(review): semantics of `None` are decided by the consumer — confirm.
    limit: Option<usize>,
    /// Optional chronological log key of the last entry already seen.
    last_seen: Option<ChronologicalOperationLogKey>,
}
185
/// Serializable request carrying a single operation id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    /// The operation the request refers to.
    operation_id: OperationId,
}
190
/// Serializable request selecting the amount unit for balance changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    /// Amount unit; when the field is missing during deserialization it is
    /// filled in by calling `AmountUnit::bitcoin`.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
196
197impl Client {
198 pub async fn builder() -> anyhow::Result<ClientBuilder> {
201 Ok(ClientBuilder::new())
202 }
203
204 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
205 self.api.as_ref()
206 }
207
208 pub fn api_clone(&self) -> DynGlobalApi {
209 self.api.clone()
210 }
211
212 pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
215 self.api.connection_status_stream()
216 }
217
218 pub fn task_group(&self) -> &TaskGroup {
220 &self.task_group
221 }
222
223 pub fn get_metrics() -> anyhow::Result<String> {
228 fedimint_metrics::get_metrics()
229 }
230
231 #[doc(hidden)]
233 pub fn executor(&self) -> &Executor {
234 &self.executor
235 }
236
237 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
238 let mut dbtx = db.begin_transaction_nc().await;
239 dbtx.get_value(&ClientConfigKey).await
240 }
241
242 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
243 let mut dbtx = db.begin_transaction_nc().await;
244 dbtx.get_value(&PendingClientConfigKey).await
245 }
246
247 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
248 let mut dbtx = db.begin_transaction_nc().await;
249 dbtx.get_value(&ApiSecretKey).await
250 }
251
252 pub async fn store_encodable_client_secret<T: Encodable>(
253 db: &Database,
254 secret: T,
255 ) -> anyhow::Result<()> {
256 let mut dbtx = db.begin_transaction().await;
257
258 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
260 bail!("Encoded client secret already exists, cannot overwrite")
261 }
262
263 let encoded_secret = T::consensus_encode_to_vec(&secret);
264 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
265 .await;
266 dbtx.commit_tx().await;
267 Ok(())
268 }
269
270 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
271 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
272 bail!("Encoded client secret not present in DB")
273 };
274
275 Ok(secret)
276 }
277 pub async fn load_decodable_client_secret_opt<T: Decodable>(
278 db: &Database,
279 ) -> anyhow::Result<Option<T>> {
280 let mut dbtx = db.begin_transaction_nc().await;
281
282 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
283
284 Ok(match client_secret {
285 Some(client_secret) => Some(
286 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
287 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
288 ),
289 None => None,
290 })
291 }
292
293 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
294 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
295 Ok(secret) => secret,
296 _ => {
297 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
298 Self::store_encodable_client_secret(db, secret)
299 .await
300 .expect("Storing client secret must work");
301 secret
302 }
303 };
304 Ok(client_secret)
305 }
306
307 pub async fn is_initialized(db: &Database) -> bool {
308 let mut dbtx = db.begin_transaction_nc().await;
309 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
310 .await
311 .expect("Unrecoverable error occurred while reading and entry from the database")
312 .is_some()
313 }
314
315 pub fn start_executor(self: &Arc<Self>) {
316 debug!(
317 target: LOG_CLIENT,
318 "Starting fedimint client executor",
319 );
320 self.executor.start_executor(self.context_gen());
321 }
322
323 pub fn federation_id(&self) -> FederationId {
324 self.federation_id
325 }
326
327 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
328 let client_inner = Arc::downgrade(self);
329 Arc::new(move |module_instance, operation| {
330 ModuleGlobalClientContext {
331 client: client_inner
332 .clone()
333 .upgrade()
334 .expect("ModuleGlobalContextGen called after client was dropped"),
335 module_instance_id: module_instance,
336 operation,
337 }
338 .into()
339 })
340 }
341
342 pub async fn config(&self) -> ClientConfig {
343 self.config.read().await.clone()
344 }
345
346 pub fn api_secret(&self) -> &Option<String> {
348 &self.api_secret
349 }
350
351 pub async fn core_api_version(&self) -> ApiVersion {
357 self.db
360 .begin_transaction_nc()
361 .await
362 .get_value(&CachedApiVersionSetKey)
363 .await
364 .map(|cached: CachedApiVersionSet| cached.0.core)
365 .unwrap_or(ApiVersion { major: 0, minor: 0 })
366 }
367
368 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
375 if let Some(chain_id) = self
377 .db
378 .begin_transaction_nc()
379 .await
380 .get_value(&ChainIdKey)
381 .await
382 {
383 return Ok(chain_id);
384 }
385
386 let chain_id = self.api.chain_id().await?;
388
389 let mut dbtx = self.db.begin_transaction().await;
391 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
392 dbtx.commit_tx().await;
393
394 Ok(chain_id)
395 }
396
397 pub fn decoders(&self) -> &ModuleDecoderRegistry {
398 &self.decoders
399 }
400
401 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
403 self.try_get_module(instance)
404 .expect("Module instance not found")
405 }
406
407 fn try_get_module(
408 &self,
409 instance: ModuleInstanceId,
410 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
411 Some(self.modules.get(instance)?.as_ref())
412 }
413
414 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
415 self.modules.get(instance).is_some()
416 }
417
418 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
424 let mut in_amounts = Amounts::ZERO;
426 let mut out_amounts = Amounts::ZERO;
427 let mut fee_amounts = Amounts::ZERO;
428
429 for input in builder.inputs() {
430 let module = self.get_module(input.input.module_instance_id());
431
432 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
433 "We only build transactions with input versions that are supported by the module",
434 );
435
436 in_amounts.checked_add_mut(&input.amounts);
437 fee_amounts.checked_add_mut(&item_fees);
438 }
439
440 for output in builder.outputs() {
441 let module = self.get_module(output.output.module_instance_id());
442
443 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
444 "We only build transactions with output versions that are supported by the module",
445 );
446
447 out_amounts.checked_add_mut(&output.amounts);
448 fee_amounts.checked_add_mut(&item_fees);
449 }
450
451 out_amounts.checked_add_mut(&fee_amounts);
452 (in_amounts, out_amounts)
453 }
454
455 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
456 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
457 }
458
459 pub fn get_config_meta(&self, key: &str) -> Option<String> {
461 self.federation_config_meta.get(key).cloned()
462 }
463
464 pub(crate) fn root_secret(&self) -> DerivableSecret {
465 self.root_secret.clone()
466 }
467
468 pub async fn add_state_machines(
469 &self,
470 dbtx: &mut DatabaseTransaction<'_>,
471 states: Vec<DynState>,
472 ) -> AddStateMachinesResult {
473 self.executor.add_state_machines_dbtx(dbtx, states).await
474 }
475
476 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
478 let active_states = self.executor.get_active_states().await;
479 let mut active_operations = HashSet::with_capacity(active_states.len());
480 let mut dbtx = self.db().begin_transaction_nc().await;
481 for (state, _) in active_states {
482 let operation_id = state.operation_id();
483 if dbtx
484 .get_value(&OperationLogKey { operation_id })
485 .await
486 .is_some()
487 {
488 active_operations.insert(operation_id);
489 }
490 }
491 active_operations
492 }
493
494 pub fn operation_log(&self) -> &OperationLog {
495 &self.operation_log
496 }
497
498 pub fn meta_service(&self) -> &Arc<MetaService> {
500 &self.meta_service
501 }
502
503 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
505 let meta_service = self.meta_service();
506 let ts = meta_service
507 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
508 .await
509 .and_then(|v| v.value)?;
510 Some(UNIX_EPOCH + Duration::from_secs(ts))
511 }
512
513 async fn finalize_transaction(
515 &self,
516 dbtx: &mut DatabaseTransaction<'_>,
517 operation_id: OperationId,
518 mut partial_transaction: TransactionBuilder,
519 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
520 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
521
522 let mut added_inputs_bundles = vec![];
523 let mut added_outputs_bundles = vec![];
524
525 for unit in in_amounts.units().union(&out_amounts.units()) {
536 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
537 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
538 if input_amount == output_amount {
539 continue;
540 }
541
542 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
543 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
544 };
545
546 let (added_input_bundle, added_output_bundle) = module
547 .create_final_inputs_and_outputs(
548 module_id,
549 dbtx,
550 operation_id,
551 *unit,
552 input_amount,
553 output_amount,
554 )
555 .await?;
556
557 added_inputs_bundles.push(added_input_bundle);
558 added_outputs_bundles.push(added_output_bundle);
559 }
560
561 let change_range = Range {
565 start: partial_transaction.outputs().count() as u64,
566 end: (partial_transaction.outputs().count() as u64
567 + added_outputs_bundles
568 .iter()
569 .map(|output| output.outputs().len() as u64)
570 .sum::<u64>()),
571 };
572
573 for added_inputs in added_inputs_bundles {
574 partial_transaction = partial_transaction.with_inputs(added_inputs);
575 }
576
577 for added_outputs in added_outputs_bundles {
578 partial_transaction = partial_transaction.with_outputs(added_outputs);
579 }
580
581 let (input_amounts, output_amounts) =
582 self.transaction_builder_get_balance(&partial_transaction);
583
584 for (unit, output_amount) in output_amounts {
585 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
586
587 assert!(input_amount >= output_amount, "Transaction is underfunded");
588 }
589
590 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
591
592 Ok((tx, states, change_range))
593 }
594
595 pub async fn finalize_and_submit_transaction<F, M>(
607 &self,
608 operation_id: OperationId,
609 operation_type: &str,
610 operation_meta_gen: F,
611 tx_builder: TransactionBuilder,
612 ) -> anyhow::Result<OutPointRange>
613 where
614 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
615 M: serde::Serialize + MaybeSend,
616 {
617 let operation_type = operation_type.to_owned();
618
619 let autocommit_res = self
620 .db
621 .autocommit(
622 |dbtx, _| {
623 let operation_type = operation_type.clone();
624 let tx_builder = tx_builder.clone();
625 let operation_meta_gen = operation_meta_gen.clone();
626 Box::pin(async move {
627 if Client::operation_exists_dbtx(dbtx, operation_id).await {
628 bail!("There already exists an operation with id {operation_id:?}")
629 }
630
631 let out_point_range = self
632 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
633 .await?;
634
635 self.operation_log()
636 .add_operation_log_entry_dbtx(
637 dbtx,
638 operation_id,
639 &operation_type,
640 operation_meta_gen(out_point_range),
641 )
642 .await;
643
644 Ok(out_point_range)
645 })
646 },
647 Some(100), )
649 .await;
650
651 match autocommit_res {
652 Ok(txid) => Ok(txid),
653 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
654 Err(AutocommitError::CommitFailed {
655 attempts,
656 last_error,
657 }) => panic!(
658 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
659 ),
660 }
661 }
662
663 async fn finalize_and_submit_transaction_inner(
664 &self,
665 dbtx: &mut DatabaseTransaction<'_>,
666 operation_id: OperationId,
667 tx_builder: TransactionBuilder,
668 ) -> anyhow::Result<OutPointRange> {
669 let (transaction, mut states, change_range) = self
670 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
671 .await?;
672
673 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
674 let inputs = transaction
675 .inputs
676 .iter()
677 .map(DynInput::module_instance_id)
678 .collect::<Vec<_>>();
679 let outputs = transaction
680 .outputs
681 .iter()
682 .map(DynOutput::module_instance_id)
683 .collect::<Vec<_>>();
684 warn!(
685 target: LOG_CLIENT_NET_API,
686 size=%transaction.consensus_encode_to_vec().len(),
687 ?inputs,
688 ?outputs,
689 "Transaction too large",
690 );
691 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
692 bail!(
693 "The generated transaction would be rejected by the federation for being too large."
694 );
695 }
696
697 let txid = transaction.tx_hash();
698
699 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
700
701 let tx_submission_sm = DynState::from_typed(
702 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
703 TxSubmissionStatesSM {
704 operation_id,
705 state: TxSubmissionStates::Created(transaction),
706 },
707 );
708 states.push(tx_submission_sm);
709
710 self.executor.add_state_machines_dbtx(dbtx, states).await?;
711
712 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
713 .await;
714
715 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
716 }
717
718 async fn transaction_update_stream(
719 &self,
720 operation_id: OperationId,
721 ) -> BoxStream<'static, TxSubmissionStatesSM> {
722 self.executor
723 .notifier()
724 .module_notifier::<TxSubmissionStatesSM>(
725 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
726 self.final_client.clone(),
727 )
728 .subscribe(operation_id)
729 .await
730 }
731
732 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
733 let mut dbtx = self.db().begin_transaction_nc().await;
734
735 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
736 }
737
738 pub async fn operation_exists_dbtx(
739 dbtx: &mut DatabaseTransaction<'_>,
740 operation_id: OperationId,
741 ) -> bool {
742 let active_state_exists = dbtx
743 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
744 .await
745 .next()
746 .await
747 .is_some();
748
749 let inactive_state_exists = dbtx
750 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
751 .await
752 .next()
753 .await
754 .is_some();
755
756 active_state_exists || inactive_state_exists
757 }
758
759 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
760 self.db
761 .begin_transaction_nc()
762 .await
763 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
764 .await
765 .next()
766 .await
767 .is_some()
768 }
769
770 pub async fn await_primary_bitcoin_module_output(
773 &self,
774 operation_id: OperationId,
775 out_point: OutPoint,
776 ) -> anyhow::Result<()> {
777 self.primary_module_for_unit(AmountUnit::BITCOIN)
778 .ok_or_else(|| anyhow!("No primary module available"))?
779 .1
780 .await_primary_module_output(operation_id, out_point)
781 .await
782 }
783
784 pub fn get_first_module<M: ClientModule>(
786 &'_ self,
787 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
788 let module_kind = M::kind();
789 let id = self
790 .get_first_instance(&module_kind)
791 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
792 let module: &M = self
793 .try_get_module(id)
794 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
795 .as_any()
796 .downcast_ref::<M>()
797 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
798 let (db, _) = self.db().with_prefix_module_id(id);
799 Ok(ClientModuleInstance {
800 id,
801 db,
802 api: self.api().with_module(id),
803 module,
804 })
805 }
806
807 pub fn get_module_client_dyn(
808 &self,
809 instance_id: ModuleInstanceId,
810 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
811 self.try_get_module(instance_id)
812 .ok_or(anyhow!("Unknown module instance {}", instance_id))
813 }
814
815 pub fn db(&self) -> &Database {
816 &self.db
817 }
818
819 pub fn endpoints(&self) -> &ConnectorRegistry {
820 &self.connectors
821 }
822
823 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
826 TransactionUpdates {
827 update_stream: self.transaction_update_stream(operation_id).await,
828 }
829 }
830
831 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
833 self.modules
834 .iter_modules()
835 .find(|(_, kind, _module)| *kind == module_kind)
836 .map(|(instance_id, _, _)| instance_id)
837 }
838
839 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
842 get_decoded_client_secret::<T>(self.db()).await
843 }
844
845 pub async fn await_primary_bitcoin_module_outputs(
848 &self,
849 operation_id: OperationId,
850 outputs: Vec<OutPoint>,
851 ) -> anyhow::Result<()> {
852 for out_point in outputs {
853 self.await_primary_bitcoin_module_output(operation_id, out_point)
854 .await?;
855 }
856
857 Ok(())
858 }
859
860 pub async fn get_config_json(&self) -> JsonClientConfig {
866 self.config().await.to_json()
867 }
868
869 #[doc(hidden)]
872 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
875 self.get_balance_for_unit(AmountUnit::BITCOIN).await
876 }
877
878 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
879 let (id, module) = self
880 .primary_module_for_unit(unit)
881 .ok_or_else(|| anyhow!("Primary module not available"))?;
882 Ok(module
883 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
884 .await)
885 }
886
887 pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
890 let primary_module_things =
891 if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
892 let balance_changes = primary_module.subscribe_balance_changes().await;
893 let initial_balance = self
894 .get_balance_for_unit(unit)
895 .await
896 .expect("Primary is present");
897
898 Some((
899 primary_module_id,
900 primary_module.clone(),
901 balance_changes,
902 initial_balance,
903 ))
904 } else {
905 None
906 };
907 let db = self.db().clone();
908
909 Box::pin(async_stream::stream! {
910 let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
911 pending().await
914 };
915
916
917 yield initial_balance;
918 let mut prev_balance = initial_balance;
919 while let Some(()) = balance_changes.next().await {
920 let mut dbtx = db.begin_transaction_nc().await;
921 let balance = primary_module
922 .get_balance(primary_module_id, &mut dbtx, unit)
923 .await;
924
925 if balance != prev_balance {
927 prev_balance = balance;
928 yield balance;
929 }
930 }
931 })
932 }
933
934 async fn make_api_version_request(
939 delay: Duration,
940 peer_id: PeerId,
941 api: &DynGlobalApi,
942 ) -> (
943 PeerId,
944 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
945 ) {
946 runtime::sleep(delay).await;
947 (
948 peer_id,
949 api.request_single_peer::<SupportedApiVersionsSummary>(
950 VERSION_ENDPOINT.to_owned(),
951 ApiRequestErased::default(),
952 peer_id,
953 )
954 .await,
955 )
956 }
957
958 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
964 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
965 }
966
967 pub async fn fetch_common_api_versions_from_all_peers(
970 num_peers: NumPeers,
971 api: DynGlobalApi,
972 db: Database,
973 num_responses_sender: watch::Sender<usize>,
974 ) {
975 let mut backoff = Self::create_api_version_backoff();
976
977 let mut requests = FuturesUnordered::new();
980
981 for peer_id in num_peers.peer_ids() {
982 requests.push(Self::make_api_version_request(
983 Duration::ZERO,
984 peer_id,
985 &api,
986 ));
987 }
988
989 let mut num_responses = 0;
990
991 while let Some((peer_id, response)) = requests.next().await {
992 let retry = match response {
993 Err(err) => {
994 let has_previous_response = db
995 .begin_transaction_nc()
996 .await
997 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
998 .await
999 .is_some();
1000 debug!(
1001 target: LOG_CLIENT,
1002 %peer_id,
1003 err = %err.fmt_compact(),
1004 %has_previous_response,
1005 "Failed to refresh API versions of a peer"
1006 );
1007
1008 !has_previous_response
1009 }
1010 Ok(o) => {
1011 let mut dbtx = db.begin_transaction().await;
1014 dbtx.insert_entry(
1015 &PeerLastApiVersionsSummaryKey(peer_id),
1016 &PeerLastApiVersionsSummary(o),
1017 )
1018 .await;
1019 dbtx.commit_tx().await;
1020 false
1021 }
1022 };
1023
1024 if retry {
1025 requests.push(Self::make_api_version_request(
1026 backoff.next().expect("Keeps retrying"),
1027 peer_id,
1028 &api,
1029 ));
1030 } else {
1031 num_responses += 1;
1032 num_responses_sender.send_replace(num_responses);
1033 }
1034 }
1035 }
1036
1037 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
1041 num_peers: NumPeers,
1042 api: DynGlobalApi,
1043 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1044 let mut backoff = Self::create_api_version_backoff();
1045
1046 let mut requests = FuturesUnordered::new();
1049
1050 for peer_id in num_peers.peer_ids() {
1051 requests.push(Self::make_api_version_request(
1052 Duration::ZERO,
1053 peer_id,
1054 &api,
1055 ));
1056 }
1057
1058 let mut successful_responses = BTreeMap::new();
1059
1060 while successful_responses.len() < num_peers.threshold()
1061 && let Some((peer_id, response)) = requests.next().await
1062 {
1063 let retry = match response {
1064 Err(err) => {
1065 debug!(
1066 target: LOG_CLIENT,
1067 %peer_id,
1068 err = %err.fmt_compact(),
1069 "Failed to fetch API versions from peer"
1070 );
1071 true
1072 }
1073 Ok(response) => {
1074 successful_responses.insert(peer_id, response);
1075 false
1076 }
1077 };
1078
1079 if retry {
1080 requests.push(Self::make_api_version_request(
1081 backoff.next().expect("Keeps retrying"),
1082 peer_id,
1083 &api,
1084 ));
1085 }
1086 }
1087
1088 successful_responses
1089 }
1090
1091 pub async fn fetch_common_api_versions(
1093 config: &ClientConfig,
1094 api: &DynGlobalApi,
1095 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1096 debug!(
1097 target: LOG_CLIENT,
1098 "Fetching common api versions"
1099 );
1100
1101 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1102
1103 let peer_api_version_sets =
1104 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1105
1106 Ok(peer_api_version_sets)
1107 }
1108
1109 pub async fn write_api_version_cache(
1113 dbtx: &mut DatabaseTransaction<'_>,
1114 api_version_set: ApiVersionSet,
1115 ) {
1116 debug!(
1117 target: LOG_CLIENT,
1118 value = ?api_version_set,
1119 "Writing API version set to cache"
1120 );
1121
1122 dbtx.insert_entry(
1123 &CachedApiVersionSetKey,
1124 &CachedApiVersionSet(api_version_set),
1125 )
1126 .await;
1127 }
1128
1129 pub async fn store_prefetched_api_versions(
1134 db: &Database,
1135 config: &ClientConfig,
1136 client_module_init: &ClientModuleInitRegistry,
1137 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1138 ) {
1139 debug!(
1140 target: LOG_CLIENT,
1141 "Storing {} prefetched peer API version responses and calculating common version set",
1142 peer_api_versions.len()
1143 );
1144
1145 let mut dbtx = db.begin_transaction().await;
1146 let client_supported_versions =
1148 Self::supported_api_versions_summary_static(config, client_module_init);
1149 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1150 &client_supported_versions,
1151 peer_api_versions,
1152 ) {
1153 Ok(common_api_versions) => {
1154 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1156 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1157 }
1158 Err(err) => {
1159 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1160 }
1161 }
1162
1163 for (peer_id, peer_api_versions) in peer_api_versions {
1165 dbtx.insert_entry(
1166 &PeerLastApiVersionsSummaryKey(*peer_id),
1167 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1168 )
1169 .await;
1170 }
1171 dbtx.commit_tx().await;
1172 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1173 }
1174
1175 pub fn supported_api_versions_summary_static(
1177 config: &ClientConfig,
1178 client_module_init: &ClientModuleInitRegistry,
1179 ) -> SupportedApiVersionsSummary {
1180 SupportedApiVersionsSummary {
1181 core: SupportedCoreApiVersions {
1182 core_consensus: config.global.consensus_version,
1183 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1184 .expect("must not have conflicting versions"),
1185 },
1186 modules: config
1187 .modules
1188 .iter()
1189 .filter_map(|(&module_instance_id, module_config)| {
1190 client_module_init
1191 .get(module_config.kind())
1192 .map(|module_init| {
1193 (
1194 module_instance_id,
1195 SupportedModuleApiVersions {
1196 core_consensus: config.global.consensus_version,
1197 module_consensus: module_config.version,
1198 api: module_init.supported_api_versions(),
1199 },
1200 )
1201 })
1202 })
1203 .collect(),
1204 }
1205 }
1206
    /// Returns the common API version set for this client.
    ///
    /// Thin wrapper that forwards the client's current config, module
    /// initializers, connectors, API handle, database and task group to
    /// `load_and_refresh_common_api_version_static`.
    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }
1218
1219 async fn load_and_refresh_common_api_version_static(
1225 config: &ClientConfig,
1226 module_init: &ClientModuleInitRegistry,
1227 connectors: ConnectorRegistry,
1228 api: &DynGlobalApi,
1229 db: &Database,
1230 task_group: &TaskGroup,
1231 ) -> anyhow::Result<ApiVersionSet> {
1232 if let Some(v) = db
1233 .begin_transaction_nc()
1234 .await
1235 .get_value(&CachedApiVersionSetKey)
1236 .await
1237 {
1238 debug!(
1239 target: LOG_CLIENT,
1240 "Found existing cached common api versions"
1241 );
1242 let config = config.clone();
1243 let client_module_init = module_init.clone();
1244 let api = api.clone();
1245 let db = db.clone();
1246 let task_group = task_group.clone();
1247 task_group
1250 .clone()
1251 .spawn_cancellable("refresh_common_api_version_static", async move {
1252 connectors.wait_for_initialized_connections().await;
1253
1254 if let Err(error) = Self::refresh_common_api_version_static(
1255 &config,
1256 &client_module_init,
1257 &api,
1258 &db,
1259 task_group,
1260 false,
1261 )
1262 .await
1263 {
1264 warn!(
1265 target: LOG_CLIENT,
1266 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1267 );
1268 }
1269 });
1270
1271 return Ok(v.0);
1272 }
1273
1274 info!(
1275 target: LOG_CLIENT,
1276 "Fetching initial API versions "
1277 );
1278 Self::refresh_common_api_version_static(
1279 config,
1280 module_init,
1281 api,
1282 db,
1283 task_group.clone(),
1284 true,
1285 )
1286 .await
1287 }
1288
1289 async fn refresh_common_api_version_static(
1290 config: &ClientConfig,
1291 client_module_init: &ClientModuleInitRegistry,
1292 api: &DynGlobalApi,
1293 db: &Database,
1294 task_group: TaskGroup,
1295 block_until_ok: bool,
1296 ) -> anyhow::Result<ApiVersionSet> {
1297 debug!(
1298 target: LOG_CLIENT,
1299 "Refreshing common api versions"
1300 );
1301
1302 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1303 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1304
1305 task_group.spawn_cancellable("refresh peers api versions", {
1306 Client::fetch_common_api_versions_from_all_peers(
1307 num_peers,
1308 api.clone(),
1309 db.clone(),
1310 num_responses_sender,
1311 )
1312 });
1313
1314 let common_api_versions = loop {
1315 let _: Result<_, Elapsed> = runtime::timeout(
1323 Duration::from_secs(30),
1324 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1325 )
1326 .await;
1327
1328 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1329
1330 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1331 &Self::supported_api_versions_summary_static(config, client_module_init),
1332 &peer_api_version_sets,
1333 ) {
1334 Ok(o) => break o,
1335 Err(err) if block_until_ok => {
1336 warn!(
1337 target: LOG_CLIENT,
1338 err = %err.fmt_compact_anyhow(),
1339 "Failed to discover API version to use. Retrying..."
1340 );
1341 continue;
1342 }
1343 Err(e) => return Err(e),
1344 }
1345 };
1346
1347 debug!(
1348 target: LOG_CLIENT,
1349 value = ?common_api_versions,
1350 "Updating the cached common api versions"
1351 );
1352 let mut dbtx = db.begin_transaction().await;
1353 let _ = dbtx
1354 .insert_entry(
1355 &CachedApiVersionSetKey,
1356 &CachedApiVersionSet(common_api_versions.clone()),
1357 )
1358 .await;
1359
1360 dbtx.commit_tx().await;
1361
1362 Ok(common_api_versions)
1363 }
1364
1365 pub async fn get_metadata(&self) -> Metadata {
1367 self.db
1368 .begin_transaction_nc()
1369 .await
1370 .get_value(&ClientMetadataKey)
1371 .await
1372 .unwrap_or_else(|| {
1373 warn!(
1374 target: LOG_CLIENT,
1375 "Missing existing metadata. This key should have been set on Client init"
1376 );
1377 Metadata::empty()
1378 })
1379 }
1380
    /// Stores `metadata` in the client database via an autocommit
    /// transaction that calls `set_metadata_dbtx`.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to autocommit metadata" if the autocommit call
    /// returns an error.
    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                // NOTE(review): `None` presumably means "no attempt limit" — confirm
                // against `Database::autocommit`.
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }
1396
1397 pub fn has_pending_recoveries(&self) -> bool {
1398 !self
1399 .client_recovery_progress_receiver
1400 .borrow()
1401 .iter()
1402 .all(|(_id, progress)| progress.is_done())
1403 }
1404
1405 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1413 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1414 recovery_receiver
1415 .wait_for(|in_progress| {
1416 in_progress
1417 .iter()
1418 .all(|(_id, progress)| progress.is_done())
1419 })
1420 .await
1421 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1422
1423 Ok(())
1424 }
1425
    /// Returns a stream of `(module instance id, progress)` pairs.
    ///
    /// The stream wraps a clone of the recovery progress receiver in a
    /// `WatchStream` and flattens every observed progress map into its
    /// individual entries.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1436
1437 pub async fn wait_for_module_kind_recovery(
1438 &self,
1439 module_kind: ModuleKind,
1440 ) -> anyhow::Result<()> {
1441 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1442 let config = self.config().await;
1443 recovery_receiver
1444 .wait_for(|in_progress| {
1445 !in_progress
1446 .iter()
1447 .filter(|(module_instance_id, _progress)| {
1448 config.modules[module_instance_id].kind == module_kind
1449 })
1450 .any(|(_id, progress)| !progress.is_done())
1451 })
1452 .await
1453 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1454
1455 Ok(())
1456 }
1457
1458 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1459 loop {
1460 if self.executor.get_active_states().await.is_empty() {
1461 break;
1462 }
1463 sleep(Duration::from_millis(100)).await;
1464 }
1465 Ok(())
1466 }
1467
    /// Writes `metadata` under `ClientMetadataKey` using the given
    /// transaction; committing is left to the caller.
    // NOTE(review): this uses `insert_new_entry`, not `insert_entry` — confirm
    // how it behaves when metadata is already stored.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1472
    /// Spawns the "module recoveries" task on the client's task group.
    ///
    /// The task runs `run_module_recoveries_task` with clones of the client
    /// database and the log-ordering wakeup sender, plus the recovery
    /// futures and progress receivers passed in here.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Owned handles, so the spawned task does not borrow from `self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1499
1500 async fn run_module_recoveries_task(
1501 db: Database,
1502 log_ordering_wakeup_tx: watch::Sender<()>,
1503 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1504 module_recoveries: BTreeMap<
1505 ModuleInstanceId,
1506 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1507 >,
1508 module_recovery_progress_receivers: BTreeMap<
1509 ModuleInstanceId,
1510 watch::Receiver<RecoveryProgress>,
1511 >,
1512 ) {
1513 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1514 let mut completed_stream = Vec::new();
1515 let progress_stream = futures::stream::FuturesUnordered::new();
1516
1517 for (module_instance_id, f) in module_recoveries {
1518 completed_stream.push(futures::stream::once(Box::pin(async move {
1519 match f.await {
1520 Ok(()) => (module_instance_id, None),
1521 Err(err) => {
1522 warn!(
1523 target: LOG_CLIENT,
1524 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1525 );
1526 futures::future::pending::<()>().await;
1530 unreachable!()
1531 }
1532 }
1533 })));
1534 }
1535
1536 for (module_instance_id, rx) in module_recovery_progress_receivers {
1537 progress_stream.push(
1538 tokio_stream::wrappers::WatchStream::new(rx)
1539 .fuse()
1540 .map(move |progress| (module_instance_id, Some(progress))),
1541 );
1542 }
1543
1544 let mut futures = futures::stream::select(
1545 futures::stream::select_all(progress_stream),
1546 futures::stream::select_all(completed_stream),
1547 );
1548
1549 while let Some((module_instance_id, progress)) = futures.next().await {
1550 let mut dbtx = db.begin_transaction().await;
1551
1552 let prev_progress = *recovery_sender
1553 .borrow()
1554 .get(&module_instance_id)
1555 .expect("existing progress must be present");
1556
1557 let progress = if prev_progress.is_done() {
1558 prev_progress
1560 } else if let Some(progress) = progress {
1561 progress
1562 } else {
1563 prev_progress.to_complete()
1564 };
1565
1566 if !prev_progress.is_done() && progress.is_done() {
1567 info!(
1568 target: LOG_CLIENT,
1569 module_instance_id,
1570 progress = format!("{}/{}", progress.complete, progress.total),
1571 "Recovery complete"
1572 );
1573 dbtx.log_event(
1574 log_ordering_wakeup_tx.clone(),
1575 None,
1576 ModuleRecoveryCompleted {
1577 module_id: module_instance_id,
1578 },
1579 )
1580 .await;
1581 } else {
1582 info!(
1583 target: LOG_CLIENT,
1584 module_instance_id,
1585 progress = format!("{}/{}", progress.complete, progress.total),
1586 "Recovery progress"
1587 );
1588 }
1589
1590 dbtx.insert_entry(
1591 &ClientModuleRecovery { module_instance_id },
1592 &ClientModuleRecoveryState { progress },
1593 )
1594 .await;
1595 dbtx.commit_tx().await;
1596
1597 recovery_sender.send_modify(|v| {
1598 v.insert(module_instance_id, progress);
1599 });
1600 }
1601 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1602 }
1603
1604 async fn load_peers_last_api_versions(
1605 db: &Database,
1606 num_peers: NumPeers,
1607 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1608 let mut peer_api_version_sets = BTreeMap::new();
1609
1610 let mut dbtx = db.begin_transaction_nc().await;
1611 for peer_id in num_peers.peer_ids() {
1612 if let Some(v) = dbtx
1613 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1614 .await
1615 {
1616 peer_api_version_sets.insert(peer_id, v.0);
1617 }
1618 }
1619 drop(dbtx);
1620 peer_api_version_sets
1621 }
1622
    /// Returns all entries stored under `ApiAnnouncementPrefix`, keyed by
    /// the peer id taken from each entry's key.
    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }
1635
    /// Returns all entries stored under `GuardianMetadataPrefix`, keyed by
    /// the peer id taken from each entry's key.
    pub async fn get_guardian_metadata(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
            .await
            .map(|(key, metadata)| (key.0, metadata))
            .collect()
            .await
    }
1649
    /// Returns the API URL of each peer, as computed by `get_api_urls` from
    /// the client database and the current config.
    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }
1654
1655 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1658 self.get_peer_urls()
1659 .await
1660 .into_iter()
1661 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1662 .map(|peer_url| {
1663 InviteCode::new(
1664 peer_url.clone(),
1665 peer,
1666 self.federation_id(),
1667 self.api_secret.clone(),
1668 )
1669 })
1670 }
1671
    /// Returns the guardians' broadcast public keys.
    ///
    /// Runs `get_or_backfill_broadcast_public_keys` against the current
    /// config inside an autocommit transaction, so a config updated by the
    /// backfill is written through that transaction.
    ///
    /// # Panics
    ///
    /// Panics with "Will retry forever" if the autocommit call returns an
    /// error.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // The closure itself never fails; `()` is a placeholder
                        // error type.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                // NOTE(review): `None` presumably means "no attempt limit", matching
                // the expect message below — confirm against `Database::autocommit`.
                None,
            )
            .await
            .expect("Will retry forever")
    }
1696
1697 async fn get_or_backfill_broadcast_public_keys(
1698 &self,
1699 dbtx: &mut DatabaseTransaction<'_>,
1700 config: ClientConfig,
1701 ) -> BTreeMap<PeerId, PublicKey> {
1702 match config.global.broadcast_public_keys {
1703 Some(guardian_pub_keys) => guardian_pub_keys,
1704 _ => {
1705 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1706
1707 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1708 *(self.config.write().await) = new_config;
1709 guardian_pub_keys
1710 }
1711 }
1712 }
1713
    /// Returns the result of the federation API's `session_count` request.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1717
1718 async fn fetch_and_update_config(
1719 &self,
1720 config: ClientConfig,
1721 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1722 let fetched_config = retry(
1723 "Fetching guardian public keys",
1724 backoff_util::background_backoff(),
1725 || async {
1726 Ok(self
1727 .api
1728 .request_current_consensus::<ClientConfig>(
1729 CLIENT_CONFIG_ENDPOINT.to_owned(),
1730 ApiRequestErased::default(),
1731 )
1732 .await?)
1733 },
1734 )
1735 .await
1736 .expect("Will never return on error");
1737
1738 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1739 warn!(
1740 target: LOG_CLIENT,
1741 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1742 );
1743 pending::<()>().await;
1744 unreachable!("Pending will never return");
1745 };
1746
1747 let new_config = ClientConfig {
1748 global: GlobalClientConfig {
1749 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1750 ..config.global
1751 },
1752 modules: config.modules,
1753 };
1754 (guardian_pub_keys, new_config)
1755 }
1756
1757 pub fn handle_global_rpc(
1758 &self,
1759 method: String,
1760 params: serde_json::Value,
1761 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1762 Box::pin(try_stream! {
1763 match method.as_str() {
1764 "get_balance" => {
1765 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1766 yield serde_json::to_value(balance)?;
1767 }
1768 "subscribe_balance_changes" => {
1769 let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1770 let mut stream = self.subscribe_balance_changes(req.unit).await;
1771 while let Some(balance) = stream.next().await {
1772 yield serde_json::to_value(balance)?;
1773 }
1774 }
1775 "get_config" => {
1776 let config = self.config().await;
1777 yield serde_json::to_value(config)?;
1778 }
1779 "get_federation_id" => {
1780 let federation_id = self.federation_id();
1781 yield serde_json::to_value(federation_id)?;
1782 }
1783 "get_invite_code" => {
1784 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1785 let invite_code = self.invite_code(req.peer).await;
1786 yield serde_json::to_value(invite_code)?;
1787 }
1788 "get_operation" => {
1789 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1790 let operation = self.operation_log().get_operation(req.operation_id).await;
1791 yield serde_json::to_value(operation)?;
1792 }
1793 "list_operations" => {
1794 let req: ListOperationsParams = serde_json::from_value(params)?;
1795 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1796 usize::MAX
1797 } else {
1798 req.limit.unwrap_or(usize::MAX)
1799 };
1800 let operations = self.operation_log()
1801 .paginate_operations_rev(limit, req.last_seen)
1802 .await;
1803 yield serde_json::to_value(operations)?;
1804 }
1805 "session_count" => {
1806 let count = self.fetch_session_count().await?;
1807 yield serde_json::to_value(count)?;
1808 }
1809 "has_pending_recoveries" => {
1810 let has_pending = self.has_pending_recoveries();
1811 yield serde_json::to_value(has_pending)?;
1812 }
1813 "wait_for_all_recoveries" => {
1814 self.wait_for_all_recoveries().await?;
1815 yield serde_json::Value::Null;
1816 }
1817 "subscribe_to_recovery_progress" => {
1818 let mut stream = self.subscribe_to_recovery_progress();
1819 while let Some((module_id, progress)) = stream.next().await {
1820 yield serde_json::json!({
1821 "module_id": module_id,
1822 "progress": progress
1823 });
1824 }
1825 }
1826 #[allow(deprecated)]
1827 "backup_to_federation" => {
1828 let metadata = if params.is_null() {
1829 Metadata::from_json_serialized(serde_json::json!({}))
1830 } else {
1831 Metadata::from_json_serialized(params)
1832 };
1833 self.backup_to_federation(metadata).await?;
1834 yield serde_json::Value::Null;
1835 }
1836 _ => {
1837 Err(anyhow::format_err!("Unknown method: {}", method))?;
1838 unreachable!()
1839 },
1840 }
1841 })
1842 }
1843
1844 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1845 where
1846 E: Event + Send,
1847 {
1848 let mut dbtx = self.db.begin_transaction().await;
1849 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1850 dbtx.commit_tx().await;
1851 }
1852
    /// Logs `event` for the optional `module_id` through the caller's
    /// transaction, passing a clone of the client's log-ordering wakeup
    /// sender. Committing is left to the caller.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1865
1866 pub async fn log_event_raw_dbtx<Cap>(
1867 &self,
1868 dbtx: &mut DatabaseTransaction<'_, Cap>,
1869 kind: EventKind,
1870 module: Option<(ModuleKind, ModuleInstanceId)>,
1871 payload: Vec<u8>,
1872 persist: EventPersistence,
1873 ) where
1874 Cap: Send,
1875 {
1876 let module_id = module.as_ref().map(|m| m.1);
1877 let module_kind = module.map(|m| m.0);
1878 dbtx.log_event_raw(
1879 self.log_ordering_wakeup_tx.clone(),
1880 kind,
1881 module_kind,
1882 module_id,
1883 payload,
1884 persist,
1885 )
1886 .await;
1887 }
1888
    /// Returns a boxed event-log tracker that persists its position under
    /// `DefaultApplicationEventLogKey`.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        // Stateless: the position lives only in the database.
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            /// Stores `pos` under `DefaultApplicationEventLogKey`; always
            /// returns `Ok(())`.
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            /// Loads the stored position, or `None` if nothing is stored.
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
1926
    /// Runs `fedimint_eventlog::handle_events` with a clone of the client
    /// database, the given `tracker`, a clone of the event-added receiver
    /// and `handler_fn`, and returns its result.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
1951
    /// Runs `fedimint_eventlog::handle_trimable_events` with a clone of the
    /// client database, the given trimable `tracker`, a clone of the
    /// event-added receiver and `handler_fn`, and returns its result.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
1987
1988 pub async fn get_event_log(
1989 &self,
1990 pos: Option<EventLogId>,
1991 limit: u64,
1992 ) -> Vec<PersistedLogEntry> {
1993 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1994 .await
1995 }
1996
1997 pub async fn get_event_log_trimable(
1998 &self,
1999 pos: Option<EventLogTrimableId>,
2000 limit: u64,
2001 ) -> Vec<PersistedLogEntry> {
2002 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2003 .await
2004 }
2005
    /// Reads event log entries through the caller's transaction by
    /// forwarding `pos` and `limit` to the transaction's `get_event_log`.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2017
    /// Reads trimable event log entries through the caller's transaction by
    /// forwarding `pos` and `limit` to the transaction's
    /// `get_event_log_trimable`.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2029
    /// Returns a new subscription to the client's transient event log
    /// broadcast channel.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2034
    /// Returns a clone of the client's event-added watch receiver.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2039
    /// Returns the value of the client's `iroh_enable_dht` flag.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2043
    /// Applies the core client database migrations in a single transaction.
    ///
    /// When running in a test environment, the client database integrity
    /// check also runs before the transaction is committed.
    ///
    /// # Errors
    ///
    /// Returns an error if applying the migrations or committing the
    /// transaction fails.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        // Integrity verification only runs in test environments.
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2056
2057 fn primary_modules_for_unit(
2059 &self,
2060 unit: AmountUnit,
2061 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2062 self.primary_modules
2063 .iter()
2064 .flat_map(move |(_prio, candidates)| {
2065 candidates
2066 .specific
2067 .get(&unit)
2068 .into_iter()
2069 .flatten()
2070 .copied()
2071 .chain(candidates.wildcard.iter().copied())
2073 })
2074 .map(|id| (id, self.modules.get_expect(id)))
2075 }
2076
    /// Returns the first candidate yielded by `primary_modules_for_unit` for
    /// `unit`, or `None` if there is none.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2086
    /// Returns the primary module for `AmountUnit::BITCOIN`.
    ///
    /// # Panics
    ///
    /// Panics with "No primary module for Bitcoin" if there is no such
    /// module.
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2092}
2093
/// `ClientContextIface` implementation for `Client`.
///
/// Most methods forward to the `Client` inherent method of the same name.
/// Exceptions: `await_primary_module_outputs` forwards to
/// `await_primary_bitcoin_module_outputs`, and `log_event_json` and the two
/// `read_operation_*_states` methods are implemented here.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    /// Forwards to `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to `Client::finalize_and_submit_transaction`, passing the
    /// boxed `operation_meta_gen` by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to `Client::finalize_and_submit_transaction_inner`.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_bitcoin_module_outputs`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    /// Forwards to `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    /// Forwards to `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event whose payload is given as a JSON value.
    ///
    /// The payload is serialized to bytes and passed to
    /// `log_event_raw_dbtx`. The module id is only passed on when
    /// `module_kind` is `Some`.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx.ensure_global()` fails, or if the payload cannot be
    /// serialized.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    /// Streams the active states stored for `operation_id` and `module_id`,
    /// found via `ActiveModuleOperationStateKeyPrefix`. Each item pairs the
    /// inner state key with its metadata.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    /// Streams the inactive states stored for `operation_id` and
    /// `module_id`, found via `InactiveModuleOperationStateKeyPrefix`. Each
    /// item pairs the inner state key with its metadata.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2233
2234impl fmt::Debug for Client {
2236 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2237 write!(f, "Client")
2238 }
2239}
2240
2241pub fn client_decoders<'a>(
2242 registry: &ModuleInitRegistry<DynClientModuleInit>,
2243 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2244) -> ModuleDecoderRegistry {
2245 let mut modules = BTreeMap::new();
2246 for (id, kind) in module_kinds {
2247 let Some(init) = registry.get(kind) else {
2248 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2249 continue;
2250 };
2251
2252 modules.insert(
2253 id,
2254 (
2255 kind.clone(),
2256 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2257 ),
2258 );
2259 }
2260 ModuleDecoderRegistry::from(modules)
2261}