1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
21 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
22};
23use fedimint_client_module::oplog::IOperationLog;
24use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
25use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
26use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
27use fedimint_client_module::transaction::{
28 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
29 TxSubmissionStatesSM,
30};
31use fedimint_client_module::{
32 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
33 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
34};
35use fedimint_connectors::ConnectorRegistry;
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
51 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::runtime::sleep;
55use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
56use fedimint_core::transaction::Transaction;
57use fedimint_core::util::backoff_util::custom_backoff;
58use fedimint_core::util::{
59 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
60};
61use fedimint_core::{
62 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
63 maybe_add_send_sync, runtime,
64};
65use fedimint_derive_secret::DerivableSecret;
66use fedimint_eventlog::{
67 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
68 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
69};
70use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
71use futures::stream::FuturesUnordered;
72use futures::{Stream, StreamExt as _};
73use global_ctx::ModuleGlobalClientContext;
74use serde::{Deserialize, Serialize};
75use tokio::sync::{broadcast, watch};
76use tokio_stream::wrappers::WatchStream;
77use tracing::{debug, info, warn};
78
79use crate::ClientBuilder;
80use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
81use crate::backup::Metadata;
82use crate::client::event_log::DefaultApplicationEventLogKey;
83use crate::db::{
84 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
85 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
86 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
87 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
88 get_decoded_client_secret, verify_client_db_integrity_dbtx,
89};
90use crate::meta::MetaService;
91use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
92use crate::oplog::OperationLog;
93use crate::sm::executor::{
94 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
95 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
96};
97
98pub(crate) mod builder;
99pub(crate) mod event_log;
100pub(crate) mod global_ctx;
101pub(crate) mod handle;
102
103const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
107 &[ApiVersion { major: 0, minor: 0 }];
108
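/// Primary module candidates registered under a single priority level.
///
/// `specific` maps an [`AmountUnit`] to module instances registered for that
/// particular unit, while `wildcard` lists module instances usable for any unit.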
109#[derive(Default)]
111pub(crate) struct PrimaryModuleCandidates {
112 specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
114 wildcard: Vec<ModuleInstanceId>,
116}
117
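/// Main client handle for a single federation: bundles the client config,
/// database, module registry, state machine executor, federation API,
/// operation log and event log used by all client operations.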
118pub struct Client {
132 final_client: FinalClientIface,
133 config: tokio::sync::RwLock<ClientConfig>,
134 api_secret: Option<String>,
135 decoders: ModuleDecoderRegistry,
136 connectors: ConnectorRegistry,
137 db: Database,
138 federation_id: FederationId,
139 federation_config_meta: BTreeMap<String, String>,
140 primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
141 pub(crate) modules: ClientModuleRegistry,
142 module_inits: ClientModuleInitRegistry,
143 executor: Executor,
144 pub(crate) api: DynGlobalApi,
145 root_secret: DerivableSecret,
146 operation_log: OperationLog,
147 secp_ctx: Secp256k1<secp256k1::All>,
148 meta_service: Arc<MetaService>,
149
150 task_group: TaskGroup,
151
152 client_recovery_progress_receiver:
154 watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
155
156 log_ordering_wakeup_tx: watch::Sender<()>,
159 log_event_added_rx: watch::Receiver<()>,
161 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
162 request_hook: ApiRequestHook,
163 iroh_enable_dht: bool,
164 iroh_enable_next: bool,
165}
166
167#[derive(Debug, Serialize, Deserialize)]
168struct ListOperationsParams {
169 limit: Option<usize>,
170 last_seen: Option<ChronologicalOperationLogKey>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct GetOperationIdRequest {
175 operation_id: OperationId,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct GetBalanceChangesRequest {
180 #[serde(default = "AmountUnit::bitcoin")]
181 unit: AmountUnit,
182}
183
184impl Client {
185 pub async fn builder() -> anyhow::Result<ClientBuilder> {
188 Ok(ClientBuilder::new())
189 }
190
191 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
192 self.api.as_ref()
193 }
194
195 pub fn api_clone(&self) -> DynGlobalApi {
196 self.api.clone()
197 }
198
199 pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
202 self.api.connection_status_stream()
203 }
204
205 pub fn task_group(&self) -> &TaskGroup {
207 &self.task_group
208 }
209
210 #[doc(hidden)]
212 pub fn executor(&self) -> &Executor {
213 &self.executor
214 }
215
216 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
217 let mut dbtx = db.begin_transaction_nc().await;
218 dbtx.get_value(&ClientConfigKey).await
219 }
220
221 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
222 let mut dbtx = db.begin_transaction_nc().await;
223 dbtx.get_value(&PendingClientConfigKey).await
224 }
225
226 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
227 let mut dbtx = db.begin_transaction_nc().await;
228 dbtx.get_value(&ApiSecretKey).await
229 }
230
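 /// Persist the consensus-encoded client secret in the client database.
 ///
 /// Fails if a secret has already been stored; an existing secret is never
 /// overwritten.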
231 pub async fn store_encodable_client_secret<T: Encodable>(
232 db: &Database,
233 secret: T,
234 ) -> anyhow::Result<()> {
235 let mut dbtx = db.begin_transaction().await;
236
237 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
239 bail!("Encoded client secret already exists, cannot overwrite")
240 }
241
242 let encoded_secret = T::consensus_encode_to_vec(&secret);
243 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
244 .await;
245 dbtx.commit_tx().await;
246 Ok(())
247 }
248
249 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
250 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
251 bail!("Encoded client secret not present in DB")
252 };
253
254 Ok(secret)
255 }
256 pub async fn load_decodable_client_secret_opt<T: Decodable>(
257 db: &Database,
258 ) -> anyhow::Result<Option<T>> {
259 let mut dbtx = db.begin_transaction_nc().await;
260
261 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
262
263 Ok(match client_secret {
264 Some(client_secret) => Some(
265 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
266 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
267 ),
268 None => None,
269 })
270 }
271
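 /// Load the existing 64-byte client secret from the database, or generate
 /// and persist a fresh random one if none is stored yet.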
272 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
273 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
274 Ok(secret) => secret,
275 _ => {
276 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
277 Self::store_encodable_client_secret(db, secret)
278 .await
279 .expect("Storing client secret must work");
280 secret
281 }
282 };
283 Ok(client_secret)
284 }
285
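 /// Returns whether a client config has already been persisted, i.e. whether
 /// the database belongs to an initialized client.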
286 pub async fn is_initialized(db: &Database) -> bool {
287 let mut dbtx = db.begin_transaction_nc().await;
288 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
289 .await
290 .expect("Unrecoverable error occurred while reading and entry from the database")
291 .is_some()
292 }
293
294 pub fn start_executor(self: &Arc<Self>) {
295 debug!(
296 target: LOG_CLIENT,
297 "Starting fedimint client executor",
298 );
299 self.executor.start_executor(self.context_gen());
300 }
301
302 pub fn federation_id(&self) -> FederationId {
303 self.federation_id
304 }
305
306 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
307 let client_inner = Arc::downgrade(self);
308 Arc::new(move |module_instance, operation| {
309 ModuleGlobalClientContext {
310 client: client_inner
311 .clone()
312 .upgrade()
313 .expect("ModuleGlobalContextGen called after client was dropped"),
314 module_instance_id: module_instance,
315 operation,
316 }
317 .into()
318 })
319 }
320
321 pub async fn config(&self) -> ClientConfig {
322 self.config.read().await.clone()
323 }
324
325 pub fn api_secret(&self) -> &Option<String> {
327 &self.api_secret
328 }
329
330 pub async fn core_api_version(&self) -> ApiVersion {
336 self.db
339 .begin_transaction_nc()
340 .await
341 .get_value(&CachedApiVersionSetKey)
342 .await
343 .map(|cached: CachedApiVersionSet| cached.0.core)
344 .unwrap_or(ApiVersion { major: 0, minor: 0 })
345 }
346
347 pub fn decoders(&self) -> &ModuleDecoderRegistry {
348 &self.decoders
349 }
350
351 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
353 self.try_get_module(instance)
354 .expect("Module instance not found")
355 }
356
357 fn try_get_module(
358 &self,
359 instance: ModuleInstanceId,
360 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
361 Some(self.modules.get(instance)?.as_ref())
362 }
363
364 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
365 self.modules.get(instance).is_some()
366 }
367
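 /// Sum the input and output amounts of a [`TransactionBuilder`] per
 /// [`AmountUnit`], returning the summed inputs and the summed outputs with
 /// all input and output module fees added to the output total.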
368 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
374 let mut in_amounts = Amounts::ZERO;
376 let mut out_amounts = Amounts::ZERO;
377 let mut fee_amounts = Amounts::ZERO;
378
379 for input in builder.inputs() {
380 let module = self.get_module(input.input.module_instance_id());
381
382 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
383 "We only build transactions with input versions that are supported by the module",
384 );
385
386 in_amounts.checked_add_mut(&input.amounts);
387 fee_amounts.checked_add_mut(&item_fees);
388 }
389
390 for output in builder.outputs() {
391 let module = self.get_module(output.output.module_instance_id());
392
393 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
394 "We only build transactions with output versions that are supported by the module",
395 );
396
397 out_amounts.checked_add_mut(&output.amounts);
398 fee_amounts.checked_add_mut(&item_fees);
399 }
400
401 out_amounts.checked_add_mut(&fee_amounts);
402 (in_amounts, out_amounts)
403 }
404
405 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
406 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
407 }
408
409 pub fn get_config_meta(&self, key: &str) -> Option<String> {
411 self.federation_config_meta.get(key).cloned()
412 }
413
414 pub(crate) fn root_secret(&self) -> DerivableSecret {
415 self.root_secret.clone()
416 }
417
418 pub async fn add_state_machines(
419 &self,
420 dbtx: &mut DatabaseTransaction<'_>,
421 states: Vec<DynState>,
422 ) -> AddStateMachinesResult {
423 self.executor.add_state_machines_dbtx(dbtx, states).await
424 }
425
426 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
428 let active_states = self.executor.get_active_states().await;
429 let mut active_operations = HashSet::with_capacity(active_states.len());
430 let mut dbtx = self.db().begin_transaction_nc().await;
431 for (state, _) in active_states {
432 let operation_id = state.operation_id();
433 if dbtx
434 .get_value(&OperationLogKey { operation_id })
435 .await
436 .is_some()
437 {
438 active_operations.insert(operation_id);
439 }
440 }
441 active_operations
442 }
443
444 pub fn operation_log(&self) -> &OperationLog {
445 &self.operation_log
446 }
447
448 pub fn meta_service(&self) -> &Arc<MetaService> {
450 &self.meta_service
451 }
452
453 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
455 let meta_service = self.meta_service();
456 let ts = meta_service
457 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
458 .await
459 .and_then(|v| v.value)?;
460 Some(UNIX_EPOCH + Duration::from_secs(ts))
461 }
462
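 /// Balance a partial transaction: for every amount unit where inputs and
 /// outputs differ, ask the primary module for that unit to contribute the
 /// missing inputs or change outputs. Returns the final transaction, the
 /// state machines to run for it, and the index range of the added change
 /// outputs.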
463 async fn finalize_transaction(
465 &self,
466 dbtx: &mut DatabaseTransaction<'_>,
467 operation_id: OperationId,
468 mut partial_transaction: TransactionBuilder,
469 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
470 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
471
472 let mut added_inputs_bundles = vec![];
473 let mut added_outputs_bundles = vec![];
474
475 for unit in in_amounts.units().union(&out_amounts.units()) {
486 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
487 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
488 if input_amount == output_amount {
489 continue;
490 }
491
492 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
493 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
494 };
495
496 let (added_input_bundle, added_output_bundle) = module
497 .create_final_inputs_and_outputs(
498 module_id,
499 dbtx,
500 operation_id,
501 *unit,
502 input_amount,
503 output_amount,
504 )
505 .await?;
506
507 added_inputs_bundles.push(added_input_bundle);
508 added_outputs_bundles.push(added_output_bundle);
509 }
510
511 let change_range = Range {
515 start: partial_transaction.outputs().count() as u64,
516 end: (partial_transaction.outputs().count() as u64
517 + added_outputs_bundles
518 .iter()
519 .map(|output| output.outputs().len() as u64)
520 .sum::<u64>()),
521 };
522
523 for added_inputs in added_inputs_bundles {
524 partial_transaction = partial_transaction.with_inputs(added_inputs);
525 }
526
527 for added_outputs in added_outputs_bundles {
528 partial_transaction = partial_transaction.with_outputs(added_outputs);
529 }
530
531 let (input_amounts, output_amounts) =
532 self.transaction_builder_get_balance(&partial_transaction);
533
534 for (unit, output_amount) in output_amounts {
535 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
536
537 assert!(input_amount >= output_amount, "Transaction is underfunded");
538 }
539
540 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
541
542 Ok((tx, states, change_range))
543 }
544
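 /// Finalize the given [`TransactionBuilder`], record an operation log entry
 /// of type `operation_type` with the metadata produced by
 /// `operation_meta_gen`, and queue the transaction for submission to the
 /// federation. Returns the [`OutPointRange`] of the created transaction.
 ///
 /// Sketch of a call site (not compiled; `operation_id` and `tx_builder` are
 /// assumed to come from the calling module):
 ///
 /// ```ignore
 /// let out_point_range = client
 ///     .finalize_and_submit_transaction(
 ///         operation_id,
 ///         "example-operation",
 ///         |_range| serde_json::json!({ "example": true }),
 ///         tx_builder,
 ///     )
 ///     .await?;
 /// ```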
545 pub async fn finalize_and_submit_transaction<F, M>(
557 &self,
558 operation_id: OperationId,
559 operation_type: &str,
560 operation_meta_gen: F,
561 tx_builder: TransactionBuilder,
562 ) -> anyhow::Result<OutPointRange>
563 where
564 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
565 M: serde::Serialize + MaybeSend,
566 {
567 let operation_type = operation_type.to_owned();
568
569 let autocommit_res = self
570 .db
571 .autocommit(
572 |dbtx, _| {
573 let operation_type = operation_type.clone();
574 let tx_builder = tx_builder.clone();
575 let operation_meta_gen = operation_meta_gen.clone();
576 Box::pin(async move {
577 if Client::operation_exists_dbtx(dbtx, operation_id).await {
578 bail!("There already exists an operation with id {operation_id:?}")
579 }
580
581 let out_point_range = self
582 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
583 .await?;
584
585 self.operation_log()
586 .add_operation_log_entry_dbtx(
587 dbtx,
588 operation_id,
589 &operation_type,
590 operation_meta_gen(out_point_range),
591 )
592 .await;
593
594 Ok(out_point_range)
595 })
596 },
597 Some(100),
 )
599 .await;
600
601 match autocommit_res {
602 Ok(txid) => Ok(txid),
603 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
604 Err(AutocommitError::CommitFailed {
605 attempts,
606 last_error,
607 }) => panic!(
608 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
609 ),
610 }
611 }
612
613 async fn finalize_and_submit_transaction_inner(
614 &self,
615 dbtx: &mut DatabaseTransaction<'_>,
616 operation_id: OperationId,
617 tx_builder: TransactionBuilder,
618 ) -> anyhow::Result<OutPointRange> {
619 let (transaction, mut states, change_range) = self
620 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
621 .await?;
622
623 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
624 let inputs = transaction
625 .inputs
626 .iter()
627 .map(DynInput::module_instance_id)
628 .collect::<Vec<_>>();
629 let outputs = transaction
630 .outputs
631 .iter()
632 .map(DynOutput::module_instance_id)
633 .collect::<Vec<_>>();
634 warn!(
635 target: LOG_CLIENT_NET_API,
636 size=%transaction.consensus_encode_to_vec().len(),
637 ?inputs,
638 ?outputs,
639 "Transaction too large",
640 );
641 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
642 bail!(
643 "The generated transaction would be rejected by the federation for being too large."
644 );
645 }
646
647 let txid = transaction.tx_hash();
648
649 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
650
651 let tx_submission_sm = DynState::from_typed(
652 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
653 TxSubmissionStatesSM {
654 operation_id,
655 state: TxSubmissionStates::Created(transaction),
656 },
657 );
658 states.push(tx_submission_sm);
659
660 self.executor.add_state_machines_dbtx(dbtx, states).await?;
661
662 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
663 .await;
664
665 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
666 }
667
668 async fn transaction_update_stream(
669 &self,
670 operation_id: OperationId,
671 ) -> BoxStream<'static, TxSubmissionStatesSM> {
672 self.executor
673 .notifier()
674 .module_notifier::<TxSubmissionStatesSM>(
675 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
676 self.final_client.clone(),
677 )
678 .subscribe(operation_id)
679 .await
680 }
681
682 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
683 let mut dbtx = self.db().begin_transaction_nc().await;
684
685 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
686 }
687
688 pub async fn operation_exists_dbtx(
689 dbtx: &mut DatabaseTransaction<'_>,
690 operation_id: OperationId,
691 ) -> bool {
692 let active_state_exists = dbtx
693 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
694 .await
695 .next()
696 .await
697 .is_some();
698
699 let inactive_state_exists = dbtx
700 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
701 .await
702 .next()
703 .await
704 .is_some();
705
706 active_state_exists || inactive_state_exists
707 }
708
709 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
710 self.db
711 .begin_transaction_nc()
712 .await
713 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
714 .await
715 .next()
716 .await
717 .is_some()
718 }
719
720 pub async fn await_primary_bitcoin_module_output(
723 &self,
724 operation_id: OperationId,
725 out_point: OutPoint,
726 ) -> anyhow::Result<()> {
727 self.primary_module_for_unit(AmountUnit::BITCOIN)
728 .ok_or_else(|| anyhow!("No primary module available"))?
729 .1
730 .await_primary_module_output(operation_id, out_point)
731 .await
732 }
733
734 pub fn get_first_module<M: ClientModule>(
736 &'_ self,
737 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
738 let module_kind = M::kind();
739 let id = self
740 .get_first_instance(&module_kind)
741 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
742 let module: &M = self
743 .try_get_module(id)
744 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
745 .as_any()
746 .downcast_ref::<M>()
747 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
748 let (db, _) = self.db().with_prefix_module_id(id);
749 Ok(ClientModuleInstance {
750 id,
751 db,
752 api: self.api().with_module(id),
753 module,
754 })
755 }
756
757 pub fn get_module_client_dyn(
758 &self,
759 instance_id: ModuleInstanceId,
760 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
761 self.try_get_module(instance_id)
762 .ok_or(anyhow!("Unknown module instance {}", instance_id))
763 }
764
765 pub fn db(&self) -> &Database {
766 &self.db
767 }
768
769 pub fn endpoints(&self) -> &ConnectorRegistry {
770 &self.connectors
771 }
772
773 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
776 TransactionUpdates {
777 update_stream: self.transaction_update_stream(operation_id).await,
778 }
779 }
780
781 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
783 self.modules
784 .iter_modules()
785 .find(|(_, kind, _module)| *kind == module_kind)
786 .map(|(instance_id, _, _)| instance_id)
787 }
788
789 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
792 get_decoded_client_secret::<T>(self.db()).await
793 }
794
795 pub async fn await_primary_bitcoin_module_outputs(
798 &self,
799 operation_id: OperationId,
800 outputs: Vec<OutPoint>,
801 ) -> anyhow::Result<()> {
802 for out_point in outputs {
803 self.await_primary_bitcoin_module_output(operation_id, out_point)
804 .await?;
805 }
806
807 Ok(())
808 }
809
810 pub async fn get_config_json(&self) -> JsonClientConfig {
816 self.config().await.to_json()
817 }
818
819 #[doc(hidden)]
822 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
825 self.get_balance_for_unit(AmountUnit::BITCOIN).await
826 }
827
828 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
829 let (id, module) = self
830 .primary_module_for_unit(unit)
831 .ok_or_else(|| anyhow!("Primary module not available"))?;
832 Ok(module
833 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
834 .await)
835 }
836
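 /// Stream the balance of the primary module for `unit`: yields the current
 /// balance first and then a new value whenever it changes. If no primary
 /// module handles `unit`, the stream never yields.
 ///
 /// Sketch of a consumer (not compiled; assumes an initialized `client` and
 /// `futures::StreamExt` in scope):
 ///
 /// ```ignore
 /// let mut balances = client.subscribe_balance_changes(AmountUnit::BITCOIN).await;
 /// while let Some(balance) = balances.next().await {
 ///     println!("new balance: {balance}");
 /// }
 /// ```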
837 pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
840 let primary_module_things =
841 if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
842 let balance_changes = primary_module.subscribe_balance_changes().await;
843 let initial_balance = self
844 .get_balance_for_unit(unit)
845 .await
846 .expect("Primary is present");
847
848 Some((
849 primary_module_id,
850 primary_module.clone(),
851 balance_changes,
852 initial_balance,
853 ))
854 } else {
855 None
856 };
857 let db = self.db().clone();
858
859 Box::pin(async_stream::stream! {
860 let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
861 pending().await
864 };
865
866
867 yield initial_balance;
868 let mut prev_balance = initial_balance;
869 while let Some(()) = balance_changes.next().await {
870 let mut dbtx = db.begin_transaction_nc().await;
871 let balance = primary_module
872 .get_balance(primary_module_id, &mut dbtx, unit)
873 .await;
874
875 if balance != prev_balance {
877 prev_balance = balance;
878 yield balance;
879 }
880 }
881 })
882 }
883
884 async fn make_api_version_request(
889 delay: Duration,
890 peer_id: PeerId,
891 api: &DynGlobalApi,
892 ) -> (
893 PeerId,
894 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
895 ) {
896 runtime::sleep(delay).await;
897 (
898 peer_id,
899 api.request_single_peer::<SupportedApiVersionsSummary>(
900 VERSION_ENDPOINT.to_owned(),
901 ApiRequestErased::default(),
902 peer_id,
903 )
904 .await,
905 )
906 }
907
908 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
914 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
915 }
916
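 /// Query every peer for its supported API versions. Successful responses are
 /// persisted in the database; failures are retried with backoff unless a
 /// previous response for that peer is already cached. The number of peers
 /// that have answered (or already have a cached response) is reported
 /// through `num_responses_sender`.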
917 pub async fn fetch_common_api_versions_from_all_peers(
920 num_peers: NumPeers,
921 api: DynGlobalApi,
922 db: Database,
923 num_responses_sender: watch::Sender<usize>,
924 ) {
925 let mut backoff = Self::create_api_version_backoff();
926
927 let mut requests = FuturesUnordered::new();
930
931 for peer_id in num_peers.peer_ids() {
932 requests.push(Self::make_api_version_request(
933 Duration::ZERO,
934 peer_id,
935 &api,
936 ));
937 }
938
939 let mut num_responses = 0;
940
941 while let Some((peer_id, response)) = requests.next().await {
942 let retry = match response {
943 Err(err) => {
944 let has_previous_response = db
945 .begin_transaction_nc()
946 .await
947 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
948 .await
949 .is_some();
950 debug!(
951 target: LOG_CLIENT,
952 %peer_id,
953 err = %err.fmt_compact(),
954 %has_previous_response,
955 "Failed to refresh API versions of a peer"
956 );
957
958 !has_previous_response
959 }
960 Ok(o) => {
961 let mut dbtx = db.begin_transaction().await;
964 dbtx.insert_entry(
965 &PeerLastApiVersionsSummaryKey(peer_id),
966 &PeerLastApiVersionsSummary(o),
967 )
968 .await;
969 dbtx.commit_tx().await;
970 false
971 }
972 };
973
974 if retry {
975 requests.push(Self::make_api_version_request(
976 backoff.next().expect("Keeps retrying"),
977 peer_id,
978 &api,
979 ));
980 } else {
981 num_responses += 1;
982 num_responses_sender.send_replace(num_responses);
983 }
984 }
985 }
986
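 /// Query peers for their supported API versions, retrying failures with
 /// backoff, and return as soon as a threshold of peers has responded
 /// successfully.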
987 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
991 num_peers: NumPeers,
992 api: DynGlobalApi,
993 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
994 let mut backoff = Self::create_api_version_backoff();
995
996 let mut requests = FuturesUnordered::new();
999
1000 for peer_id in num_peers.peer_ids() {
1001 requests.push(Self::make_api_version_request(
1002 Duration::ZERO,
1003 peer_id,
1004 &api,
1005 ));
1006 }
1007
1008 let mut successful_responses = BTreeMap::new();
1009
1010 while successful_responses.len() < num_peers.threshold()
1011 && let Some((peer_id, response)) = requests.next().await
1012 {
1013 let retry = match response {
1014 Err(err) => {
1015 debug!(
1016 target: LOG_CLIENT,
1017 %peer_id,
1018 err = %err.fmt_compact(),
1019 "Failed to fetch API versions from peer"
1020 );
1021 true
1022 }
1023 Ok(response) => {
1024 successful_responses.insert(peer_id, response);
1025 false
1026 }
1027 };
1028
1029 if retry {
1030 requests.push(Self::make_api_version_request(
1031 backoff.next().expect("Keeps retrying"),
1032 peer_id,
1033 &api,
1034 ));
1035 }
1036 }
1037
1038 successful_responses
1039 }
1040
1041 pub async fn fetch_common_api_versions(
1043 config: &ClientConfig,
1044 api: &DynGlobalApi,
1045 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1046 debug!(
1047 target: LOG_CLIENT,
1048 "Fetching common api versions"
1049 );
1050
1051 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1052
1053 let peer_api_version_sets =
1054 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1055
1056 Ok(peer_api_version_sets)
1057 }
1058
1059 pub async fn write_api_version_cache(
1063 dbtx: &mut DatabaseTransaction<'_>,
1064 api_version_set: ApiVersionSet,
1065 ) {
1066 debug!(
1067 target: LOG_CLIENT,
1068 value = ?api_version_set,
1069 "Writing API version set to cache"
1070 );
1071
1072 dbtx.insert_entry(
1073 &CachedApiVersionSetKey,
1074 &CachedApiVersionSet(api_version_set),
1075 )
1076 .await;
1077 }
1078
1079 pub async fn store_prefetched_api_versions(
1084 db: &Database,
1085 config: &ClientConfig,
1086 client_module_init: &ClientModuleInitRegistry,
1087 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1088 ) {
1089 debug!(
1090 target: LOG_CLIENT,
1091 "Storing {} prefetched peer API version responses and calculating common version set",
1092 peer_api_versions.len()
1093 );
1094
1095 let mut dbtx = db.begin_transaction().await;
1096 let client_supported_versions =
1098 Self::supported_api_versions_summary_static(config, client_module_init);
1099 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1100 &client_supported_versions,
1101 peer_api_versions,
1102 ) {
1103 Ok(common_api_versions) => {
1104 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1106 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1107 }
1108 Err(err) => {
1109 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1110 }
1111 }
1112
1113 for (peer_id, peer_api_versions) in peer_api_versions {
1115 dbtx.insert_entry(
1116 &PeerLastApiVersionsSummaryKey(*peer_id),
1117 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1118 )
1119 .await;
1120 }
1121 dbtx.commit_tx().await;
1122 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1123 }
1124
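 /// Build the summary of core and per-module API versions supported by this
 /// client for the given config; modules without a registered client init
 /// are skipped.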
1125 pub fn supported_api_versions_summary_static(
1127 config: &ClientConfig,
1128 client_module_init: &ClientModuleInitRegistry,
1129 ) -> SupportedApiVersionsSummary {
1130 SupportedApiVersionsSummary {
1131 core: SupportedCoreApiVersions {
1132 core_consensus: config.global.consensus_version,
1133 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1134 .expect("must not have conflicting versions"),
1135 },
1136 modules: config
1137 .modules
1138 .iter()
1139 .filter_map(|(&module_instance_id, module_config)| {
1140 client_module_init
1141 .get(module_config.kind())
1142 .map(|module_init| {
1143 (
1144 module_instance_id,
1145 SupportedModuleApiVersions {
1146 core_consensus: config.global.consensus_version,
1147 module_consensus: module_config.version,
1148 api: module_init.supported_api_versions(),
1149 },
1150 )
1151 })
1152 })
1153 .collect(),
1154 }
1155 }
1156
1157 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1158 Self::load_and_refresh_common_api_version_static(
1159 &self.config().await,
1160 &self.module_inits,
1161 self.connectors.clone(),
1162 &self.api,
1163 &self.db,
1164 &self.task_group,
1165 )
1166 .await
1167 }
1168
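 /// Return the cached common API version set if one exists, spawning a
 /// background task to refresh it once connections are initialized;
 /// otherwise block on discovering it from the federation.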
1169 async fn load_and_refresh_common_api_version_static(
1175 config: &ClientConfig,
1176 module_init: &ClientModuleInitRegistry,
1177 connectors: ConnectorRegistry,
1178 api: &DynGlobalApi,
1179 db: &Database,
1180 task_group: &TaskGroup,
1181 ) -> anyhow::Result<ApiVersionSet> {
1182 if let Some(v) = db
1183 .begin_transaction_nc()
1184 .await
1185 .get_value(&CachedApiVersionSetKey)
1186 .await
1187 {
1188 debug!(
1189 target: LOG_CLIENT,
1190 "Found existing cached common api versions"
1191 );
1192 let config = config.clone();
1193 let client_module_init = module_init.clone();
1194 let api = api.clone();
1195 let db = db.clone();
1196 let task_group = task_group.clone();
1197 task_group
1200 .clone()
1201 .spawn_cancellable("refresh_common_api_version_static", async move {
1202 connectors.wait_for_initialized_connections().await;
1203
1204 if let Err(error) = Self::refresh_common_api_version_static(
1205 &config,
1206 &client_module_init,
1207 &api,
1208 &db,
1209 task_group,
1210 false,
1211 )
1212 .await
1213 {
1214 warn!(
1215 target: LOG_CLIENT,
1216 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1217 );
1218 }
1219 });
1220
1221 return Ok(v.0);
1222 }
1223
1224 info!(
1225 target: LOG_CLIENT,
1226 "Fetching initial API versions "
1227 );
1228 Self::refresh_common_api_version_static(
1229 config,
1230 module_init,
1231 api,
1232 db,
1233 task_group.clone(),
1234 true,
1235 )
1236 .await
1237 }
1238
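 /// Discover the common API version set supported by both this client and a
 /// threshold of peers, cache it in the database and return it. With
 /// `block_until_ok` set, discovery is retried until it succeeds.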
1239 async fn refresh_common_api_version_static(
1240 config: &ClientConfig,
1241 client_module_init: &ClientModuleInitRegistry,
1242 api: &DynGlobalApi,
1243 db: &Database,
1244 task_group: TaskGroup,
1245 block_until_ok: bool,
1246 ) -> anyhow::Result<ApiVersionSet> {
1247 debug!(
1248 target: LOG_CLIENT,
1249 "Refreshing common api versions"
1250 );
1251
1252 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1253 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1254
1255 task_group.spawn_cancellable("refresh peers api versions", {
1256 Client::fetch_common_api_versions_from_all_peers(
1257 num_peers,
1258 api.clone(),
1259 db.clone(),
1260 num_responses_sender,
1261 )
1262 });
1263
1264 let common_api_versions = loop {
1265 let _: Result<_, Elapsed> = runtime::timeout(
1273 Duration::from_secs(30),
1274 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1275 )
1276 .await;
1277
1278 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1279
1280 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1281 &Self::supported_api_versions_summary_static(config, client_module_init),
1282 &peer_api_version_sets,
1283 ) {
1284 Ok(o) => break o,
1285 Err(err) if block_until_ok => {
1286 warn!(
1287 target: LOG_CLIENT,
1288 err = %err.fmt_compact_anyhow(),
1289 "Failed to discover API version to use. Retrying..."
1290 );
1291 continue;
1292 }
1293 Err(e) => return Err(e),
1294 }
1295 };
1296
1297 debug!(
1298 target: LOG_CLIENT,
1299 value = ?common_api_versions,
1300 "Updating the cached common api versions"
1301 );
1302 let mut dbtx = db.begin_transaction().await;
1303 let _ = dbtx
1304 .insert_entry(
1305 &CachedApiVersionSetKey,
1306 &CachedApiVersionSet(common_api_versions.clone()),
1307 )
1308 .await;
1309
1310 dbtx.commit_tx().await;
1311
1312 Ok(common_api_versions)
1313 }
1314
1315 pub async fn get_metadata(&self) -> Metadata {
1317 self.db
1318 .begin_transaction_nc()
1319 .await
1320 .get_value(&ClientMetadataKey)
1321 .await
1322 .unwrap_or_else(|| {
1323 warn!(
1324 target: LOG_CLIENT,
1325 "Missing existing metadata. This key should have been set on Client init"
1326 );
1327 Metadata::empty()
1328 })
1329 }
1330
1331 pub async fn set_metadata(&self, metadata: &Metadata) {
1333 self.db
1334 .autocommit::<_, _, anyhow::Error>(
1335 |dbtx, _| {
1336 Box::pin(async {
1337 Self::set_metadata_dbtx(dbtx, metadata).await;
1338 Ok(())
1339 })
1340 },
1341 None,
1342 )
1343 .await
1344 .expect("Failed to autocommit metadata");
1345 }
1346
1347 pub fn has_pending_recoveries(&self) -> bool {
1348 !self
1349 .client_recovery_progress_receiver
1350 .borrow()
1351 .iter()
1352 .all(|(_id, progress)| progress.is_done())
1353 }
1354
1355 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1363 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1364 recovery_receiver
1365 .wait_for(|in_progress| {
1366 in_progress
1367 .iter()
1368 .all(|(_id, progress)| progress.is_done())
1369 })
1370 .await
1371 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1372
1373 Ok(())
1374 }
1375
1376 pub fn subscribe_to_recovery_progress(
1381 &self,
1382 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1383 WatchStream::new(self.client_recovery_progress_receiver.clone())
1384 .flat_map(futures::stream::iter)
1385 }
1386
1387 pub async fn wait_for_module_kind_recovery(
1388 &self,
1389 module_kind: ModuleKind,
1390 ) -> anyhow::Result<()> {
1391 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1392 let config = self.config().await;
1393 recovery_receiver
1394 .wait_for(|in_progress| {
1395 !in_progress
1396 .iter()
1397 .filter(|(module_instance_id, _progress)| {
1398 config.modules[module_instance_id].kind == module_kind
1399 })
1400 .any(|(_id, progress)| !progress.is_done())
1401 })
1402 .await
1403 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1404
1405 Ok(())
1406 }
1407
1408 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1409 loop {
1410 if self.executor.get_active_states().await.is_empty() {
1411 break;
1412 }
1413 sleep(Duration::from_millis(100)).await;
1414 }
1415 Ok(())
1416 }
1417
1418 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1420 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1421 }
1422
1423 fn spawn_module_recoveries_task(
1424 &self,
1425 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1426 module_recoveries: BTreeMap<
1427 ModuleInstanceId,
1428 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1429 >,
1430 module_recovery_progress_receivers: BTreeMap<
1431 ModuleInstanceId,
1432 watch::Receiver<RecoveryProgress>,
1433 >,
1434 ) {
1435 let db = self.db.clone();
1436 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1437 self.task_group
1438 .spawn("module recoveries", |_task_handle| async {
1439 Self::run_module_recoveries_task(
1440 db,
1441 log_ordering_wakeup_tx,
1442 recovery_sender,
1443 module_recoveries,
1444 module_recovery_progress_receivers,
1445 )
1446 .await;
1447 });
1448 }
1449
1450 async fn run_module_recoveries_task(
1451 db: Database,
1452 log_ordering_wakeup_tx: watch::Sender<()>,
1453 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1454 module_recoveries: BTreeMap<
1455 ModuleInstanceId,
1456 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1457 >,
1458 module_recovery_progress_receivers: BTreeMap<
1459 ModuleInstanceId,
1460 watch::Receiver<RecoveryProgress>,
1461 >,
1462 ) {
1463 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
1464 let mut completed_stream = Vec::new();
1465 let progress_stream = futures::stream::FuturesUnordered::new();
1466
1467 for (module_instance_id, f) in module_recoveries {
1468 completed_stream.push(futures::stream::once(Box::pin(async move {
1469 match f.await {
1470 Ok(()) => (module_instance_id, None),
1471 Err(err) => {
1472 warn!(
1473 target: LOG_CLIENT,
1474 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1475 );
1476 futures::future::pending::<()>().await;
1480 unreachable!()
1481 }
1482 }
1483 })));
1484 }
1485
1486 for (module_instance_id, rx) in module_recovery_progress_receivers {
1487 progress_stream.push(
1488 tokio_stream::wrappers::WatchStream::new(rx)
1489 .fuse()
1490 .map(move |progress| (module_instance_id, Some(progress))),
1491 );
1492 }
1493
1494 let mut futures = futures::stream::select(
1495 futures::stream::select_all(progress_stream),
1496 futures::stream::select_all(completed_stream),
1497 );
1498
1499 while let Some((module_instance_id, progress)) = futures.next().await {
1500 let mut dbtx = db.begin_transaction().await;
1501
1502 let prev_progress = *recovery_sender
1503 .borrow()
1504 .get(&module_instance_id)
1505 .expect("existing progress must be present");
1506
1507 let progress = if prev_progress.is_done() {
1508 prev_progress
1510 } else if let Some(progress) = progress {
1511 progress
1512 } else {
1513 prev_progress.to_complete()
1514 };
1515
1516 if !prev_progress.is_done() && progress.is_done() {
1517 info!(
1518 target: LOG_CLIENT,
1519 module_instance_id,
1520 progress = format!("{}/{}", progress.complete, progress.total),
1521 "Recovery complete"
1522 );
1523 dbtx.log_event(
1524 log_ordering_wakeup_tx.clone(),
1525 None,
1526 ModuleRecoveryCompleted {
1527 module_id: module_instance_id,
1528 },
1529 )
1530 .await;
1531 } else {
1532 info!(
1533 target: LOG_CLIENT,
1534 module_instance_id,
1535 progress = format!("{}/{}", progress.complete, progress.total),
1536 "Recovery progress"
1537 );
1538 }
1539
1540 dbtx.insert_entry(
1541 &ClientModuleRecovery { module_instance_id },
1542 &ClientModuleRecoveryState { progress },
1543 )
1544 .await;
1545 dbtx.commit_tx().await;
1546
1547 recovery_sender.send_modify(|v| {
1548 v.insert(module_instance_id, progress);
1549 });
1550 }
1551 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1552 }
1553
1554 async fn load_peers_last_api_versions(
1555 db: &Database,
1556 num_peers: NumPeers,
1557 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1558 let mut peer_api_version_sets = BTreeMap::new();
1559
1560 let mut dbtx = db.begin_transaction_nc().await;
1561 for peer_id in num_peers.peer_ids() {
1562 if let Some(v) = dbtx
1563 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1564 .await
1565 {
1566 peer_api_version_sets.insert(peer_id, v.0);
1567 }
1568 }
1569 drop(dbtx);
1570 peer_api_version_sets
1571 }
1572
1573 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1576 self.db()
1577 .begin_transaction_nc()
1578 .await
1579 .find_by_prefix(&ApiAnnouncementPrefix)
1580 .await
1581 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1582 .collect()
1583 .await
1584 }
1585
1586 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1588 get_api_urls(&self.db, &self.config().await).await
1589 }
1590
1591 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1594 self.get_peer_urls()
1595 .await
1596 .into_iter()
1597 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1598 .map(|peer_url| {
1599 InviteCode::new(
1600 peer_url.clone(),
1601 peer,
1602 self.federation_id(),
1603 self.api_secret.clone(),
1604 )
1605 })
1606 }
1607
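 /// Return the guardians' broadcast public keys, fetching and persisting an
 /// updated config from the federation if the stored one does not contain
 /// them yet. Retries until it succeeds, hence "blocking".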
1608 pub async fn get_guardian_public_keys_blocking(
1612 &self,
1613 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1614 self.db
1615 .autocommit(
1616 |dbtx, _| {
1617 Box::pin(async move {
1618 let config = self.config().await;
1619
1620 let guardian_pub_keys = self
1621 .get_or_backfill_broadcast_public_keys(dbtx, config)
1622 .await;
1623
1624 Result::<_, ()>::Ok(guardian_pub_keys)
1625 })
1626 },
1627 None,
1628 )
1629 .await
1630 .expect("Will retry forever")
1631 }
1632
1633 async fn get_or_backfill_broadcast_public_keys(
1634 &self,
1635 dbtx: &mut DatabaseTransaction<'_>,
1636 config: ClientConfig,
1637 ) -> BTreeMap<PeerId, PublicKey> {
1638 match config.global.broadcast_public_keys {
1639 Some(guardian_pub_keys) => guardian_pub_keys,
1640 _ => {
1641 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1642
1643 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1644 *(self.config.write().await) = new_config;
1645 guardian_pub_keys
1646 }
1647 }
1648 }
1649
1650 async fn fetch_session_count(&self) -> FederationResult<u64> {
1651 self.api.session_count().await
1652 }
1653
1654 async fn fetch_and_update_config(
1655 &self,
1656 config: ClientConfig,
1657 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1658 let fetched_config = retry(
1659 "Fetching guardian public keys",
1660 backoff_util::background_backoff(),
1661 || async {
1662 Ok(self
1663 .api
1664 .request_current_consensus::<ClientConfig>(
1665 CLIENT_CONFIG_ENDPOINT.to_owned(),
1666 ApiRequestErased::default(),
1667 )
1668 .await?)
1669 },
1670 )
1671 .await
1672 .expect("Will never return on error");
1673
1674 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1675 warn!(
1676 target: LOG_CLIENT,
1677 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1678 );
1679 pending::<()>().await;
1680 unreachable!("Pending will never return");
1681 };
1682
1683 let new_config = ClientConfig {
1684 global: GlobalClientConfig {
1685 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1686 ..config.global
1687 },
1688 modules: config.modules,
1689 };
1690 (guardian_pub_keys, new_config)
1691 }
1692
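 /// Dispatch a client-global RPC call by method name, deserializing `params`
 /// as JSON and yielding JSON-encoded results as a stream (some methods,
 /// such as `subscribe_balance_changes`, yield multiple items).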
1693 pub fn handle_global_rpc(
1694 &self,
1695 method: String,
1696 params: serde_json::Value,
1697 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1698 Box::pin(try_stream! {
1699 match method.as_str() {
1700 "get_balance" => {
1701 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1702 yield serde_json::to_value(balance)?;
1703 }
1704 "subscribe_balance_changes" => {
1705 let req: GetBalanceChangesRequest = serde_json::from_value(params)?;
1706 let mut stream = self.subscribe_balance_changes(req.unit).await;
1707 while let Some(balance) = stream.next().await {
1708 yield serde_json::to_value(balance)?;
1709 }
1710 }
1711 "get_config" => {
1712 let config = self.config().await;
1713 yield serde_json::to_value(config)?;
1714 }
1715 "get_federation_id" => {
1716 let federation_id = self.federation_id();
1717 yield serde_json::to_value(federation_id)?;
1718 }
1719 "get_invite_code" => {
1720 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1721 let invite_code = self.invite_code(req.peer).await;
1722 yield serde_json::to_value(invite_code)?;
1723 }
1724 "get_operation" => {
1725 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1726 let operation = self.operation_log().get_operation(req.operation_id).await;
1727 yield serde_json::to_value(operation)?;
1728 }
1729 "list_operations" => {
1730 let req: ListOperationsParams = serde_json::from_value(params)?;
1731 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1732 usize::MAX
1733 } else {
1734 req.limit.unwrap_or(usize::MAX)
1735 };
1736 let operations = self.operation_log()
1737 .paginate_operations_rev(limit, req.last_seen)
1738 .await;
1739 yield serde_json::to_value(operations)?;
1740 }
1741 "session_count" => {
1742 let count = self.fetch_session_count().await?;
1743 yield serde_json::to_value(count)?;
1744 }
1745 "has_pending_recoveries" => {
1746 let has_pending = self.has_pending_recoveries();
1747 yield serde_json::to_value(has_pending)?;
1748 }
1749 "wait_for_all_recoveries" => {
1750 self.wait_for_all_recoveries().await?;
1751 yield serde_json::Value::Null;
1752 }
1753 "subscribe_to_recovery_progress" => {
1754 let mut stream = self.subscribe_to_recovery_progress();
1755 while let Some((module_id, progress)) = stream.next().await {
1756 yield serde_json::json!({
1757 "module_id": module_id,
1758 "progress": progress
1759 });
1760 }
1761 }
1762 "backup_to_federation" => {
1763 let metadata = if params.is_null() {
1764 Metadata::from_json_serialized(serde_json::json!({}))
1765 } else {
1766 Metadata::from_json_serialized(params)
1767 };
1768 self.backup_to_federation(metadata).await?;
1769 yield serde_json::Value::Null;
1770 }
1771 _ => {
1772 Err(anyhow::format_err!("Unknown method: {}", method))?;
1773 unreachable!()
1774 },
1775 }
1776 })
1777 }
1778
1779 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1780 where
1781 E: Event + Send,
1782 {
1783 let mut dbtx = self.db.begin_transaction().await;
1784 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1785 dbtx.commit_tx().await;
1786 }
1787
1788 pub async fn log_event_dbtx<E, Cap>(
1789 &self,
1790 dbtx: &mut DatabaseTransaction<'_, Cap>,
1791 module_id: Option<ModuleInstanceId>,
1792 event: E,
1793 ) where
1794 E: Event + Send,
1795 Cap: Send,
1796 {
1797 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1798 .await;
1799 }
1800
1801 pub async fn log_event_raw_dbtx<Cap>(
1802 &self,
1803 dbtx: &mut DatabaseTransaction<'_, Cap>,
1804 kind: EventKind,
1805 module: Option<(ModuleKind, ModuleInstanceId)>,
1806 payload: Vec<u8>,
1807 persist: EventPersistence,
1808 ) where
1809 Cap: Send,
1810 {
1811 let module_id = module.as_ref().map(|m| m.1);
1812 let module_kind = module.map(|m| m.0);
1813 dbtx.log_event_raw(
1814 self.log_ordering_wakeup_tx.clone(),
1815 kind,
1816 module_kind,
1817 module_id,
1818 payload,
1819 persist,
1820 )
1821 .await;
1822 }
1823
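 /// Default trimable event log tracker that stores its last processed
 /// position under [`DefaultApplicationEventLogKey`] in the client database.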
1824 pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1836 struct BuiltInApplicationEventLogTracker;
1837
1838 #[apply(async_trait_maybe_send!)]
1839 impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1840 async fn store(
1842 &mut self,
1843 dbtx: &mut DatabaseTransaction<NonCommittable>,
1844 pos: EventLogTrimableId,
1845 ) -> anyhow::Result<()> {
1846 dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1847 .await;
1848 Ok(())
1849 }
1850
1851 async fn load(
1853 &mut self,
1854 dbtx: &mut DatabaseTransaction<NonCommittable>,
1855 ) -> anyhow::Result<Option<EventLogTrimableId>> {
1856 Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
1857 }
1858 }
1859 Box::new(BuiltInApplicationEventLogTracker)
1860 }
1861
1862 pub async fn handle_historical_events<F, R>(
1870 &self,
1871 tracker: fedimint_eventlog::DynEventLogTracker,
1872 handler_fn: F,
1873 ) -> anyhow::Result<()>
1874 where
1875 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1876 R: Future<Output = anyhow::Result<()>>,
1877 {
1878 fedimint_eventlog::handle_events(
1879 self.db.clone(),
1880 tracker,
1881 self.log_event_added_rx.clone(),
1882 handler_fn,
1883 )
1884 .await
1885 }
1886
1887 pub async fn handle_events<F, R>(
1906 &self,
1907 tracker: fedimint_eventlog::DynEventLogTrimableTracker,
1908 handler_fn: F,
1909 ) -> anyhow::Result<()>
1910 where
1911 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1912 R: Future<Output = anyhow::Result<()>>,
1913 {
1914 fedimint_eventlog::handle_trimable_events(
1915 self.db.clone(),
1916 tracker,
1917 self.log_event_added_rx.clone(),
1918 handler_fn,
1919 )
1920 .await
1921 }
1922
1923 pub async fn get_event_log(
1924 &self,
1925 pos: Option<EventLogId>,
1926 limit: u64,
1927 ) -> Vec<PersistedLogEntry> {
1928 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1929 .await
1930 }
1931
1932 pub async fn get_event_log_trimable(
1933 &self,
1934 pos: Option<EventLogTrimableId>,
1935 limit: u64,
1936 ) -> Vec<PersistedLogEntry> {
1937 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1938 .await
1939 }
1940
1941 pub async fn get_event_log_dbtx<Cap>(
1942 &self,
1943 dbtx: &mut DatabaseTransaction<'_, Cap>,
1944 pos: Option<EventLogId>,
1945 limit: u64,
1946 ) -> Vec<PersistedLogEntry>
1947 where
1948 Cap: Send,
1949 {
1950 dbtx.get_event_log(pos, limit).await
1951 }
1952
1953 pub async fn get_event_log_trimable_dbtx<Cap>(
1954 &self,
1955 dbtx: &mut DatabaseTransaction<'_, Cap>,
1956 pos: Option<EventLogTrimableId>,
1957 limit: u64,
1958 ) -> Vec<PersistedLogEntry>
1959 where
1960 Cap: Send,
1961 {
1962 dbtx.get_event_log_trimable(pos, limit).await
1963 }
1964
1965 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1967 self.log_event_added_transient_tx.subscribe()
1968 }
1969
1970 pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
1972 self.log_event_added_rx.clone()
1973 }
1974
1975 pub fn iroh_enable_dht(&self) -> bool {
1976 self.iroh_enable_dht
1977 }
1978
1979 pub(crate) async fn run_core_migrations(
1980 db_no_decoders: &Database,
1981 ) -> Result<(), anyhow::Error> {
1982 let mut dbtx = db_no_decoders.begin_transaction().await;
1983 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1984 .await?;
1985 if is_running_in_test_env() {
1986 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1987 }
1988 dbtx.commit_tx_result().await?;
1989 Ok(())
1990 }
1991
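 /// Iterate all primary module candidates able to handle `unit`, in priority
 /// order; within each priority, unit-specific candidates come before
 /// wildcard ones.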
1992 fn primary_modules_for_unit(
1994 &self,
1995 unit: AmountUnit,
1996 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1997 self.primary_modules
1998 .iter()
1999 .flat_map(move |(_prio, candidates)| {
2000 candidates
2001 .specific
2002 .get(&unit)
2003 .into_iter()
2004 .flatten()
2005 .copied()
2006 .chain(candidates.wildcard.iter().copied())
2008 })
2009 .map(|id| (id, self.modules.get_expect(id)))
2010 }
2011
2012 pub fn primary_module_for_unit(
2016 &self,
2017 unit: AmountUnit,
2018 ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2019 self.primary_modules_for_unit(unit).next()
2020 }
2021
2022 pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2024 self.primary_module_for_unit(AmountUnit::BITCOIN)
2025 .expect("No primary module for Bitcoin")
2026 }
2027}
2028
2029#[apply(async_trait_maybe_send!)]
2030impl ClientContextIface for Client {
2031 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2032 Client::get_module(self, instance)
2033 }
2034
2035 fn api_clone(&self) -> DynGlobalApi {
2036 Client::api_clone(self)
2037 }
2038 fn decoders(&self) -> &ModuleDecoderRegistry {
2039 Client::decoders(self)
2040 }
2041
2042 async fn finalize_and_submit_transaction(
2043 &self,
2044 operation_id: OperationId,
2045 operation_type: &str,
2046 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2047 tx_builder: TransactionBuilder,
2048 ) -> anyhow::Result<OutPointRange> {
2049 Client::finalize_and_submit_transaction(
2050 self,
2051 operation_id,
2052 operation_type,
2053 &operation_meta_gen,
2055 tx_builder,
2056 )
2057 .await
2058 }
2059
2060 async fn finalize_and_submit_transaction_inner(
2061 &self,
2062 dbtx: &mut DatabaseTransaction<'_>,
2063 operation_id: OperationId,
2064 tx_builder: TransactionBuilder,
2065 ) -> anyhow::Result<OutPointRange> {
2066 Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2067 }
2068
2069 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2070 Client::transaction_updates(self, operation_id).await
2071 }
2072
2073 async fn await_primary_module_outputs(
2074 &self,
2075 operation_id: OperationId,
2076 outputs: Vec<OutPoint>,
2078 ) -> anyhow::Result<()> {
2079 Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2080 }
2081
2082 fn operation_log(&self) -> &dyn IOperationLog {
2083 Client::operation_log(self)
2084 }
2085
2086 async fn has_active_states(&self, operation_id: OperationId) -> bool {
2087 Client::has_active_states(self, operation_id).await
2088 }
2089
2090 async fn operation_exists(&self, operation_id: OperationId) -> bool {
2091 Client::operation_exists(self, operation_id).await
2092 }
2093
2094 async fn config(&self) -> ClientConfig {
2095 Client::config(self).await
2096 }
2097
2098 fn db(&self) -> &Database {
2099 Client::db(self)
2100 }
2101
2102 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2103 Client::executor(self)
2104 }
2105
2106 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2107 Client::invite_code(self, peer).await
2108 }
2109
2110 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2111 Client::get_internal_payment_markers(self)
2112 }
2113
2114 async fn log_event_json(
2115 &self,
2116 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2117 module_kind: Option<ModuleKind>,
2118 module_id: ModuleInstanceId,
2119 kind: EventKind,
2120 payload: serde_json::Value,
2121 persist: EventPersistence,
2122 ) {
2123 dbtx.ensure_global()
2124 .expect("Must be called with global dbtx");
2125 self.log_event_raw_dbtx(
2126 dbtx,
2127 kind,
2128 module_kind.map(|kind| (kind, module_id)),
2129 serde_json::to_vec(&payload).expect("Serialization can't fail"),
2130 persist,
2131 )
2132 .await;
2133 }
2134
2135 async fn read_operation_active_states<'dbtx>(
2136 &self,
2137 operation_id: OperationId,
2138 module_id: ModuleInstanceId,
2139 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2140 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2141 {
2142 Box::pin(
2143 dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2144 operation_id,
2145 module_instance: module_id,
2146 })
2147 .await
2148 .map(move |(k, v)| (k.0, v)),
2149 )
2150 }
2151 async fn read_operation_inactive_states<'dbtx>(
2152 &self,
2153 operation_id: OperationId,
2154 module_id: ModuleInstanceId,
2155 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2156 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2157 {
2158 Box::pin(
2159 dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2160 operation_id,
2161 module_instance: module_id,
2162 })
2163 .await
2164 .map(move |(k, v)| (k.0, v)),
2165 )
2166 }
2167}
2168
2169impl fmt::Debug for Client {
2171 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2172 write!(f, "Client")
2173 }
2174}
2175
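/// Build a [`ModuleDecoderRegistry`] for the given module instances, skipping
/// (and logging) any module kind that has no registered client init.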
2176pub fn client_decoders<'a>(
2177 registry: &ModuleInitRegistry<DynClientModuleInit>,
2178 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2179) -> ModuleDecoderRegistry {
2180 let mut modules = BTreeMap::new();
2181 for (id, kind) in module_kinds {
2182 let Some(init) = registry.get(kind) else {
2183 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2184 continue;
2185 };
2186
2187 modules.insert(
2188 id,
2189 (
2190 kind.clone(),
2191 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2192 ),
2193 );
2194 }
2195 ModuleDecoderRegistry::from(modules)
2196}