use std::collections::{BTreeMap, HashSet};
use std::fmt::{self, Formatter};
use std::future::{Future, pending};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context as _, anyhow, bail, format_err};
use async_stream::try_stream;
use bitcoin::key::Secp256k1;
use bitcoin::key::rand::thread_rng;
use bitcoin::secp256k1::{self, PublicKey};
use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
use fedimint_api_client::api::net::ConnectorType;
use fedimint_api_client::api::{
    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
};
use fedimint_client_module::module::recovery::RecoveryProgress;
use fedimint_client_module::module::{
    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
};
use fedimint_client_module::oplog::IOperationLog;
use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
use fedimint_client_module::transaction::{
    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
    TxSubmissionStatesSM,
};
use fedimint_client_module::{
    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
};
use fedimint_connectors::ConnectorRegistry;
use fedimint_core::config::{
    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
};
use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
use fedimint_core::db::{
    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
use fedimint_core::envs::is_running_in_test_env;
use fedimint_core::invite_code::InviteCode;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
use fedimint_core::module::{
    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
};
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use fedimint_core::runtime::sleep;
use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::backoff_util::custom_backoff;
use fedimint_core::util::{
    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
};
use fedimint_core::{
    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
    maybe_add_send_sync, runtime,
};
use fedimint_derive_secret::DerivableSecret;
use fedimint_eventlog::{
    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
};
use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _};
use global_ctx::ModuleGlobalClientContext;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, warn};

use crate::ClientBuilder;
use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
use crate::backup::Metadata;
use crate::client::event_log::DefaultApplicationEventLogKey;
use crate::db::{
    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
    get_decoded_client_secret, verify_client_db_integrity_dbtx,
};
use crate::meta::MetaService;
use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
use crate::oplog::OperationLog;
use crate::sm::executor::{
    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
};

pub(crate) mod builder;
pub(crate) mod event_log;
pub(crate) mod global_ctx;
pub(crate) mod handle;

const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];

#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    wildcard: Vec<ModuleInstanceId>,
}

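/// Handle to a single federation from the client side.
///
/// `Client` bundles everything needed to interact with one federation: the
/// [`ClientConfig`], the federation API, the client [`Database`], the state
/// machine [`Executor`], the [`OperationLog`] and the registered client
/// modules. It is built via [`ClientBuilder`] (see [`Client::builder`]) and is
/// normally used behind an [`Arc`].
///
/// A minimal usage sketch, assuming an already-joined client handle named
/// `client` (illustrative only, not a doctest):
///
/// ```ignore
/// let balance = client.get_balance_for_unit(AmountUnit::BITCOIN).await?;
/// println!("federation {} has balance {balance}", client.federation_id());
/// ```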
pub struct Client {
    final_client: FinalClientIface,
    config: tokio::sync::RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    db: Database,
    federation_id: FederationId,
    federation_config_meta: BTreeMap<String, String>,
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: ConnectorType,

    task_group: TaskGroup,

    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}

#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    limit: Option<usize>,
    last_seen: Option<ChronologicalOperationLogKey>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}

impl Client {
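    /// Returns a [`ClientBuilder`] used to configure and then open, join or
    /// recover a client for a particular federation.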
    pub async fn builder() -> anyhow::Result<ClientBuilder> {
        Ok(ClientBuilder::new())
    }

    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }

    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }

    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
        self.api.connection_status_stream()
    }

    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }

    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }

    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ClientConfigKey).await
    }

    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&PendingClientConfigKey).await
    }

    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ApiSecretKey).await
    }

    pub async fn store_encodable_client_secret<T: Encodable>(
        db: &Database,
        secret: T,
    ) -> anyhow::Result<()> {
        let mut dbtx = db.begin_transaction().await;

        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
            bail!("Encoded client secret already exists, cannot overwrite")
        }

        let encoded_secret = T::consensus_encode_to_vec(&secret);
        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
            .await;
        dbtx.commit_tx().await;
        Ok(())
    }

    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
            bail!("Encoded client secret not present in DB")
        };

        Ok(secret)
    }

    pub async fn load_decodable_client_secret_opt<T: Decodable>(
        db: &Database,
    ) -> anyhow::Result<Option<T>> {
        let mut dbtx = db.begin_transaction_nc().await;

        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;

        Ok(match client_secret {
            Some(client_secret) => Some(
                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
            ),
            None => None,
        })
    }

    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
            Ok(secret) => secret,
            _ => {
                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
                Self::store_encodable_client_secret(db, secret)
                    .await
                    .expect("Storing client secret must work");
                secret
            }
        };
        Ok(client_secret)
    }

    pub async fn is_initialized(db: &Database) -> bool {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
            .await
            .expect("Unrecoverable error occurred while reading an entry from the database")
            .is_some()
    }

    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor",
        );
        self.executor.start_executor(self.context_gen());
    }

    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }

    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
        let client_inner = Arc::downgrade(self);
        Arc::new(move |module_instance, operation| {
            ModuleGlobalClientContext {
                client: client_inner
                    .clone()
                    .upgrade()
                    .expect("ModuleGlobalContextGen called after client was dropped"),
                module_instance_id: module_instance,
                operation,
            }
            .into()
        })
    }

    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }

    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }

    pub async fn core_api_version(&self) -> ApiVersion {
        self.db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
            .map(|cached: CachedApiVersionSet| cached.0.core)
            .unwrap_or(ApiVersion { major: 0, minor: 0 })
    }

    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }

    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }

    fn try_get_module(
        &self,
        instance: ModuleInstanceId,
    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
        Some(self.modules.get(instance)?.as_ref())
    }

    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }

    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
        let mut in_amounts = Amounts::ZERO;
        let mut out_amounts = Amounts::ZERO;
        let mut fee_amounts = Amounts::ZERO;

        for input in builder.inputs() {
            let module = self.get_module(input.input.module_instance_id());

            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
                "We only build transactions with input versions that are supported by the module",
            );

            in_amounts.checked_add_mut(&input.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        for output in builder.outputs() {
            let module = self.get_module(output.output.module_instance_id());

            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
                "We only build transactions with output versions that are supported by the module",
            );

            out_amounts.checked_add_mut(&output.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        out_amounts.checked_add_mut(&fee_amounts);
        (in_amounts, out_amounts)
    }

    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }

    pub fn get_config_meta(&self, key: &str) -> Option<String> {
        self.federation_config_meta.get(key).cloned()
    }

    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }

    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }

    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
        let active_states = self.executor.get_active_states().await;
        let mut active_operations = HashSet::with_capacity(active_states.len());
        let mut dbtx = self.db().begin_transaction_nc().await;
        for (state, _) in active_states {
            let operation_id = state.operation_id();
            if dbtx
                .get_value(&OperationLogKey { operation_id })
                .await
                .is_some()
            {
                active_operations.insert(operation_id);
            }
        }
        active_operations
    }

    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }

    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }

    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
        let meta_service = self.meta_service();
        let ts = meta_service
            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
            .await
            .and_then(|v| v.value)?;
        Some(UNIX_EPOCH + Duration::from_secs(ts))
    }

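    /// Balances a partial [`TransactionBuilder`] so inputs and outputs match
    /// for every [`AmountUnit`] involved, then builds the final transaction.
    ///
    /// For each unit where inputs and outputs differ, the highest-priority
    /// primary module for that unit contributes the missing funding inputs or
    /// change outputs. Returns the [`Transaction`], its state machines and the
    /// output index range of the added change.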
    async fn finalize_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        mut partial_transaction: TransactionBuilder,
    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);

        let mut added_inputs_bundles = vec![];
        let mut added_outputs_bundles = vec![];

        for unit in in_amounts.units().union(&out_amounts.units()) {
            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
            if input_amount == output_amount {
                continue;
            }

            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
                bail!("No module to balance a partial transaction (affected unit: {unit:?})");
            };

            let (added_input_bundle, added_output_bundle) = module
                .create_final_inputs_and_outputs(
                    module_id,
                    dbtx,
                    operation_id,
                    *unit,
                    input_amount,
                    output_amount,
                )
                .await?;

            added_inputs_bundles.push(added_input_bundle);
            added_outputs_bundles.push(added_output_bundle);
        }

        let change_range = Range {
            start: partial_transaction.outputs().count() as u64,
            end: (partial_transaction.outputs().count() as u64
                + added_outputs_bundles
                    .iter()
                    .map(|output| output.outputs().len() as u64)
                    .sum::<u64>()),
        };

        for added_inputs in added_inputs_bundles {
            partial_transaction = partial_transaction.with_inputs(added_inputs);
        }

        for added_outputs in added_outputs_bundles {
            partial_transaction = partial_transaction.with_outputs(added_outputs);
        }

        let (input_amounts, output_amounts) =
            self.transaction_builder_get_balance(&partial_transaction);

        for (unit, output_amount) in output_amounts {
            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();

            assert!(input_amount >= output_amount, "Transaction is underfunded");
        }

        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());

        Ok((tx, states, change_range))
    }

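    /// Balances, finalizes and submits a transaction built by a client module
    /// and creates an operation log entry for it.
    ///
    /// The whole step runs inside a database autocommit closure: if an
    /// operation with `operation_id` already exists the call fails, otherwise
    /// the transaction submission state machines are registered and the
    /// operation meta produced by `operation_meta_gen` is written to the
    /// operation log.
    ///
    /// A rough usage sketch from a hypothetical module (`operation_id`,
    /// `MyMeta` and `tx_builder` are placeholders; not a doctest):
    ///
    /// ```ignore
    /// let out_point_range = client
    ///     .finalize_and_submit_transaction(
    ///         operation_id,
    ///         "my-module-operation",
    ///         |range| MyMeta { change: range },
    ///         tx_builder,
    ///     )
    ///     .await?;
    /// ```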
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100),
            )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        let (transaction, mut states, change_range) = self
            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
            .await?;

        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
            let inputs = transaction
                .inputs
                .iter()
                .map(DynInput::module_instance_id)
                .collect::<Vec<_>>();
            let outputs = transaction
                .outputs
                .iter()
                .map(DynOutput::module_instance_id)
                .collect::<Vec<_>>();
            warn!(
                target: LOG_CLIENT_NET_API,
                size=%transaction.consensus_encode_to_vec().len(),
                ?inputs,
                ?outputs,
                "Transaction too large",
            );
            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
            bail!(
                "The generated transaction would be rejected by the federation for being too large."
            );
        }

        let txid = transaction.tx_hash();

        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");

        let tx_submission_sm = DynState::from_typed(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            TxSubmissionStatesSM {
                operation_id,
                state: TxSubmissionStates::Created(transaction),
            },
        );
        states.push(tx_submission_sm);

        self.executor.add_state_machines_dbtx(dbtx, states).await?;

        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
            .await;

        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
    }

    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(
                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
                self.final_client.clone(),
            )
            .subscribe(operation_id)
            .await
    }

    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
        let mut dbtx = self.db().begin_transaction_nc().await;

        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
    }

    pub async fn operation_exists_dbtx(
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
    ) -> bool {
        let active_state_exists = dbtx
            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some();

        let inactive_state_exists = dbtx
            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some();

        active_state_exists || inactive_state_exists
    }

    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some()
    }

    pub async fn await_primary_bitcoin_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .ok_or_else(|| anyhow!("No primary module available"))?
            .1
            .await_primary_module_output(operation_id, out_point)
            .await
    }

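    /// Returns a typed handle to the first module instance of kind
    /// `M::kind()`, together with its isolated database and module API.
    ///
    /// Fails if no instance of that kind is configured or if the instance
    /// cannot be downcast to `M`. Sketch (the module type is illustrative;
    /// not a doctest):
    ///
    /// ```ignore
    /// let mint = client.get_first_module::<MintClientModule>()?;
    /// ```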
    pub fn get_first_module<M: ClientModule>(
        &'_ self,
    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
        let module_kind = M::kind();
        let id = self
            .get_first_instance(&module_kind)
            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
        let module: &M = self
            .try_get_module(id)
            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
            .as_any()
            .downcast_ref::<M>()
            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
        let (db, _) = self.db().with_prefix_module_id(id);
        Ok(ClientModuleInstance {
            id,
            db,
            api: self.api().with_module(id),
            module,
        })
    }

    pub fn get_module_client_dyn(
        &self,
        instance_id: ModuleInstanceId,
    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
        self.try_get_module(instance_id)
            .ok_or(anyhow!("Unknown module instance {}", instance_id))
    }

    pub fn db(&self) -> &Database {
        &self.db
    }

    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }

    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        TransactionUpdates {
            update_stream: self.transaction_update_stream(operation_id).await,
        }
    }

    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
        self.modules
            .iter_modules()
            .find(|(_, kind, _module)| *kind == module_kind)
            .map(|(instance_id, _, _)| instance_id)
    }

    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }

    pub async fn await_primary_bitcoin_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_bitcoin_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }

    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }

    #[doc(hidden)]
    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
        self.get_balance_for_unit(AmountUnit::BITCOIN).await
    }

    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
        let (id, module) = self
            .primary_module_for_unit(unit)
            .ok_or_else(|| anyhow!("Primary module not available"))?;
        Ok(module
            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
            .await)
    }

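    /// Returns a stream of balances for `unit`, starting with the current
    /// balance and yielding a new value whenever the primary module reports a
    /// change that actually alters the balance.
    ///
    /// If no primary module handles `unit`, the stream never yields.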
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                pending().await
            };

            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                    .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }

    async fn make_api_version_request(
        delay: Duration,
        peer_id: PeerId,
        api: &DynGlobalApi,
    ) -> (
        PeerId,
        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
    ) {
        runtime::sleep(delay).await;
        (
            peer_id,
            api.request_single_peer::<SupportedApiVersionsSummary>(
                VERSION_ENDPOINT.to_owned(),
                ApiRequestErased::default(),
                peer_id,
            )
            .await,
        )
    }

    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
    }

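    /// Queries every peer for its supported API versions and caches the
    /// responses in the database.
    ///
    /// Peers that fail and have no previously cached response are retried
    /// with backoff; each first-time success bumps `num_responses_sender` so
    /// callers can wait for a threshold of answers.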
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    !has_previous_response
                }
                Ok(o) => {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }

    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }

    pub async fn fetch_common_api_versions(
        config: &ClientConfig,
        api: &DynGlobalApi,
    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
        debug!(
            target: LOG_CLIENT,
            "Fetching common api versions"
        );

        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        let peer_api_version_sets =
            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;

        Ok(peer_api_version_sets)
    }

    pub async fn write_api_version_cache(
        dbtx: &mut DatabaseTransaction<'_>,
        api_version_set: ApiVersionSet,
    ) {
        debug!(
            target: LOG_CLIENT,
            value = ?api_version_set,
            "Writing API version set to cache"
        );

        dbtx.insert_entry(
            &CachedApiVersionSetKey,
            &CachedApiVersionSet(api_version_set),
        )
        .await;
    }

    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }

    pub fn supported_api_versions_summary_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
    ) -> SupportedApiVersionsSummary {
        SupportedApiVersionsSummary {
            core: SupportedCoreApiVersions {
                core_consensus: config.global.consensus_version,
                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
                    .expect("must not have conflicting versions"),
            },
            modules: config
                .modules
                .iter()
                .filter_map(|(&module_instance_id, module_config)| {
                    client_module_init
                        .get(module_config.kind())
                        .map(|module_init| {
                            (
                                module_instance_id,
                                SupportedModuleApiVersions {
                                    core_consensus: config.global.consensus_version,
                                    module_consensus: module_config.version,
                                    api: module_init.supported_api_versions(),
                                },
                            )
                        })
                })
                .collect(),
        }
    }

    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }

    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    connectors.wait_for_initialized_connections().await;

                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions"
        );
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }

    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }

    pub async fn get_metadata(&self) -> Metadata {
        self.db
            .begin_transaction_nc()
            .await
            .get_value(&ClientMetadataKey)
            .await
            .unwrap_or_else(|| {
                warn!(
                    target: LOG_CLIENT,
                    "Missing existing metadata. This key should have been set on Client init"
                );
                Metadata::empty()
            })
    }

    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }

    pub fn has_pending_recoveries(&self) -> bool {
        !self
            .client_recovery_progress_receiver
            .borrow()
            .iter()
            .all(|(_id, progress)| progress.is_done())
    }

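    /// Waits until every module that started a recovery has reported
    /// completion, returning an error if the recovery task shut down while
    /// some modules were still unfinished.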
    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        recovery_receiver
            .wait_for(|in_progress| {
                in_progress
                    .iter()
                    .all(|(_id, progress)| progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;

        Ok(())
    }

    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }

    pub async fn wait_for_module_kind_recovery(
        &self,
        module_kind: ModuleKind,
    ) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        let config = self.config().await;
        recovery_receiver
            .wait_for(|in_progress| {
                !in_progress
                    .iter()
                    .filter(|(module_instance_id, _progress)| {
                        config.modules[module_instance_id].kind == module_kind
                    })
                    .any(|(_id, progress)| !progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;

        Ok(())
    }

    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
        loop {
            if self.executor.get_active_states().await.is_empty() {
                break;
            }
            sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }

    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }

    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }

    async fn run_module_recoveries_task(
        db: Database,
        log_ordering_wakeup_tx: watch::Sender<()>,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
        let mut completed_stream = Vec::new();
        let progress_stream = futures::stream::FuturesUnordered::new();

        for (module_instance_id, f) in module_recoveries {
            completed_stream.push(futures::stream::once(Box::pin(async move {
                match f.await {
                    Ok(()) => (module_instance_id, None),
                    Err(err) => {
                        warn!(
                            target: LOG_CLIENT,
                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
                        );
                        futures::future::pending::<()>().await;
                        unreachable!()
                    }
                }
            })));
        }

        for (module_instance_id, rx) in module_recovery_progress_receivers {
            progress_stream.push(
                tokio_stream::wrappers::WatchStream::new(rx)
                    .fuse()
                    .map(move |progress| (module_instance_id, Some(progress))),
            );
        }

        let mut futures = futures::stream::select(
            futures::stream::select_all(progress_stream),
            futures::stream::select_all(completed_stream),
        );

        while let Some((module_instance_id, progress)) = futures.next().await {
            let mut dbtx = db.begin_transaction().await;

            let prev_progress = *recovery_sender
                .borrow()
                .get(&module_instance_id)
                .expect("existing progress must be present");

            let progress = if prev_progress.is_done() {
                prev_progress
            } else if let Some(progress) = progress {
                progress
            } else {
                prev_progress.to_complete()
            };

            if !prev_progress.is_done() && progress.is_done() {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery complete"
                );
                dbtx.log_event(
                    log_ordering_wakeup_tx.clone(),
                    None,
                    ModuleRecoveryCompleted {
                        module_id: module_instance_id,
                    },
                )
                .await;
            } else {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery progress"
                );
            }

            dbtx.insert_entry(
                &ClientModuleRecovery { module_instance_id },
                &ClientModuleRecoveryState { progress },
            )
            .await;
            dbtx.commit_tx().await;

            recovery_sender.send_modify(|v| {
                v.insert(module_instance_id, progress);
            });
        }
        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
    }

    async fn load_peers_last_api_versions(
        db: &Database,
        num_peers: NumPeers,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut peer_api_version_sets = BTreeMap::new();

        let mut dbtx = db.begin_transaction_nc().await;
        for peer_id in num_peers.peer_ids() {
            if let Some(v) = dbtx
                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                .await
            {
                peer_api_version_sets.insert(peer_id, v.0);
            }
        }
        drop(dbtx);
        peer_api_version_sets
    }

    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }

    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }

    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        self.get_peer_urls()
            .await
            .into_iter()
            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
            .map(|peer_url| {
                InviteCode::new(
                    peer_url.clone(),
                    peer,
                    self.federation_id(),
                    self.api_secret.clone(),
                )
            })
    }

    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }

    async fn get_or_backfill_broadcast_public_keys(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        config: ClientConfig,
    ) -> BTreeMap<PeerId, PublicKey> {
        match config.global.broadcast_public_keys {
            Some(guardian_pub_keys) => guardian_pub_keys,
            _ => {
                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;

                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
                *(self.config.write().await) = new_config;
                guardian_pub_keys
            }
        }
    }

    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }

    async fn fetch_and_update_config(
        &self,
        config: ClientConfig,
    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
        let fetched_config = retry(
            "Fetching guardian public keys",
            backoff_util::background_backoff(),
            || async {
                Ok(self
                    .api
                    .request_current_consensus::<ClientConfig>(
                        CLIENT_CONFIG_ENDPOINT.to_owned(),
                        ApiRequestErased::default(),
                    )
                    .await?)
            },
        )
        .await
        .expect("Will never return on error");

        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
            warn!(
                target: LOG_CLIENT,
                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
            );
            pending::<()>().await;
            unreachable!("Pending will never return");
        };

        let new_config = ClientConfig {
            global: GlobalClientConfig {
                broadcast_public_keys: Some(guardian_pub_keys.clone()),
                ..config.global
            },
            modules: config.modules,
        };
        (guardian_pub_keys, new_config)
    }

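    /// Dispatches a module-agnostic RPC call by name and returns its results
    /// as a stream of JSON values.
    ///
    /// Used by bindings and generic front-ends that talk to the client over a
    /// string/JSON interface; unknown method names yield an error.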
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let req: GetBalanceChangesRequest = serde_json::from_value(params)?;
                    let mut stream = self.subscribe_balance_changes(req.unit).await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                "backup_to_federation" => {
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }

    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }

    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }

    pub async fn log_event_raw_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        kind: EventKind,
        module: Option<(ModuleKind, ModuleInstanceId)>,
        payload: Vec<u8>,
        persist: EventPersistence,
    ) where
        Cap: Send,
    {
        let module_id = module.as_ref().map(|m| m.1);
        let module_kind = module.map(|m| m.0);
        dbtx.log_event_raw(
            self.log_ordering_wakeup_tx.clone(),
            kind,
            module_kind,
            module_id,
            payload,
            persist,
        )
        .await;
    }

    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }

    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }

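    /// Processes entries of the trimable event log with `handler_fn`, using
    /// `tracker` to persist how far processing has advanced, and waits for
    /// newly added events as they arrive; usually run as a long-lived task on
    /// the client task group.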
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }

    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }

    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }

    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }

    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }

    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }

    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }

    fn primary_modules_for_unit(
        &self,
        unit: AmountUnit,
    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
        self.primary_modules
            .iter()
            .flat_map(move |(_prio, candidates)| {
                candidates
                    .specific
                    .get(&unit)
                    .into_iter()
                    .flatten()
                    .copied()
                    .chain(candidates.wildcard.iter().copied())
            })
            .map(|id| (id, self.modules.get_expect(id)))
    }

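    /// Returns the highest-priority primary module that can handle `unit`,
    /// preferring modules registered specifically for that unit over wildcard
    /// primary modules within each priority level.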
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }

    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
}

#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }

    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}

impl fmt::Debug for Client {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Client")
    }
}

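/// Builds a [`ModuleDecoderRegistry`] for the given module instances, using
/// the decoder of the matching client module init in `registry` and skipping
/// (with a debug log) any module kind the client does not support.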
pub fn client_decoders<'a>(
    registry: &ModuleInitRegistry<DynClientModuleInit>,
    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
) -> ModuleDecoderRegistry {
    let mut modules = BTreeMap::new();
    for (id, kind) in module_kinds {
        let Some(init) = registry.get(kind) else {
            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
            continue;
        };

        modules.insert(
            id,
            (
                kind.clone(),
                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
            ),
        );
    }
    ModuleDecoderRegistry::from(modules)
}