1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::ConnectorType;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
51 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::backoff_util::custom_backoff;
57use fedimint_core::util::{
58 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
59};
60use fedimint_core::{
61 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
62 maybe_add_send_sync, runtime,
63};
64use fedimint_derive_secret::DerivableSecret;
65use fedimint_eventlog::{
66 DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
67 EventPersistence, PersistedLogEntry,
68};
69use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
70use futures::stream::FuturesUnordered;
71use futures::{Stream, StreamExt as _};
72use global_ctx::ModuleGlobalClientContext;
73use serde::{Deserialize, Serialize};
74use tokio::sync::{broadcast, watch};
75use tokio_stream::wrappers::WatchStream;
76use tracing::{debug, info, warn};
77
78use crate::ClientBuilder;
79use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
80use crate::backup::Metadata;
81use crate::db::{
82 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
83 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
84 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
85 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
86 get_decoded_client_secret, verify_client_db_integrity_dbtx,
87};
88use crate::meta::MetaService;
89use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
90use crate::oplog::OperationLog;
91use crate::sm::executor::{
92 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
93 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
94};
95
96pub(crate) mod builder;
97pub(crate) mod global_ctx;
98pub(crate) mod handle;
99
/// Core API versions this client advertises as supported (currently only `0.0`).
///
/// Fed into the client's [`SupportedApiVersionsSummary`] when negotiating a
/// common API version set with the federation's peers.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
105
/// Module instances that are candidates for acting as a primary module.
///
/// NOTE(review): the selection logic lives outside the visible part of this
/// file; the field descriptions below are inferred from the types only.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Candidate module instances, grouped by the amount unit they are listed under.
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Candidate module instances not tied to a single unit (presumably usable
    /// for any unit — confirm against the lookup code).
    wildcard: Vec<ModuleInstanceId>,
}
114
/// Federation client state: configuration, database handle, module registry,
/// state machine executor and the assorted services built around them.
pub struct Client {
    /// Handle handed to module notifiers when subscribing to state updates.
    final_client: FinalClientIface,
    /// Current client config; cloned out by [`Client::config`].
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional API secret, exposed via [`Client::api_secret`].
    api_secret: Option<String>,
    /// Decoder registry, exposed via [`Client::decoders`].
    decoders: ModuleDecoderRegistry,
    /// Client database.
    db: Database,
    /// Id of the federation, exposed via [`Client::federation_id`].
    federation_id: FederationId,
    /// String key/value meta entries queried by [`Client::get_config_meta`].
    federation_config_meta: BTreeMap<String, String>,
    /// Primary module candidates grouped by priority.
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    /// Registry of instantiated client modules, looked up by instance id.
    pub(crate) modules: ClientModuleRegistry,
    /// Module initializers; used to compute the supported API versions.
    module_inits: ClientModuleInitRegistry,
    /// State machine executor.
    executor: Executor,
    /// Federation API handle.
    pub(crate) api: DynGlobalApi,
    /// Root secret, cloned out by `root_secret`.
    root_secret: DerivableSecret,
    /// Operation log, exposed via [`Client::operation_log`].
    operation_log: OperationLog,
    /// Secp256k1 context used when building transactions and deriving the
    /// fake lightning public key.
    secp_ctx: Secp256k1<secp256k1::All>,
    /// Meta service, exposed via [`Client::meta_service`].
    meta_service: Arc<MetaService>,
    /// Connector type (not referenced in the visible part of this file).
    connector: ConnectorType,

    /// Task group used to spawn background work such as API version refreshes.
    task_group: TaskGroup,

    /// Receiver of per-module recovery progress.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // The event-log channels and flags below are not referenced in the
    // visible part of this file; see their users for exact semantics.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}
163
/// Serializable parameters for listing operations.
///
/// NOTE(review): the consumer is outside the visible part of this file; field
/// meanings are inferred from names and types.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    /// Optional maximum number of entries.
    limit: Option<usize>,
    /// Optional chronological key of the last entry already seen
    /// (presumably a pagination cursor — confirm against the caller).
    last_seen: Option<ChronologicalOperationLogKey>,
}
169
/// Serializable request carrying a single operation id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    /// Operation the request refers to.
    operation_id: OperationId,
}
174
/// Serializable request selecting the amount unit for balance changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    /// Unit to report on; when the field is missing during deserialization the
    /// value produced by `AmountUnit::bitcoin` is used.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
180
181impl Client {
182 pub async fn builder() -> anyhow::Result<ClientBuilder> {
185 Ok(ClientBuilder::new())
186 }
187
188 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
189 self.api.as_ref()
190 }
191
192 pub fn api_clone(&self) -> DynGlobalApi {
193 self.api.clone()
194 }
195
196 pub fn task_group(&self) -> &TaskGroup {
198 &self.task_group
199 }
200
201 #[doc(hidden)]
203 pub fn executor(&self) -> &Executor {
204 &self.executor
205 }
206
207 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
208 let mut dbtx = db.begin_transaction_nc().await;
209 dbtx.get_value(&ClientConfigKey).await
210 }
211
212 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
213 let mut dbtx = db.begin_transaction_nc().await;
214 dbtx.get_value(&PendingClientConfigKey).await
215 }
216
217 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
218 let mut dbtx = db.begin_transaction_nc().await;
219 dbtx.get_value(&ApiSecretKey).await
220 }
221
222 pub async fn store_encodable_client_secret<T: Encodable>(
223 db: &Database,
224 secret: T,
225 ) -> anyhow::Result<()> {
226 let mut dbtx = db.begin_transaction().await;
227
228 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
230 bail!("Encoded client secret already exists, cannot overwrite")
231 }
232
233 let encoded_secret = T::consensus_encode_to_vec(&secret);
234 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
235 .await;
236 dbtx.commit_tx().await;
237 Ok(())
238 }
239
240 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
241 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
242 bail!("Encoded client secret not present in DB")
243 };
244
245 Ok(secret)
246 }
247 pub async fn load_decodable_client_secret_opt<T: Decodable>(
248 db: &Database,
249 ) -> anyhow::Result<Option<T>> {
250 let mut dbtx = db.begin_transaction_nc().await;
251
252 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
253
254 Ok(match client_secret {
255 Some(client_secret) => Some(
256 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
257 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
258 ),
259 None => None,
260 })
261 }
262
263 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
264 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
265 Ok(secret) => secret,
266 _ => {
267 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
268 Self::store_encodable_client_secret(db, secret)
269 .await
270 .expect("Storing client secret must work");
271 secret
272 }
273 };
274 Ok(client_secret)
275 }
276
277 pub async fn is_initialized(db: &Database) -> bool {
278 let mut dbtx = db.begin_transaction_nc().await;
279 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
280 .await
281 .expect("Unrecoverable error occurred while reading and entry from the database")
282 .is_some()
283 }
284
285 pub fn start_executor(self: &Arc<Self>) {
286 debug!(
287 target: LOG_CLIENT,
288 "Starting fedimint client executor",
289 );
290 self.executor.start_executor(self.context_gen());
291 }
292
293 pub fn federation_id(&self) -> FederationId {
294 self.federation_id
295 }
296
297 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
298 let client_inner = Arc::downgrade(self);
299 Arc::new(move |module_instance, operation| {
300 ModuleGlobalClientContext {
301 client: client_inner
302 .clone()
303 .upgrade()
304 .expect("ModuleGlobalContextGen called after client was dropped"),
305 module_instance_id: module_instance,
306 operation,
307 }
308 .into()
309 })
310 }
311
312 pub async fn config(&self) -> ClientConfig {
313 self.config.read().await.clone()
314 }
315
316 pub fn api_secret(&self) -> &Option<String> {
317 &self.api_secret
318 }
319
320 pub fn decoders(&self) -> &ModuleDecoderRegistry {
321 &self.decoders
322 }
323
324 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
326 self.try_get_module(instance)
327 .expect("Module instance not found")
328 }
329
330 fn try_get_module(
331 &self,
332 instance: ModuleInstanceId,
333 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
334 Some(self.modules.get(instance)?.as_ref())
335 }
336
337 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
338 self.modules.get(instance).is_some()
339 }
340
341 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
347 let mut in_amounts = Amounts::ZERO;
349 let mut out_amounts = Amounts::ZERO;
350 let mut fee_amounts = Amounts::ZERO;
351
352 for input in builder.inputs() {
353 let module = self.get_module(input.input.module_instance_id());
354
355 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
356 "We only build transactions with input versions that are supported by the module",
357 );
358
359 in_amounts.checked_add_mut(&input.amounts);
360 fee_amounts.checked_add_mut(&item_fees);
361 }
362
363 for output in builder.outputs() {
364 let module = self.get_module(output.output.module_instance_id());
365
366 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
367 "We only build transactions with output versions that are supported by the module",
368 );
369
370 out_amounts.checked_add_mut(&output.amounts);
371 fee_amounts.checked_add_mut(&item_fees);
372 }
373
374 out_amounts.checked_add_mut(&fee_amounts);
375 (in_amounts, out_amounts)
376 }
377
378 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
379 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
380 }
381
382 pub fn get_config_meta(&self, key: &str) -> Option<String> {
384 self.federation_config_meta.get(key).cloned()
385 }
386
387 pub(crate) fn root_secret(&self) -> DerivableSecret {
388 self.root_secret.clone()
389 }
390
391 pub async fn add_state_machines(
392 &self,
393 dbtx: &mut DatabaseTransaction<'_>,
394 states: Vec<DynState>,
395 ) -> AddStateMachinesResult {
396 self.executor.add_state_machines_dbtx(dbtx, states).await
397 }
398
399 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
401 let active_states = self.executor.get_active_states().await;
402 let mut active_operations = HashSet::with_capacity(active_states.len());
403 let mut dbtx = self.db().begin_transaction_nc().await;
404 for (state, _) in active_states {
405 let operation_id = state.operation_id();
406 if dbtx
407 .get_value(&OperationLogKey { operation_id })
408 .await
409 .is_some()
410 {
411 active_operations.insert(operation_id);
412 }
413 }
414 active_operations
415 }
416
417 pub fn operation_log(&self) -> &OperationLog {
418 &self.operation_log
419 }
420
421 pub fn meta_service(&self) -> &Arc<MetaService> {
423 &self.meta_service
424 }
425
426 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
428 let meta_service = self.meta_service();
429 let ts = meta_service
430 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
431 .await
432 .and_then(|v| v.value)?;
433 Some(UNIX_EPOCH + Duration::from_secs(ts))
434 }
435
436 async fn finalize_transaction(
438 &self,
439 dbtx: &mut DatabaseTransaction<'_>,
440 operation_id: OperationId,
441 mut partial_transaction: TransactionBuilder,
442 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
443 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
444
445 let mut added_inputs_bundles = vec![];
446 let mut added_outputs_bundles = vec![];
447
448 for unit in in_amounts.units().union(&out_amounts.units()) {
459 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
460 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
461 if input_amount == output_amount {
462 continue;
463 }
464
465 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
466 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
467 };
468
469 let (added_input_bundle, added_output_bundle) = module
470 .create_final_inputs_and_outputs(
471 module_id,
472 dbtx,
473 operation_id,
474 *unit,
475 input_amount,
476 output_amount,
477 )
478 .await?;
479
480 added_inputs_bundles.push(added_input_bundle);
481 added_outputs_bundles.push(added_output_bundle);
482 }
483
484 let change_range = Range {
488 start: partial_transaction.outputs().count() as u64,
489 end: (partial_transaction.outputs().count() as u64
490 + added_outputs_bundles
491 .iter()
492 .map(|output| output.outputs().len() as u64)
493 .sum::<u64>()),
494 };
495
496 for added_inputs in added_inputs_bundles {
497 partial_transaction = partial_transaction.with_inputs(added_inputs);
498 }
499
500 for added_outputs in added_outputs_bundles {
501 partial_transaction = partial_transaction.with_outputs(added_outputs);
502 }
503
504 let (input_amounts, output_amounts) =
505 self.transaction_builder_get_balance(&partial_transaction);
506
507 for (unit, output_amount) in output_amounts {
508 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
509
510 assert!(input_amount >= output_amount, "Transaction is underfunded");
511 }
512
513 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
514
515 Ok((tx, states, change_range))
516 }
517
518 pub async fn finalize_and_submit_transaction<F, M>(
530 &self,
531 operation_id: OperationId,
532 operation_type: &str,
533 operation_meta_gen: F,
534 tx_builder: TransactionBuilder,
535 ) -> anyhow::Result<OutPointRange>
536 where
537 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
538 M: serde::Serialize + MaybeSend,
539 {
540 let operation_type = operation_type.to_owned();
541
542 let autocommit_res = self
543 .db
544 .autocommit(
545 |dbtx, _| {
546 let operation_type = operation_type.clone();
547 let tx_builder = tx_builder.clone();
548 let operation_meta_gen = operation_meta_gen.clone();
549 Box::pin(async move {
550 if Client::operation_exists_dbtx(dbtx, operation_id).await {
551 bail!("There already exists an operation with id {operation_id:?}")
552 }
553
554 let out_point_range = self
555 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
556 .await?;
557
558 self.operation_log()
559 .add_operation_log_entry_dbtx(
560 dbtx,
561 operation_id,
562 &operation_type,
563 operation_meta_gen(out_point_range),
564 )
565 .await;
566
567 Ok(out_point_range)
568 })
569 },
570 Some(100), )
572 .await;
573
574 match autocommit_res {
575 Ok(txid) => Ok(txid),
576 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
577 Err(AutocommitError::CommitFailed {
578 attempts,
579 last_error,
580 }) => panic!(
581 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
582 ),
583 }
584 }
585
586 async fn finalize_and_submit_transaction_inner(
587 &self,
588 dbtx: &mut DatabaseTransaction<'_>,
589 operation_id: OperationId,
590 tx_builder: TransactionBuilder,
591 ) -> anyhow::Result<OutPointRange> {
592 let (transaction, mut states, change_range) = self
593 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
594 .await?;
595
596 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
597 let inputs = transaction
598 .inputs
599 .iter()
600 .map(DynInput::module_instance_id)
601 .collect::<Vec<_>>();
602 let outputs = transaction
603 .outputs
604 .iter()
605 .map(DynOutput::module_instance_id)
606 .collect::<Vec<_>>();
607 warn!(
608 target: LOG_CLIENT_NET_API,
609 size=%transaction.consensus_encode_to_vec().len(),
610 ?inputs,
611 ?outputs,
612 "Transaction too large",
613 );
614 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
615 bail!(
616 "The generated transaction would be rejected by the federation for being too large."
617 );
618 }
619
620 let txid = transaction.tx_hash();
621
622 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
623
624 let tx_submission_sm = DynState::from_typed(
625 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
626 TxSubmissionStatesSM {
627 operation_id,
628 state: TxSubmissionStates::Created(transaction),
629 },
630 );
631 states.push(tx_submission_sm);
632
633 self.executor.add_state_machines_dbtx(dbtx, states).await?;
634
635 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
636 .await;
637
638 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
639 }
640
641 async fn transaction_update_stream(
642 &self,
643 operation_id: OperationId,
644 ) -> BoxStream<'static, TxSubmissionStatesSM> {
645 self.executor
646 .notifier()
647 .module_notifier::<TxSubmissionStatesSM>(
648 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
649 self.final_client.clone(),
650 )
651 .subscribe(operation_id)
652 .await
653 }
654
655 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
656 let mut dbtx = self.db().begin_transaction_nc().await;
657
658 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
659 }
660
661 pub async fn operation_exists_dbtx(
662 dbtx: &mut DatabaseTransaction<'_>,
663 operation_id: OperationId,
664 ) -> bool {
665 let active_state_exists = dbtx
666 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
667 .await
668 .next()
669 .await
670 .is_some();
671
672 let inactive_state_exists = dbtx
673 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
674 .await
675 .next()
676 .await
677 .is_some();
678
679 active_state_exists || inactive_state_exists
680 }
681
682 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
683 self.db
684 .begin_transaction_nc()
685 .await
686 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
687 .await
688 .next()
689 .await
690 .is_some()
691 }
692
693 pub async fn await_primary_bitcoin_module_output(
696 &self,
697 operation_id: OperationId,
698 out_point: OutPoint,
699 ) -> anyhow::Result<()> {
700 self.primary_module_for_unit(AmountUnit::BITCOIN)
701 .ok_or_else(|| anyhow!("No primary module available"))?
702 .1
703 .await_primary_module_output(operation_id, out_point)
704 .await
705 }
706
707 pub fn get_first_module<M: ClientModule>(
709 &'_ self,
710 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
711 let module_kind = M::kind();
712 let id = self
713 .get_first_instance(&module_kind)
714 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
715 let module: &M = self
716 .try_get_module(id)
717 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
718 .as_any()
719 .downcast_ref::<M>()
720 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
721 let (db, _) = self.db().with_prefix_module_id(id);
722 Ok(ClientModuleInstance {
723 id,
724 db,
725 api: self.api().with_module(id),
726 module,
727 })
728 }
729
730 pub fn get_module_client_dyn(
731 &self,
732 instance_id: ModuleInstanceId,
733 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
734 self.try_get_module(instance_id)
735 .ok_or(anyhow!("Unknown module instance {}", instance_id))
736 }
737
738 pub fn db(&self) -> &Database {
739 &self.db
740 }
741
742 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
745 TransactionUpdates {
746 update_stream: self.transaction_update_stream(operation_id).await,
747 }
748 }
749
750 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
752 self.modules
753 .iter_modules()
754 .find(|(_, kind, _module)| *kind == module_kind)
755 .map(|(instance_id, _, _)| instance_id)
756 }
757
758 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
761 get_decoded_client_secret::<T>(self.db()).await
762 }
763
764 pub async fn await_primary_bitcoin_module_outputs(
767 &self,
768 operation_id: OperationId,
769 outputs: Vec<OutPoint>,
770 ) -> anyhow::Result<()> {
771 for out_point in outputs {
772 self.await_primary_bitcoin_module_output(operation_id, out_point)
773 .await?;
774 }
775
776 Ok(())
777 }
778
779 pub async fn get_config_json(&self) -> JsonClientConfig {
785 self.config().await.to_json()
786 }
787
788 #[doc(hidden)]
791 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
794 self.get_balance_for_unit(AmountUnit::BITCOIN).await
795 }
796
797 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
798 let (id, module) = self
799 .primary_module_for_unit(unit)
800 .ok_or_else(|| anyhow!("Primary module not available"))?;
801 Ok(module
802 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
803 .await)
804 }
805
806 pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
809 let primary_module_things =
810 if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
811 let balance_changes = primary_module.subscribe_balance_changes().await;
812 let initial_balance = self
813 .get_balance_for_unit(unit)
814 .await
815 .expect("Primary is present");
816
817 Some((
818 primary_module_id,
819 primary_module.clone(),
820 balance_changes,
821 initial_balance,
822 ))
823 } else {
824 None
825 };
826 let db = self.db().clone();
827
828 Box::pin(async_stream::stream! {
829 let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
830 pending().await
833 };
834
835
836 yield initial_balance;
837 let mut prev_balance = initial_balance;
838 while let Some(()) = balance_changes.next().await {
839 let mut dbtx = db.begin_transaction_nc().await;
840 let balance = primary_module
841 .get_balance(primary_module_id, &mut dbtx, unit)
842 .await;
843
844 if balance != prev_balance {
846 prev_balance = balance;
847 yield balance;
848 }
849 }
850 })
851 }
852
853 async fn make_api_version_request(
858 delay: Duration,
859 peer_id: PeerId,
860 api: &DynGlobalApi,
861 ) -> (
862 PeerId,
863 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
864 ) {
865 runtime::sleep(delay).await;
866 (
867 peer_id,
868 api.request_single_peer::<SupportedApiVersionsSummary>(
869 VERSION_ENDPOINT.to_owned(),
870 ApiRequestErased::default(),
871 peer_id,
872 )
873 .await,
874 )
875 }
876
877 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
883 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
884 }
885
886 pub async fn fetch_common_api_versions_from_all_peers(
889 num_peers: NumPeers,
890 api: DynGlobalApi,
891 db: Database,
892 num_responses_sender: watch::Sender<usize>,
893 ) {
894 let mut backoff = Self::create_api_version_backoff();
895
896 let mut requests = FuturesUnordered::new();
899
900 for peer_id in num_peers.peer_ids() {
901 requests.push(Self::make_api_version_request(
902 Duration::ZERO,
903 peer_id,
904 &api,
905 ));
906 }
907
908 let mut num_responses = 0;
909
910 while let Some((peer_id, response)) = requests.next().await {
911 let retry = match response {
912 Err(err) => {
913 let has_previous_response = db
914 .begin_transaction_nc()
915 .await
916 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
917 .await
918 .is_some();
919 debug!(
920 target: LOG_CLIENT,
921 %peer_id,
922 err = %err.fmt_compact(),
923 %has_previous_response,
924 "Failed to refresh API versions of a peer"
925 );
926
927 !has_previous_response
928 }
929 Ok(o) => {
930 let mut dbtx = db.begin_transaction().await;
933 dbtx.insert_entry(
934 &PeerLastApiVersionsSummaryKey(peer_id),
935 &PeerLastApiVersionsSummary(o),
936 )
937 .await;
938 dbtx.commit_tx().await;
939 false
940 }
941 };
942
943 if retry {
944 requests.push(Self::make_api_version_request(
945 backoff.next().expect("Keeps retrying"),
946 peer_id,
947 &api,
948 ));
949 } else {
950 num_responses += 1;
951 num_responses_sender.send_replace(num_responses);
952 }
953 }
954 }
955
956 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
960 num_peers: NumPeers,
961 api: DynGlobalApi,
962 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
963 let mut backoff = Self::create_api_version_backoff();
964
965 let mut requests = FuturesUnordered::new();
968
969 for peer_id in num_peers.peer_ids() {
970 requests.push(Self::make_api_version_request(
971 Duration::ZERO,
972 peer_id,
973 &api,
974 ));
975 }
976
977 let mut successful_responses = BTreeMap::new();
978
979 while successful_responses.len() < num_peers.threshold()
980 && let Some((peer_id, response)) = requests.next().await
981 {
982 let retry = match response {
983 Err(err) => {
984 debug!(
985 target: LOG_CLIENT,
986 %peer_id,
987 err = %err.fmt_compact(),
988 "Failed to fetch API versions from peer"
989 );
990 true
991 }
992 Ok(response) => {
993 successful_responses.insert(peer_id, response);
994 false
995 }
996 };
997
998 if retry {
999 requests.push(Self::make_api_version_request(
1000 backoff.next().expect("Keeps retrying"),
1001 peer_id,
1002 &api,
1003 ));
1004 }
1005 }
1006
1007 successful_responses
1008 }
1009
1010 pub async fn fetch_common_api_versions(
1012 config: &ClientConfig,
1013 api: &DynGlobalApi,
1014 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1015 debug!(
1016 target: LOG_CLIENT,
1017 "Fetching common api versions"
1018 );
1019
1020 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1021
1022 let peer_api_version_sets =
1023 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1024
1025 Ok(peer_api_version_sets)
1026 }
1027
1028 pub async fn write_api_version_cache(
1032 dbtx: &mut DatabaseTransaction<'_>,
1033 api_version_set: ApiVersionSet,
1034 ) {
1035 debug!(
1036 target: LOG_CLIENT,
1037 value = ?api_version_set,
1038 "Writing API version set to cache"
1039 );
1040
1041 dbtx.insert_entry(
1042 &CachedApiVersionSetKey,
1043 &CachedApiVersionSet(api_version_set),
1044 )
1045 .await;
1046 }
1047
1048 pub async fn store_prefetched_api_versions(
1053 db: &Database,
1054 config: &ClientConfig,
1055 client_module_init: &ClientModuleInitRegistry,
1056 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1057 ) {
1058 debug!(
1059 target: LOG_CLIENT,
1060 "Storing {} prefetched peer API version responses and calculating common version set",
1061 peer_api_versions.len()
1062 );
1063
1064 let mut dbtx = db.begin_transaction().await;
1065 let client_supported_versions =
1067 Self::supported_api_versions_summary_static(config, client_module_init);
1068 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1069 &client_supported_versions,
1070 peer_api_versions,
1071 ) {
1072 Ok(common_api_versions) => {
1073 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1075 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1076 }
1077 Err(err) => {
1078 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1079 }
1080 }
1081
1082 for (peer_id, peer_api_versions) in peer_api_versions {
1084 dbtx.insert_entry(
1085 &PeerLastApiVersionsSummaryKey(*peer_id),
1086 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1087 )
1088 .await;
1089 }
1090 dbtx.commit_tx().await;
1091 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1092 }
1093
1094 pub fn supported_api_versions_summary_static(
1096 config: &ClientConfig,
1097 client_module_init: &ClientModuleInitRegistry,
1098 ) -> SupportedApiVersionsSummary {
1099 SupportedApiVersionsSummary {
1100 core: SupportedCoreApiVersions {
1101 core_consensus: config.global.consensus_version,
1102 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1103 .expect("must not have conflicting versions"),
1104 },
1105 modules: config
1106 .modules
1107 .iter()
1108 .filter_map(|(&module_instance_id, module_config)| {
1109 client_module_init
1110 .get(module_config.kind())
1111 .map(|module_init| {
1112 (
1113 module_instance_id,
1114 SupportedModuleApiVersions {
1115 core_consensus: config.global.consensus_version,
1116 module_consensus: module_config.version,
1117 api: module_init.supported_api_versions(),
1118 },
1119 )
1120 })
1121 })
1122 .collect(),
1123 }
1124 }
1125
1126 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1127 Self::load_and_refresh_common_api_version_static(
1128 &self.config().await,
1129 &self.module_inits,
1130 &self.api,
1131 &self.db,
1132 &self.task_group,
1133 )
1134 .await
1135 }
1136
1137 async fn load_and_refresh_common_api_version_static(
1143 config: &ClientConfig,
1144 module_init: &ClientModuleInitRegistry,
1145 api: &DynGlobalApi,
1146 db: &Database,
1147 task_group: &TaskGroup,
1148 ) -> anyhow::Result<ApiVersionSet> {
1149 if let Some(v) = db
1150 .begin_transaction_nc()
1151 .await
1152 .get_value(&CachedApiVersionSetKey)
1153 .await
1154 {
1155 debug!(
1156 target: LOG_CLIENT,
1157 "Found existing cached common api versions"
1158 );
1159 let config = config.clone();
1160 let client_module_init = module_init.clone();
1161 let api = api.clone();
1162 let db = db.clone();
1163 let task_group = task_group.clone();
1164 task_group
1167 .clone()
1168 .spawn_cancellable("refresh_common_api_version_static", async move {
1169 if let Err(error) = Self::refresh_common_api_version_static(
1170 &config,
1171 &client_module_init,
1172 &api,
1173 &db,
1174 task_group,
1175 false,
1176 )
1177 .await
1178 {
1179 warn!(
1180 target: LOG_CLIENT,
1181 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1182 );
1183 }
1184 });
1185
1186 return Ok(v.0);
1187 }
1188
1189 info!(
1190 target: LOG_CLIENT,
1191 "Fetching initial API versions "
1192 );
1193 Self::refresh_common_api_version_static(
1194 config,
1195 module_init,
1196 api,
1197 db,
1198 task_group.clone(),
1199 true,
1200 )
1201 .await
1202 }
1203
1204 async fn refresh_common_api_version_static(
1205 config: &ClientConfig,
1206 client_module_init: &ClientModuleInitRegistry,
1207 api: &DynGlobalApi,
1208 db: &Database,
1209 task_group: TaskGroup,
1210 block_until_ok: bool,
1211 ) -> anyhow::Result<ApiVersionSet> {
1212 debug!(
1213 target: LOG_CLIENT,
1214 "Refreshing common api versions"
1215 );
1216
1217 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1218 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1219
1220 task_group.spawn_cancellable("refresh peers api versions", {
1221 Client::fetch_common_api_versions_from_all_peers(
1222 num_peers,
1223 api.clone(),
1224 db.clone(),
1225 num_responses_sender,
1226 )
1227 });
1228
1229 let common_api_versions = loop {
1230 let _: Result<_, Elapsed> = runtime::timeout(
1238 Duration::from_secs(30),
1239 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1240 )
1241 .await;
1242
1243 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1244
1245 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1246 &Self::supported_api_versions_summary_static(config, client_module_init),
1247 &peer_api_version_sets,
1248 ) {
1249 Ok(o) => break o,
1250 Err(err) if block_until_ok => {
1251 warn!(
1252 target: LOG_CLIENT,
1253 err = %err.fmt_compact_anyhow(),
1254 "Failed to discover API version to use. Retrying..."
1255 );
1256 continue;
1257 }
1258 Err(e) => return Err(e),
1259 }
1260 };
1261
1262 debug!(
1263 target: LOG_CLIENT,
1264 value = ?common_api_versions,
1265 "Updating the cached common api versions"
1266 );
1267 let mut dbtx = db.begin_transaction().await;
1268 let _ = dbtx
1269 .insert_entry(
1270 &CachedApiVersionSetKey,
1271 &CachedApiVersionSet(common_api_versions.clone()),
1272 )
1273 .await;
1274
1275 dbtx.commit_tx().await;
1276
1277 Ok(common_api_versions)
1278 }
1279
1280 pub async fn get_metadata(&self) -> Metadata {
1282 self.db
1283 .begin_transaction_nc()
1284 .await
1285 .get_value(&ClientMetadataKey)
1286 .await
1287 .unwrap_or_else(|| {
1288 warn!(
1289 target: LOG_CLIENT,
1290 "Missing existing metadata. This key should have been set on Client init"
1291 );
1292 Metadata::empty()
1293 })
1294 }
1295
1296 pub async fn set_metadata(&self, metadata: &Metadata) {
1298 self.db
1299 .autocommit::<_, _, anyhow::Error>(
1300 |dbtx, _| {
1301 Box::pin(async {
1302 Self::set_metadata_dbtx(dbtx, metadata).await;
1303 Ok(())
1304 })
1305 },
1306 None,
1307 )
1308 .await
1309 .expect("Failed to autocommit metadata");
1310 }
1311
1312 pub fn has_pending_recoveries(&self) -> bool {
1313 !self
1314 .client_recovery_progress_receiver
1315 .borrow()
1316 .iter()
1317 .all(|(_id, progress)| progress.is_done())
1318 }
1319
1320 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1328 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1329 recovery_receiver
1330 .wait_for(|in_progress| {
1331 in_progress
1332 .iter()
1333 .all(|(_id, progress)| progress.is_done())
1334 })
1335 .await
1336 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1337
1338 Ok(())
1339 }
1340
1341 pub fn subscribe_to_recovery_progress(
1346 &self,
1347 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1348 WatchStream::new(self.client_recovery_progress_receiver.clone())
1349 .flat_map(futures::stream::iter)
1350 }
1351
1352 pub async fn wait_for_module_kind_recovery(
1353 &self,
1354 module_kind: ModuleKind,
1355 ) -> anyhow::Result<()> {
1356 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1357 let config = self.config().await;
1358 recovery_receiver
1359 .wait_for(|in_progress| {
1360 !in_progress
1361 .iter()
1362 .filter(|(module_instance_id, _progress)| {
1363 config.modules[module_instance_id].kind == module_kind
1364 })
1365 .any(|(_id, progress)| !progress.is_done())
1366 })
1367 .await
1368 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1369
1370 Ok(())
1371 }
1372
1373 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1374 loop {
1375 if self.executor.get_active_states().await.is_empty() {
1376 break;
1377 }
1378 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1379 }
1380 Ok(())
1381 }
1382
1383 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1385 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1386 }
1387
1388 fn spawn_module_recoveries_task(
1389 &self,
1390 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1391 module_recoveries: BTreeMap<
1392 ModuleInstanceId,
1393 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1394 >,
1395 module_recovery_progress_receivers: BTreeMap<
1396 ModuleInstanceId,
1397 watch::Receiver<RecoveryProgress>,
1398 >,
1399 ) {
1400 let db = self.db.clone();
1401 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1402 self.task_group
1403 .spawn("module recoveries", |_task_handle| async {
1404 Self::run_module_recoveries_task(
1405 db,
1406 log_ordering_wakeup_tx,
1407 recovery_sender,
1408 module_recoveries,
1409 module_recovery_progress_receivers,
1410 )
1411 .await;
1412 });
1413 }
1414
1415 async fn run_module_recoveries_task(
1416 db: Database,
1417 log_ordering_wakeup_tx: watch::Sender<()>,
1418 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1419 module_recoveries: BTreeMap<
1420 ModuleInstanceId,
1421 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1422 >,
1423 module_recovery_progress_receivers: BTreeMap<
1424 ModuleInstanceId,
1425 watch::Receiver<RecoveryProgress>,
1426 >,
1427 ) {
1428 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1429 let mut completed_stream = Vec::new();
1430 let progress_stream = futures::stream::FuturesUnordered::new();
1431
1432 for (module_instance_id, f) in module_recoveries {
1433 completed_stream.push(futures::stream::once(Box::pin(async move {
1434 match f.await {
1435 Ok(()) => (module_instance_id, None),
1436 Err(err) => {
1437 warn!(
1438 target: LOG_CLIENT,
1439 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1440 );
1441 futures::future::pending::<()>().await;
1445 unreachable!()
1446 }
1447 }
1448 })));
1449 }
1450
1451 for (module_instance_id, rx) in module_recovery_progress_receivers {
1452 progress_stream.push(
1453 tokio_stream::wrappers::WatchStream::new(rx)
1454 .fuse()
1455 .map(move |progress| (module_instance_id, Some(progress))),
1456 );
1457 }
1458
1459 let mut futures = futures::stream::select(
1460 futures::stream::select_all(progress_stream),
1461 futures::stream::select_all(completed_stream),
1462 );
1463
1464 while let Some((module_instance_id, progress)) = futures.next().await {
1465 let mut dbtx = db.begin_transaction().await;
1466
1467 let prev_progress = *recovery_sender
1468 .borrow()
1469 .get(&module_instance_id)
1470 .expect("existing progress must be present");
1471
1472 let progress = if prev_progress.is_done() {
1473 prev_progress
1475 } else if let Some(progress) = progress {
1476 progress
1477 } else {
1478 prev_progress.to_complete()
1479 };
1480
1481 if !prev_progress.is_done() && progress.is_done() {
1482 info!(
1483 target: LOG_CLIENT,
1484 module_instance_id,
1485 progress = format!("{}/{}", progress.complete, progress.total),
1486 "Recovery complete"
1487 );
1488 dbtx.log_event(
1489 log_ordering_wakeup_tx.clone(),
1490 None,
1491 ModuleRecoveryCompleted {
1492 module_id: module_instance_id,
1493 },
1494 )
1495 .await;
1496 } else {
1497 info!(
1498 target: LOG_CLIENT,
1499 module_instance_id,
1500 progress = format!("{}/{}", progress.complete, progress.total),
1501 "Recovery progress"
1502 );
1503 }
1504
1505 dbtx.insert_entry(
1506 &ClientModuleRecovery { module_instance_id },
1507 &ClientModuleRecoveryState { progress },
1508 )
1509 .await;
1510 dbtx.commit_tx().await;
1511
1512 recovery_sender.send_modify(|v| {
1513 v.insert(module_instance_id, progress);
1514 });
1515 }
1516 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1517 }
1518
1519 async fn load_peers_last_api_versions(
1520 db: &Database,
1521 num_peers: NumPeers,
1522 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1523 let mut peer_api_version_sets = BTreeMap::new();
1524
1525 let mut dbtx = db.begin_transaction_nc().await;
1526 for peer_id in num_peers.peer_ids() {
1527 if let Some(v) = dbtx
1528 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1529 .await
1530 {
1531 peer_api_version_sets.insert(peer_id, v.0);
1532 }
1533 }
1534 drop(dbtx);
1535 peer_api_version_sets
1536 }
1537
1538 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1541 self.db()
1542 .begin_transaction_nc()
1543 .await
1544 .find_by_prefix(&ApiAnnouncementPrefix)
1545 .await
1546 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1547 .collect()
1548 .await
1549 }
1550
1551 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1553 get_api_urls(&self.db, &self.config().await).await
1554 }
1555
1556 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1559 self.get_peer_urls()
1560 .await
1561 .into_iter()
1562 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1563 .map(|peer_url| {
1564 InviteCode::new(
1565 peer_url.clone(),
1566 peer,
1567 self.federation_id(),
1568 self.api_secret.clone(),
1569 )
1570 })
1571 }
1572
1573 pub async fn get_guardian_public_keys_blocking(
1577 &self,
1578 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1579 self.db
1580 .autocommit(
1581 |dbtx, _| {
1582 Box::pin(async move {
1583 let config = self.config().await;
1584
1585 let guardian_pub_keys = self
1586 .get_or_backfill_broadcast_public_keys(dbtx, config)
1587 .await;
1588
1589 Result::<_, ()>::Ok(guardian_pub_keys)
1590 })
1591 },
1592 None,
1593 )
1594 .await
1595 .expect("Will retry forever")
1596 }
1597
1598 async fn get_or_backfill_broadcast_public_keys(
1599 &self,
1600 dbtx: &mut DatabaseTransaction<'_>,
1601 config: ClientConfig,
1602 ) -> BTreeMap<PeerId, PublicKey> {
1603 match config.global.broadcast_public_keys {
1604 Some(guardian_pub_keys) => guardian_pub_keys,
1605 _ => {
1606 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1607
1608 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1609 *(self.config.write().await) = new_config;
1610 guardian_pub_keys
1611 }
1612 }
1613 }
1614
1615 async fn fetch_session_count(&self) -> FederationResult<u64> {
1616 self.api.session_count().await
1617 }
1618
1619 async fn fetch_and_update_config(
1620 &self,
1621 config: ClientConfig,
1622 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1623 let fetched_config = retry(
1624 "Fetching guardian public keys",
1625 backoff_util::background_backoff(),
1626 || async {
1627 Ok(self
1628 .api
1629 .request_current_consensus::<ClientConfig>(
1630 CLIENT_CONFIG_ENDPOINT.to_owned(),
1631 ApiRequestErased::default(),
1632 )
1633 .await?)
1634 },
1635 )
1636 .await
1637 .expect("Will never return on error");
1638
1639 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1640 warn!(
1641 target: LOG_CLIENT,
1642 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1643 );
1644 pending::<()>().await;
1645 unreachable!("Pending will never return");
1646 };
1647
1648 let new_config = ClientConfig {
1649 global: GlobalClientConfig {
1650 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1651 ..config.global
1652 },
1653 modules: config.modules,
1654 };
1655 (guardian_pub_keys, new_config)
1656 }
1657
1658 pub fn handle_global_rpc(
1659 &self,
1660 method: String,
1661 params: serde_json::Value,
1662 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1663 Box::pin(try_stream! {
1664 match method.as_str() {
1665 "get_balance" => {
1666 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1667 yield serde_json::to_value(balance)?;
1668 }
1669 "subscribe_balance_changes" => {
1670 let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1671 let mut stream = self.subscribe_balance_changes(req.unit).await;
1672 while let Some(balance) = stream.next().await {
1673 yield serde_json::to_value(balance)?;
1674 }
1675 }
1676 "get_config" => {
1677 let config = self.config().await;
1678 yield serde_json::to_value(config)?;
1679 }
1680 "get_federation_id" => {
1681 let federation_id = self.federation_id();
1682 yield serde_json::to_value(federation_id)?;
1683 }
1684 "get_invite_code" => {
1685 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1686 let invite_code = self.invite_code(req.peer).await;
1687 yield serde_json::to_value(invite_code)?;
1688 }
1689 "get_operation" => {
1690 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1691 let operation = self.operation_log().get_operation(req.operation_id).await;
1692 yield serde_json::to_value(operation)?;
1693 }
1694 "list_operations" => {
1695 let req: ListOperationsParams = serde_json::from_value(params)?;
1696 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1697 usize::MAX
1698 } else {
1699 req.limit.unwrap_or(usize::MAX)
1700 };
1701 let operations = self.operation_log()
1702 .paginate_operations_rev(limit, req.last_seen)
1703 .await;
1704 yield serde_json::to_value(operations)?;
1705 }
1706 "session_count" => {
1707 let count = self.fetch_session_count().await?;
1708 yield serde_json::to_value(count)?;
1709 }
1710 "has_pending_recoveries" => {
1711 let has_pending = self.has_pending_recoveries();
1712 yield serde_json::to_value(has_pending)?;
1713 }
1714 "wait_for_all_recoveries" => {
1715 self.wait_for_all_recoveries().await?;
1716 yield serde_json::Value::Null;
1717 }
1718 "subscribe_to_recovery_progress" => {
1719 let mut stream = self.subscribe_to_recovery_progress();
1720 while let Some((module_id, progress)) = stream.next().await {
1721 yield serde_json::json!({
1722 "module_id": module_id,
1723 "progress": progress
1724 });
1725 }
1726 }
1727 "backup_to_federation" => {
1728 let metadata = if params.is_null() {
1729 Metadata::from_json_serialized(serde_json::json!({}))
1730 } else {
1731 Metadata::from_json_serialized(params)
1732 };
1733 self.backup_to_federation(metadata).await?;
1734 yield serde_json::Value::Null;
1735 }
1736 _ => {
1737 Err(anyhow::format_err!("Unknown method: {}", method))?;
1738 unreachable!()
1739 },
1740 }
1741 })
1742 }
1743
1744 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1745 where
1746 E: Event + Send,
1747 {
1748 let mut dbtx = self.db.begin_transaction().await;
1749 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1750 dbtx.commit_tx().await;
1751 }
1752
1753 pub async fn log_event_dbtx<E, Cap>(
1754 &self,
1755 dbtx: &mut DatabaseTransaction<'_, Cap>,
1756 module_id: Option<ModuleInstanceId>,
1757 event: E,
1758 ) where
1759 E: Event + Send,
1760 Cap: Send,
1761 {
1762 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1763 .await;
1764 }
1765
1766 pub async fn log_event_raw_dbtx<Cap>(
1767 &self,
1768 dbtx: &mut DatabaseTransaction<'_, Cap>,
1769 kind: EventKind,
1770 module: Option<(ModuleKind, ModuleInstanceId)>,
1771 payload: Vec<u8>,
1772 persist: EventPersistence,
1773 ) where
1774 Cap: Send,
1775 {
1776 let module_id = module.as_ref().map(|m| m.1);
1777 let module_kind = module.map(|m| m.0);
1778 dbtx.log_event_raw(
1779 self.log_ordering_wakeup_tx.clone(),
1780 kind,
1781 module_kind,
1782 module_id,
1783 payload,
1784 persist,
1785 )
1786 .await;
1787 }
1788
1789 pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1790 where
1791 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1792 K: DatabaseRecord<Value = EventLogId>,
1793 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1794 R: Future<Output = anyhow::Result<()>>,
1795 {
1796 fedimint_eventlog::handle_events(
1797 self.db.clone(),
1798 pos_key,
1799 self.log_event_added_rx.clone(),
1800 call_fn,
1801 )
1802 .await
1803 }
1804
1805 pub async fn handle_trimable_events<F, R, K>(
1806 &self,
1807 pos_key: &K,
1808 call_fn: F,
1809 ) -> anyhow::Result<()>
1810 where
1811 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1812 K: DatabaseRecord<Value = EventLogTrimableId>,
1813 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1814 R: Future<Output = anyhow::Result<()>>,
1815 {
1816 fedimint_eventlog::handle_trimable_events(
1817 self.db.clone(),
1818 pos_key,
1819 self.log_event_added_rx.clone(),
1820 call_fn,
1821 )
1822 .await
1823 }
1824
1825 pub async fn get_event_log(
1826 &self,
1827 pos: Option<EventLogId>,
1828 limit: u64,
1829 ) -> Vec<PersistedLogEntry> {
1830 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1831 .await
1832 }
1833
1834 pub async fn get_event_log_trimable(
1835 &self,
1836 pos: Option<EventLogTrimableId>,
1837 limit: u64,
1838 ) -> Vec<PersistedLogEntry> {
1839 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1840 .await
1841 }
1842
1843 pub async fn get_event_log_dbtx<Cap>(
1844 &self,
1845 dbtx: &mut DatabaseTransaction<'_, Cap>,
1846 pos: Option<EventLogId>,
1847 limit: u64,
1848 ) -> Vec<PersistedLogEntry>
1849 where
1850 Cap: Send,
1851 {
1852 dbtx.get_event_log(pos, limit).await
1853 }
1854
1855 pub async fn get_event_log_trimable_dbtx<Cap>(
1856 &self,
1857 dbtx: &mut DatabaseTransaction<'_, Cap>,
1858 pos: Option<EventLogTrimableId>,
1859 limit: u64,
1860 ) -> Vec<PersistedLogEntry>
1861 where
1862 Cap: Send,
1863 {
1864 dbtx.get_event_log_trimable(pos, limit).await
1865 }
1866
1867 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1869 self.log_event_added_transient_tx.subscribe()
1870 }
1871
1872 pub fn iroh_enable_dht(&self) -> bool {
1873 self.iroh_enable_dht
1874 }
1875
1876 pub(crate) async fn run_core_migrations(
1877 db_no_decoders: &Database,
1878 ) -> Result<(), anyhow::Error> {
1879 let mut dbtx = db_no_decoders.begin_transaction().await;
1880 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1881 .await?;
1882 if is_running_in_test_env() {
1883 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1884 }
1885 dbtx.commit_tx_result().await?;
1886 Ok(())
1887 }
1888
1889 fn primary_modules_for_unit(
1891 &self,
1892 unit: AmountUnit,
1893 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1894 self.primary_modules
1895 .iter()
1896 .flat_map(move |(_prio, candidates)| {
1897 candidates
1898 .specific
1899 .get(&unit)
1900 .into_iter()
1901 .flatten()
1902 .copied()
1903 .chain(candidates.wildcard.iter().copied())
1905 })
1906 .map(|id| (id, self.modules.get_expect(id)))
1907 }
1908
1909 pub fn primary_module_for_unit(
1913 &self,
1914 unit: AmountUnit,
1915 ) -> Option<(ModuleInstanceId, &DynClientModule)> {
1916 self.primary_modules_for_unit(unit).next()
1917 }
1918
1919 pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
1921 self.primary_module_for_unit(AmountUnit::BITCOIN)
1922 .expect("No primary module for Bitcoin")
1923 }
1924}
1925
1926#[apply(async_trait_maybe_send!)]
1927impl ClientContextIface for Client {
1928 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1929 Client::get_module(self, instance)
1930 }
1931
1932 fn api_clone(&self) -> DynGlobalApi {
1933 Client::api_clone(self)
1934 }
1935 fn decoders(&self) -> &ModuleDecoderRegistry {
1936 Client::decoders(self)
1937 }
1938
1939 async fn finalize_and_submit_transaction(
1940 &self,
1941 operation_id: OperationId,
1942 operation_type: &str,
1943 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1944 tx_builder: TransactionBuilder,
1945 ) -> anyhow::Result<OutPointRange> {
1946 Client::finalize_and_submit_transaction(
1947 self,
1948 operation_id,
1949 operation_type,
1950 &operation_meta_gen,
1952 tx_builder,
1953 )
1954 .await
1955 }
1956
1957 async fn finalize_and_submit_transaction_inner(
1958 &self,
1959 dbtx: &mut DatabaseTransaction<'_>,
1960 operation_id: OperationId,
1961 tx_builder: TransactionBuilder,
1962 ) -> anyhow::Result<OutPointRange> {
1963 Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1964 }
1965
1966 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1967 Client::transaction_updates(self, operation_id).await
1968 }
1969
1970 async fn await_primary_module_outputs(
1971 &self,
1972 operation_id: OperationId,
1973 outputs: Vec<OutPoint>,
1975 ) -> anyhow::Result<()> {
1976 Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
1977 }
1978
1979 fn operation_log(&self) -> &dyn IOperationLog {
1980 Client::operation_log(self)
1981 }
1982
1983 async fn has_active_states(&self, operation_id: OperationId) -> bool {
1984 Client::has_active_states(self, operation_id).await
1985 }
1986
1987 async fn operation_exists(&self, operation_id: OperationId) -> bool {
1988 Client::operation_exists(self, operation_id).await
1989 }
1990
1991 async fn config(&self) -> ClientConfig {
1992 Client::config(self).await
1993 }
1994
1995 fn db(&self) -> &Database {
1996 Client::db(self)
1997 }
1998
1999 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2000 Client::executor(self)
2001 }
2002
2003 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2004 Client::invite_code(self, peer).await
2005 }
2006
2007 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2008 Client::get_internal_payment_markers(self)
2009 }
2010
2011 async fn log_event_json(
2012 &self,
2013 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2014 module_kind: Option<ModuleKind>,
2015 module_id: ModuleInstanceId,
2016 kind: EventKind,
2017 payload: serde_json::Value,
2018 persist: EventPersistence,
2019 ) {
2020 dbtx.ensure_global()
2021 .expect("Must be called with global dbtx");
2022 self.log_event_raw_dbtx(
2023 dbtx,
2024 kind,
2025 module_kind.map(|kind| (kind, module_id)),
2026 serde_json::to_vec(&payload).expect("Serialization can't fail"),
2027 persist,
2028 )
2029 .await;
2030 }
2031
2032 async fn read_operation_active_states<'dbtx>(
2033 &self,
2034 operation_id: OperationId,
2035 module_id: ModuleInstanceId,
2036 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2037 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2038 {
2039 Box::pin(
2040 dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2041 operation_id,
2042 module_instance: module_id,
2043 })
2044 .await
2045 .map(move |(k, v)| (k.0, v)),
2046 )
2047 }
2048 async fn read_operation_inactive_states<'dbtx>(
2049 &self,
2050 operation_id: OperationId,
2051 module_id: ModuleInstanceId,
2052 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2053 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2054 {
2055 Box::pin(
2056 dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2057 operation_id,
2058 module_instance: module_id,
2059 })
2060 .await
2061 .map(move |(k, v)| (k.0, v)),
2062 )
2063 }
2064}
2065
2066impl fmt::Debug for Client {
2068 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2069 write!(f, "Client")
2070 }
2071}
2072
2073pub fn client_decoders<'a>(
2074 registry: &ModuleInitRegistry<DynClientModuleInit>,
2075 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2076) -> ModuleDecoderRegistry {
2077 let mut modules = BTreeMap::new();
2078 for (id, kind) in module_kinds {
2079 let Some(init) = registry.get(kind) else {
2080 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2081 continue;
2082 };
2083
2084 modules.insert(
2085 id,
2086 (
2087 kind.clone(),
2088 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2089 ),
2090 );
2091 }
2092 ModuleDecoderRegistry::from(modules)
2093}