1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::ConnectorType;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64 maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
86 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
87 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89 get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
/// Core API versions this client declares support for.
///
/// Read by `Client::supported_api_versions_summary_static`, which turns the
/// list into a `MultiApiVersion`.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
109
/// Module instances that are candidates for acting as a primary module.
///
/// `Client::primary_modules` keeps one of these per `PrimaryModulePriority`.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Candidate module instances registered for one specific `AmountUnit`.
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Candidate module instances not tied to a specific unit.
    // NOTE(review): presumably these may serve any unit — confirm against
    // `primary_module_for_unit`, which is not visible here.
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Central client state: configuration, database handle, federation API,
/// registered modules, the state machine executor and event-log plumbing.
pub struct Client {
    /// Handle passed to the executor's module notifier (see
    /// `transaction_update_stream`).
    final_client: FinalClientIface,
    /// Current client config; guarded by an async `RwLock` and cloned out by
    /// `config()`.
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional API secret, exposed through `api_secret()`.
    api_secret: Option<String>,
    /// Module decoder registry, exposed through `decoders()`.
    decoders: ModuleDecoderRegistry,
    /// Connector registry, exposed through `endpoints()`.
    connectors: ConnectorRegistry,
    /// Client database, exposed through `db()`.
    db: Database,
    /// Id of the federation, exposed through `federation_id()`.
    federation_id: FederationId,
    /// String key/value metadata looked up by `get_config_meta()`.
    federation_config_meta: BTreeMap<String, String>,
    /// Primary module candidates grouped by priority.
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    /// Registry of instantiated client modules, indexed by module instance id.
    pub(crate) modules: ClientModuleRegistry,
    /// Module initializers; used when computing/refreshing supported API
    /// versions.
    module_inits: ClientModuleInitRegistry,
    /// State machine executor; state machines are added to it when a
    /// transaction is submitted.
    executor: Executor,
    /// Federation API handle, exposed through `api()` / `api_clone()`.
    pub(crate) api: DynGlobalApi,
    /// Root secret, cloned out by `root_secret()`.
    root_secret: DerivableSecret,
    /// Operation log, exposed through `operation_log()`.
    operation_log: OperationLog,
    /// secp256k1 context used when building transactions and deriving the fake
    /// LN public key.
    secp_ctx: Secp256k1<secp256k1::All>,
    /// Meta service, exposed through `meta_service()`.
    meta_service: Arc<MetaService>,
    // NOTE(review): not read in the visible part of this file — confirm usage.
    connector: ConnectorType,

    /// Task group on which background tasks (e.g. API version refresh) are
    /// spawned.
    task_group: TaskGroup,

    /// Watch receiver carrying per-module `RecoveryProgress`.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // NOTE(review): the three channels below are not used in the visible part
    // of this file; presumably event-log signalling — confirm in `event_log`.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    // NOTE(review): not read in the visible part of this file — confirm usage.
    request_hook: ApiRequestHook,
    // NOTE(review): iroh flags are not read in the visible part of this file.
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}
168
/// Serializable parameters for listing operations.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    /// Optional cap on the number of returned entries.
    limit: Option<usize>,
    /// Optional chronological log key of the last entry already seen.
    // NOTE(review): presumably a pagination cursor — confirm against the
    // handler that consumes this struct.
    last_seen: Option<ChronologicalOperationLogKey>,
}
174
/// Serializable request that carries a single operation id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    /// Id of the operation the request refers to.
    operation_id: OperationId,
}
179
/// Serializable request selecting the unit whose balance changes are wanted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    /// Unit to report on; when the field is absent during deserialization the
    /// value produced by `AmountUnit::bitcoin` is used.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
185
186impl Client {
187 pub async fn builder() -> anyhow::Result<ClientBuilder> {
190 Ok(ClientBuilder::new())
191 }
192
193 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
194 self.api.as_ref()
195 }
196
197 pub fn api_clone(&self) -> DynGlobalApi {
198 self.api.clone()
199 }
200
201 pub fn task_group(&self) -> &TaskGroup {
203 &self.task_group
204 }
205
206 #[doc(hidden)]
208 pub fn executor(&self) -> &Executor {
209 &self.executor
210 }
211
212 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
213 let mut dbtx = db.begin_transaction_nc().await;
214 dbtx.get_value(&ClientConfigKey).await
215 }
216
217 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
218 let mut dbtx = db.begin_transaction_nc().await;
219 dbtx.get_value(&PendingClientConfigKey).await
220 }
221
222 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
223 let mut dbtx = db.begin_transaction_nc().await;
224 dbtx.get_value(&ApiSecretKey).await
225 }
226
227 pub async fn store_encodable_client_secret<T: Encodable>(
228 db: &Database,
229 secret: T,
230 ) -> anyhow::Result<()> {
231 let mut dbtx = db.begin_transaction().await;
232
233 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
235 bail!("Encoded client secret already exists, cannot overwrite")
236 }
237
238 let encoded_secret = T::consensus_encode_to_vec(&secret);
239 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
240 .await;
241 dbtx.commit_tx().await;
242 Ok(())
243 }
244
245 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
246 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
247 bail!("Encoded client secret not present in DB")
248 };
249
250 Ok(secret)
251 }
252 pub async fn load_decodable_client_secret_opt<T: Decodable>(
253 db: &Database,
254 ) -> anyhow::Result<Option<T>> {
255 let mut dbtx = db.begin_transaction_nc().await;
256
257 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
258
259 Ok(match client_secret {
260 Some(client_secret) => Some(
261 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
262 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
263 ),
264 None => None,
265 })
266 }
267
268 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
269 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
270 Ok(secret) => secret,
271 _ => {
272 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
273 Self::store_encodable_client_secret(db, secret)
274 .await
275 .expect("Storing client secret must work");
276 secret
277 }
278 };
279 Ok(client_secret)
280 }
281
282 pub async fn is_initialized(db: &Database) -> bool {
283 let mut dbtx = db.begin_transaction_nc().await;
284 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
285 .await
286 .expect("Unrecoverable error occurred while reading and entry from the database")
287 .is_some()
288 }
289
290 pub fn start_executor(self: &Arc<Self>) {
291 debug!(
292 target: LOG_CLIENT,
293 "Starting fedimint client executor",
294 );
295 self.executor.start_executor(self.context_gen());
296 }
297
298 pub fn federation_id(&self) -> FederationId {
299 self.federation_id
300 }
301
302 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
303 let client_inner = Arc::downgrade(self);
304 Arc::new(move |module_instance, operation| {
305 ModuleGlobalClientContext {
306 client: client_inner
307 .clone()
308 .upgrade()
309 .expect("ModuleGlobalContextGen called after client was dropped"),
310 module_instance_id: module_instance,
311 operation,
312 }
313 .into()
314 })
315 }
316
317 pub async fn config(&self) -> ClientConfig {
318 self.config.read().await.clone()
319 }
320
321 pub fn api_secret(&self) -> &Option<String> {
323 &self.api_secret
324 }
325
326 pub async fn core_api_version(&self) -> ApiVersion {
332 self.db
335 .begin_transaction_nc()
336 .await
337 .get_value(&CachedApiVersionSetKey)
338 .await
339 .map(|cached: CachedApiVersionSet| cached.0.core)
340 .unwrap_or(ApiVersion { major: 0, minor: 0 })
341 }
342
343 pub fn decoders(&self) -> &ModuleDecoderRegistry {
344 &self.decoders
345 }
346
347 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
349 self.try_get_module(instance)
350 .expect("Module instance not found")
351 }
352
353 fn try_get_module(
354 &self,
355 instance: ModuleInstanceId,
356 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
357 Some(self.modules.get(instance)?.as_ref())
358 }
359
360 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
361 self.modules.get(instance).is_some()
362 }
363
364 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
370 let mut in_amounts = Amounts::ZERO;
372 let mut out_amounts = Amounts::ZERO;
373 let mut fee_amounts = Amounts::ZERO;
374
375 for input in builder.inputs() {
376 let module = self.get_module(input.input.module_instance_id());
377
378 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
379 "We only build transactions with input versions that are supported by the module",
380 );
381
382 in_amounts.checked_add_mut(&input.amounts);
383 fee_amounts.checked_add_mut(&item_fees);
384 }
385
386 for output in builder.outputs() {
387 let module = self.get_module(output.output.module_instance_id());
388
389 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
390 "We only build transactions with output versions that are supported by the module",
391 );
392
393 out_amounts.checked_add_mut(&output.amounts);
394 fee_amounts.checked_add_mut(&item_fees);
395 }
396
397 out_amounts.checked_add_mut(&fee_amounts);
398 (in_amounts, out_amounts)
399 }
400
401 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
402 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
403 }
404
405 pub fn get_config_meta(&self, key: &str) -> Option<String> {
407 self.federation_config_meta.get(key).cloned()
408 }
409
410 pub(crate) fn root_secret(&self) -> DerivableSecret {
411 self.root_secret.clone()
412 }
413
414 pub async fn add_state_machines(
415 &self,
416 dbtx: &mut DatabaseTransaction<'_>,
417 states: Vec<DynState>,
418 ) -> AddStateMachinesResult {
419 self.executor.add_state_machines_dbtx(dbtx, states).await
420 }
421
422 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
424 let active_states = self.executor.get_active_states().await;
425 let mut active_operations = HashSet::with_capacity(active_states.len());
426 let mut dbtx = self.db().begin_transaction_nc().await;
427 for (state, _) in active_states {
428 let operation_id = state.operation_id();
429 if dbtx
430 .get_value(&OperationLogKey { operation_id })
431 .await
432 .is_some()
433 {
434 active_operations.insert(operation_id);
435 }
436 }
437 active_operations
438 }
439
440 pub fn operation_log(&self) -> &OperationLog {
441 &self.operation_log
442 }
443
444 pub fn meta_service(&self) -> &Arc<MetaService> {
446 &self.meta_service
447 }
448
449 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
451 let meta_service = self.meta_service();
452 let ts = meta_service
453 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
454 .await
455 .and_then(|v| v.value)?;
456 Some(UNIX_EPOCH + Duration::from_secs(ts))
457 }
458
459 async fn finalize_transaction(
461 &self,
462 dbtx: &mut DatabaseTransaction<'_>,
463 operation_id: OperationId,
464 mut partial_transaction: TransactionBuilder,
465 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
466 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
467
468 let mut added_inputs_bundles = vec![];
469 let mut added_outputs_bundles = vec![];
470
471 for unit in in_amounts.units().union(&out_amounts.units()) {
482 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
483 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
484 if input_amount == output_amount {
485 continue;
486 }
487
488 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
489 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
490 };
491
492 let (added_input_bundle, added_output_bundle) = module
493 .create_final_inputs_and_outputs(
494 module_id,
495 dbtx,
496 operation_id,
497 *unit,
498 input_amount,
499 output_amount,
500 )
501 .await?;
502
503 added_inputs_bundles.push(added_input_bundle);
504 added_outputs_bundles.push(added_output_bundle);
505 }
506
507 let change_range = Range {
511 start: partial_transaction.outputs().count() as u64,
512 end: (partial_transaction.outputs().count() as u64
513 + added_outputs_bundles
514 .iter()
515 .map(|output| output.outputs().len() as u64)
516 .sum::<u64>()),
517 };
518
519 for added_inputs in added_inputs_bundles {
520 partial_transaction = partial_transaction.with_inputs(added_inputs);
521 }
522
523 for added_outputs in added_outputs_bundles {
524 partial_transaction = partial_transaction.with_outputs(added_outputs);
525 }
526
527 let (input_amounts, output_amounts) =
528 self.transaction_builder_get_balance(&partial_transaction);
529
530 for (unit, output_amount) in output_amounts {
531 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
532
533 assert!(input_amount >= output_amount, "Transaction is underfunded");
534 }
535
536 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
537
538 Ok((tx, states, change_range))
539 }
540
/// Finalizes `tx_builder`, submits the resulting transaction and records a
/// new operation log entry, all inside one auto-committed database
/// transaction.
///
/// `operation_meta_gen` receives the range of out points added while
/// finalizing and produces the metadata stored in the operation log entry
/// under `operation_type`.
///
/// Returns the out point range reported by
/// `finalize_and_submit_transaction_inner`.
///
/// # Errors
///
/// Fails if an operation with `operation_id` already has state machines, or
/// if finalizing/submitting the transaction fails.
///
/// # Panics
///
/// Panics if the database transaction cannot be committed.
pub async fn finalize_and_submit_transaction<F, M>(
    &self,
    operation_id: OperationId,
    operation_type: &str,
    operation_meta_gen: F,
    tx_builder: TransactionBuilder,
) -> anyhow::Result<OutPointRange>
where
    F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
    M: serde::Serialize + MaybeSend,
{
    let operation_type = operation_type.to_owned();

    let autocommit_res = self
        .db
        .autocommit(
            |dbtx, _| {
                // The closure may be invoked more than once, so each
                // invocation works on its own clones of the captured values.
                let operation_type = operation_type.clone();
                let tx_builder = tx_builder.clone();
                let operation_meta_gen = operation_meta_gen.clone();
                Box::pin(async move {
                    if Client::operation_exists_dbtx(dbtx, operation_id).await {
                        bail!("There already exists an operation with id {operation_id:?}")
                    }

                    let out_point_range = self
                        .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                        .await?;

                    self.operation_log()
                        .add_operation_log_entry_dbtx(
                            dbtx,
                            operation_id,
                            &operation_type,
                            operation_meta_gen(out_point_range),
                        )
                        .await;

                    Ok(out_point_range)
                })
            },
            // NOTE(review): presumably the maximum number of commit attempts
            // — confirm against `Database::autocommit`.
            Some(100),
        )
        .await;

    match autocommit_res {
        // Despite the binding's name, this is the out point range returned
        // by the closure.
        Ok(txid) => Ok(txid),
        Err(AutocommitError::ClosureError { error, .. }) => Err(error),
        Err(AutocommitError::CommitFailed {
            attempts,
            last_error,
        }) => panic!(
            "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
        ),
    }
}
608
609 async fn finalize_and_submit_transaction_inner(
610 &self,
611 dbtx: &mut DatabaseTransaction<'_>,
612 operation_id: OperationId,
613 tx_builder: TransactionBuilder,
614 ) -> anyhow::Result<OutPointRange> {
615 let (transaction, mut states, change_range) = self
616 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
617 .await?;
618
619 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
620 let inputs = transaction
621 .inputs
622 .iter()
623 .map(DynInput::module_instance_id)
624 .collect::<Vec<_>>();
625 let outputs = transaction
626 .outputs
627 .iter()
628 .map(DynOutput::module_instance_id)
629 .collect::<Vec<_>>();
630 warn!(
631 target: LOG_CLIENT_NET_API,
632 size=%transaction.consensus_encode_to_vec().len(),
633 ?inputs,
634 ?outputs,
635 "Transaction too large",
636 );
637 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
638 bail!(
639 "The generated transaction would be rejected by the federation for being too large."
640 );
641 }
642
643 let txid = transaction.tx_hash();
644
645 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
646
647 let tx_submission_sm = DynState::from_typed(
648 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
649 TxSubmissionStatesSM {
650 operation_id,
651 state: TxSubmissionStates::Created(transaction),
652 },
653 );
654 states.push(tx_submission_sm);
655
656 self.executor.add_state_machines_dbtx(dbtx, states).await?;
657
658 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
659 .await;
660
661 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
662 }
663
664 async fn transaction_update_stream(
665 &self,
666 operation_id: OperationId,
667 ) -> BoxStream<'static, TxSubmissionStatesSM> {
668 self.executor
669 .notifier()
670 .module_notifier::<TxSubmissionStatesSM>(
671 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
672 self.final_client.clone(),
673 )
674 .subscribe(operation_id)
675 .await
676 }
677
678 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
679 let mut dbtx = self.db().begin_transaction_nc().await;
680
681 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
682 }
683
684 pub async fn operation_exists_dbtx(
685 dbtx: &mut DatabaseTransaction<'_>,
686 operation_id: OperationId,
687 ) -> bool {
688 let active_state_exists = dbtx
689 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
690 .await
691 .next()
692 .await
693 .is_some();
694
695 let inactive_state_exists = dbtx
696 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
697 .await
698 .next()
699 .await
700 .is_some();
701
702 active_state_exists || inactive_state_exists
703 }
704
705 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
706 self.db
707 .begin_transaction_nc()
708 .await
709 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
710 .await
711 .next()
712 .await
713 .is_some()
714 }
715
716 pub async fn await_primary_bitcoin_module_output(
719 &self,
720 operation_id: OperationId,
721 out_point: OutPoint,
722 ) -> anyhow::Result<()> {
723 self.primary_module_for_unit(AmountUnit::BITCOIN)
724 .ok_or_else(|| anyhow!("No primary module available"))?
725 .1
726 .await_primary_module_output(operation_id, out_point)
727 .await
728 }
729
730 pub fn get_first_module<M: ClientModule>(
732 &'_ self,
733 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
734 let module_kind = M::kind();
735 let id = self
736 .get_first_instance(&module_kind)
737 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
738 let module: &M = self
739 .try_get_module(id)
740 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
741 .as_any()
742 .downcast_ref::<M>()
743 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
744 let (db, _) = self.db().with_prefix_module_id(id);
745 Ok(ClientModuleInstance {
746 id,
747 db,
748 api: self.api().with_module(id),
749 module,
750 })
751 }
752
753 pub fn get_module_client_dyn(
754 &self,
755 instance_id: ModuleInstanceId,
756 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
757 self.try_get_module(instance_id)
758 .ok_or(anyhow!("Unknown module instance {}", instance_id))
759 }
760
761 pub fn db(&self) -> &Database {
762 &self.db
763 }
764
765 pub fn endpoints(&self) -> &ConnectorRegistry {
766 &self.connectors
767 }
768
769 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
772 TransactionUpdates {
773 update_stream: self.transaction_update_stream(operation_id).await,
774 }
775 }
776
777 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
779 self.modules
780 .iter_modules()
781 .find(|(_, kind, _module)| *kind == module_kind)
782 .map(|(instance_id, _, _)| instance_id)
783 }
784
785 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
788 get_decoded_client_secret::<T>(self.db()).await
789 }
790
791 pub async fn await_primary_bitcoin_module_outputs(
794 &self,
795 operation_id: OperationId,
796 outputs: Vec<OutPoint>,
797 ) -> anyhow::Result<()> {
798 for out_point in outputs {
799 self.await_primary_bitcoin_module_output(operation_id, out_point)
800 .await?;
801 }
802
803 Ok(())
804 }
805
806 pub async fn get_config_json(&self) -> JsonClientConfig {
812 self.config().await.to_json()
813 }
814
815 #[doc(hidden)]
818 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
821 self.get_balance_for_unit(AmountUnit::BITCOIN).await
822 }
823
824 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
825 let (id, module) = self
826 .primary_module_for_unit(unit)
827 .ok_or_else(|| anyhow!("Primary module not available"))?;
828 Ok(module
829 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
830 .await)
831 }
832
/// Returns a stream of balances for `unit`.
///
/// When a primary module for `unit` exists, the stream first yields the
/// current balance and afterwards yields a new balance every time the
/// module signals a change and the balance differs from the last yielded
/// value. When no primary module exists, the stream never yields.
///
/// # Panics
///
/// Panics if the initial balance cannot be read even though a primary
/// module was found.
pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
    // Everything the stream needs is gathered up front so the returned
    // stream does not borrow `self`.
    let primary_module_things =
        if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
            // Subscribe before reading the initial balance.
            let balance_changes = primary_module.subscribe_balance_changes().await;
            let initial_balance = self
                .get_balance_for_unit(unit)
                .await
                .expect("Primary is present");

            Some((
                primary_module_id,
                primary_module.clone(),
                balance_changes,
                initial_balance,
            ))
        } else {
            None
        };
    let db = self.db().clone();

    Box::pin(async_stream::stream! {
        let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
            // No primary module: stay pending forever instead of ending the
            // stream.
            pending().await
        };


        yield initial_balance;
        let mut prev_balance = initial_balance;
        while let Some(()) = balance_changes.next().await {
            let mut dbtx = db.begin_transaction_nc().await;
            let balance = primary_module
                .get_balance(primary_module_id, &mut dbtx, unit)
                .await;

            // Suppress notifications that did not change the balance.
            if balance != prev_balance {
                prev_balance = balance;
                yield balance;
            }
        }
    })
}
879
880 async fn make_api_version_request(
885 delay: Duration,
886 peer_id: PeerId,
887 api: &DynGlobalApi,
888 ) -> (
889 PeerId,
890 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
891 ) {
892 runtime::sleep(delay).await;
893 (
894 peer_id,
895 api.request_single_peer::<SupportedApiVersionsSummary>(
896 VERSION_ENDPOINT.to_owned(),
897 ApiRequestErased::default(),
898 peer_id,
899 )
900 .await,
901 )
902 }
903
904 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
910 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
911 }
912
913 pub async fn fetch_common_api_versions_from_all_peers(
916 num_peers: NumPeers,
917 api: DynGlobalApi,
918 db: Database,
919 num_responses_sender: watch::Sender<usize>,
920 ) {
921 let mut backoff = Self::create_api_version_backoff();
922
923 let mut requests = FuturesUnordered::new();
926
927 for peer_id in num_peers.peer_ids() {
928 requests.push(Self::make_api_version_request(
929 Duration::ZERO,
930 peer_id,
931 &api,
932 ));
933 }
934
935 let mut num_responses = 0;
936
937 while let Some((peer_id, response)) = requests.next().await {
938 let retry = match response {
939 Err(err) => {
940 let has_previous_response = db
941 .begin_transaction_nc()
942 .await
943 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
944 .await
945 .is_some();
946 debug!(
947 target: LOG_CLIENT,
948 %peer_id,
949 err = %err.fmt_compact(),
950 %has_previous_response,
951 "Failed to refresh API versions of a peer"
952 );
953
954 !has_previous_response
955 }
956 Ok(o) => {
957 let mut dbtx = db.begin_transaction().await;
960 dbtx.insert_entry(
961 &PeerLastApiVersionsSummaryKey(peer_id),
962 &PeerLastApiVersionsSummary(o),
963 )
964 .await;
965 dbtx.commit_tx().await;
966 false
967 }
968 };
969
970 if retry {
971 requests.push(Self::make_api_version_request(
972 backoff.next().expect("Keeps retrying"),
973 peer_id,
974 &api,
975 ));
976 } else {
977 num_responses += 1;
978 num_responses_sender.send_replace(num_responses);
979 }
980 }
981 }
982
983 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
987 num_peers: NumPeers,
988 api: DynGlobalApi,
989 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
990 let mut backoff = Self::create_api_version_backoff();
991
992 let mut requests = FuturesUnordered::new();
995
996 for peer_id in num_peers.peer_ids() {
997 requests.push(Self::make_api_version_request(
998 Duration::ZERO,
999 peer_id,
1000 &api,
1001 ));
1002 }
1003
1004 let mut successful_responses = BTreeMap::new();
1005
1006 while successful_responses.len() < num_peers.threshold()
1007 && let Some((peer_id, response)) = requests.next().await
1008 {
1009 let retry = match response {
1010 Err(err) => {
1011 debug!(
1012 target: LOG_CLIENT,
1013 %peer_id,
1014 err = %err.fmt_compact(),
1015 "Failed to fetch API versions from peer"
1016 );
1017 true
1018 }
1019 Ok(response) => {
1020 successful_responses.insert(peer_id, response);
1021 false
1022 }
1023 };
1024
1025 if retry {
1026 requests.push(Self::make_api_version_request(
1027 backoff.next().expect("Keeps retrying"),
1028 peer_id,
1029 &api,
1030 ));
1031 }
1032 }
1033
1034 successful_responses
1035 }
1036
1037 pub async fn fetch_common_api_versions(
1039 config: &ClientConfig,
1040 api: &DynGlobalApi,
1041 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1042 debug!(
1043 target: LOG_CLIENT,
1044 "Fetching common api versions"
1045 );
1046
1047 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1048
1049 let peer_api_version_sets =
1050 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1051
1052 Ok(peer_api_version_sets)
1053 }
1054
1055 pub async fn write_api_version_cache(
1059 dbtx: &mut DatabaseTransaction<'_>,
1060 api_version_set: ApiVersionSet,
1061 ) {
1062 debug!(
1063 target: LOG_CLIENT,
1064 value = ?api_version_set,
1065 "Writing API version set to cache"
1066 );
1067
1068 dbtx.insert_entry(
1069 &CachedApiVersionSetKey,
1070 &CachedApiVersionSet(api_version_set),
1071 )
1072 .await;
1073 }
1074
1075 pub async fn store_prefetched_api_versions(
1080 db: &Database,
1081 config: &ClientConfig,
1082 client_module_init: &ClientModuleInitRegistry,
1083 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1084 ) {
1085 debug!(
1086 target: LOG_CLIENT,
1087 "Storing {} prefetched peer API version responses and calculating common version set",
1088 peer_api_versions.len()
1089 );
1090
1091 let mut dbtx = db.begin_transaction().await;
1092 let client_supported_versions =
1094 Self::supported_api_versions_summary_static(config, client_module_init);
1095 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1096 &client_supported_versions,
1097 peer_api_versions,
1098 ) {
1099 Ok(common_api_versions) => {
1100 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1102 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1103 }
1104 Err(err) => {
1105 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1106 }
1107 }
1108
1109 for (peer_id, peer_api_versions) in peer_api_versions {
1111 dbtx.insert_entry(
1112 &PeerLastApiVersionsSummaryKey(*peer_id),
1113 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1114 )
1115 .await;
1116 }
1117 dbtx.commit_tx().await;
1118 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1119 }
1120
1121 pub fn supported_api_versions_summary_static(
1123 config: &ClientConfig,
1124 client_module_init: &ClientModuleInitRegistry,
1125 ) -> SupportedApiVersionsSummary {
1126 SupportedApiVersionsSummary {
1127 core: SupportedCoreApiVersions {
1128 core_consensus: config.global.consensus_version,
1129 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1130 .expect("must not have conflicting versions"),
1131 },
1132 modules: config
1133 .modules
1134 .iter()
1135 .filter_map(|(&module_instance_id, module_config)| {
1136 client_module_init
1137 .get(module_config.kind())
1138 .map(|module_init| {
1139 (
1140 module_instance_id,
1141 SupportedModuleApiVersions {
1142 core_consensus: config.global.consensus_version,
1143 module_consensus: module_config.version,
1144 api: module_init.supported_api_versions(),
1145 },
1146 )
1147 })
1148 })
1149 .collect(),
1150 }
1151 }
1152
1153 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1154 Self::load_and_refresh_common_api_version_static(
1155 &self.config().await,
1156 &self.module_inits,
1157 &self.api,
1158 &self.db,
1159 &self.task_group,
1160 )
1161 .await
1162 }
1163
1164 async fn load_and_refresh_common_api_version_static(
1170 config: &ClientConfig,
1171 module_init: &ClientModuleInitRegistry,
1172 api: &DynGlobalApi,
1173 db: &Database,
1174 task_group: &TaskGroup,
1175 ) -> anyhow::Result<ApiVersionSet> {
1176 if let Some(v) = db
1177 .begin_transaction_nc()
1178 .await
1179 .get_value(&CachedApiVersionSetKey)
1180 .await
1181 {
1182 debug!(
1183 target: LOG_CLIENT,
1184 "Found existing cached common api versions"
1185 );
1186 let config = config.clone();
1187 let client_module_init = module_init.clone();
1188 let api = api.clone();
1189 let db = db.clone();
1190 let task_group = task_group.clone();
1191 task_group
1194 .clone()
1195 .spawn_cancellable("refresh_common_api_version_static", async move {
1196 sleep(Duration::from_secs(1)).await;
1200 if let Err(error) = Self::refresh_common_api_version_static(
1201 &config,
1202 &client_module_init,
1203 &api,
1204 &db,
1205 task_group,
1206 false,
1207 )
1208 .await
1209 {
1210 warn!(
1211 target: LOG_CLIENT,
1212 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1213 );
1214 }
1215 });
1216
1217 return Ok(v.0);
1218 }
1219
1220 info!(
1221 target: LOG_CLIENT,
1222 "Fetching initial API versions "
1223 );
1224 Self::refresh_common_api_version_static(
1225 config,
1226 module_init,
1227 api,
1228 db,
1229 task_group.clone(),
1230 true,
1231 )
1232 .await
1233 }
1234
1235 async fn refresh_common_api_version_static(
1236 config: &ClientConfig,
1237 client_module_init: &ClientModuleInitRegistry,
1238 api: &DynGlobalApi,
1239 db: &Database,
1240 task_group: TaskGroup,
1241 block_until_ok: bool,
1242 ) -> anyhow::Result<ApiVersionSet> {
1243 debug!(
1244 target: LOG_CLIENT,
1245 "Refreshing common api versions"
1246 );
1247
1248 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1249 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1250
1251 task_group.spawn_cancellable("refresh peers api versions", {
1252 Client::fetch_common_api_versions_from_all_peers(
1253 num_peers,
1254 api.clone(),
1255 db.clone(),
1256 num_responses_sender,
1257 )
1258 });
1259
1260 let common_api_versions = loop {
1261 let _: Result<_, Elapsed> = runtime::timeout(
1269 Duration::from_secs(30),
1270 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1271 )
1272 .await;
1273
1274 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1275
1276 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1277 &Self::supported_api_versions_summary_static(config, client_module_init),
1278 &peer_api_version_sets,
1279 ) {
1280 Ok(o) => break o,
1281 Err(err) if block_until_ok => {
1282 warn!(
1283 target: LOG_CLIENT,
1284 err = %err.fmt_compact_anyhow(),
1285 "Failed to discover API version to use. Retrying..."
1286 );
1287 continue;
1288 }
1289 Err(e) => return Err(e),
1290 }
1291 };
1292
1293 debug!(
1294 target: LOG_CLIENT,
1295 value = ?common_api_versions,
1296 "Updating the cached common api versions"
1297 );
1298 let mut dbtx = db.begin_transaction().await;
1299 let _ = dbtx
1300 .insert_entry(
1301 &CachedApiVersionSetKey,
1302 &CachedApiVersionSet(common_api_versions.clone()),
1303 )
1304 .await;
1305
1306 dbtx.commit_tx().await;
1307
1308 Ok(common_api_versions)
1309 }
1310
1311 pub async fn get_metadata(&self) -> Metadata {
1313 self.db
1314 .begin_transaction_nc()
1315 .await
1316 .get_value(&ClientMetadataKey)
1317 .await
1318 .unwrap_or_else(|| {
1319 warn!(
1320 target: LOG_CLIENT,
1321 "Missing existing metadata. This key should have been set on Client init"
1322 );
1323 Metadata::empty()
1324 })
1325 }
1326
1327 pub async fn set_metadata(&self, metadata: &Metadata) {
1329 self.db
1330 .autocommit::<_, _, anyhow::Error>(
1331 |dbtx, _| {
1332 Box::pin(async {
1333 Self::set_metadata_dbtx(dbtx, metadata).await;
1334 Ok(())
1335 })
1336 },
1337 None,
1338 )
1339 .await
1340 .expect("Failed to autocommit metadata");
1341 }
1342
1343 pub fn has_pending_recoveries(&self) -> bool {
1344 !self
1345 .client_recovery_progress_receiver
1346 .borrow()
1347 .iter()
1348 .all(|(_id, progress)| progress.is_done())
1349 }
1350
1351 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1359 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1360 recovery_receiver
1361 .wait_for(|in_progress| {
1362 in_progress
1363 .iter()
1364 .all(|(_id, progress)| progress.is_done())
1365 })
1366 .await
1367 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1368
1369 Ok(())
1370 }
1371
1372 pub fn subscribe_to_recovery_progress(
1377 &self,
1378 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1379 WatchStream::new(self.client_recovery_progress_receiver.clone())
1380 .flat_map(futures::stream::iter)
1381 }
1382
1383 pub async fn wait_for_module_kind_recovery(
1384 &self,
1385 module_kind: ModuleKind,
1386 ) -> anyhow::Result<()> {
1387 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1388 let config = self.config().await;
1389 recovery_receiver
1390 .wait_for(|in_progress| {
1391 !in_progress
1392 .iter()
1393 .filter(|(module_instance_id, _progress)| {
1394 config.modules[module_instance_id].kind == module_kind
1395 })
1396 .any(|(_id, progress)| !progress.is_done())
1397 })
1398 .await
1399 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1400
1401 Ok(())
1402 }
1403
1404 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1405 loop {
1406 if self.executor.get_active_states().await.is_empty() {
1407 break;
1408 }
1409 sleep(Duration::from_millis(100)).await;
1410 }
1411 Ok(())
1412 }
1413
1414 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1416 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1417 }
1418
1419 fn spawn_module_recoveries_task(
1420 &self,
1421 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1422 module_recoveries: BTreeMap<
1423 ModuleInstanceId,
1424 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1425 >,
1426 module_recovery_progress_receivers: BTreeMap<
1427 ModuleInstanceId,
1428 watch::Receiver<RecoveryProgress>,
1429 >,
1430 ) {
1431 let db = self.db.clone();
1432 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1433 self.task_group
1434 .spawn("module recoveries", |_task_handle| async {
1435 Self::run_module_recoveries_task(
1436 db,
1437 log_ordering_wakeup_tx,
1438 recovery_sender,
1439 module_recoveries,
1440 module_recovery_progress_receivers,
1441 )
1442 .await;
1443 });
1444 }
1445
    /// Drives all module recoveries and publishes their progress.
    ///
    /// Two kinds of events are merged into one stream:
    /// * progress updates from each module's watch receiver, tagged
    ///   `Some(progress)`;
    /// * a single completion event per module whose recovery future returned
    ///   `Ok(())`, tagged `None`.
    ///
    /// For every event the new progress is persisted under
    /// `ClientModuleRecovery` and then published through `recovery_sender`.
    /// When a module transitions from "not done" to "done", a
    /// `ModuleRecoveryCompleted` event is logged in the same transaction.
    ///
    /// # Panics
    ///
    /// Panics if `recovery_sender`'s current map has no entry for a module
    /// that produced an event.
    async fn run_module_recoveries_task(
        db: Database,
        log_ordering_wakeup_tx: watch::Sender<()>,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
        let mut completed_stream = Vec::new();
        let progress_stream = futures::stream::FuturesUnordered::new();

        // One single-item stream per module: yields `(id, None)` once the
        // recovery future succeeds.
        for (module_instance_id, f) in module_recoveries {
            completed_stream.push(futures::stream::once(Box::pin(async move {
                match f.await {
                    Ok(()) => (module_instance_id, None),
                    Err(err) => {
                        warn!(
                            target: LOG_CLIENT,
                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
                        );
                        // A failed recovery never yields a completion item:
                        // this future stays pending forever, so the module is
                        // never marked complete by this path.
                        futures::future::pending::<()>().await;
                        unreachable!()
                    }
                }
            })));
        }

        // One stream per module forwarding its progress updates as
        // `(id, Some(progress))`.
        for (module_instance_id, rx) in module_recovery_progress_receivers {
            progress_stream.push(
                tokio_stream::wrappers::WatchStream::new(rx)
                    .fuse()
                    .map(move |progress| (module_instance_id, Some(progress))),
            );
        }

        // Merge progress updates and completion notifications.
        let mut futures = futures::stream::select(
            futures::stream::select_all(progress_stream),
            futures::stream::select_all(completed_stream),
        );

        while let Some((module_instance_id, progress)) = futures.next().await {
            let mut dbtx = db.begin_transaction().await;

            let prev_progress = *recovery_sender
                .borrow()
                .get(&module_instance_id)
                .expect("existing progress must be present");

            // Once a module is done its progress is frozen; otherwise take
            // the reported progress, or mark it complete when the event is a
            // completion notification (`None`).
            let progress = if prev_progress.is_done() {
                prev_progress
            } else if let Some(progress) = progress {
                progress
            } else {
                prev_progress.to_complete()
            };

            // Log the completion event only on the transition to "done".
            if !prev_progress.is_done() && progress.is_done() {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery complete"
                );
                dbtx.log_event(
                    log_ordering_wakeup_tx.clone(),
                    None,
                    ModuleRecoveryCompleted {
                        module_id: module_instance_id,
                    },
                )
                .await;
            } else {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery progress"
                );
            }

            // Persist first, then publish to watchers.
            dbtx.insert_entry(
                &ClientModuleRecovery { module_instance_id },
                &ClientModuleRecoveryState { progress },
            )
            .await;
            dbtx.commit_tx().await;

            recovery_sender.send_modify(|v| {
                v.insert(module_instance_id, progress);
            });
        }
        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
    }
1549
1550 async fn load_peers_last_api_versions(
1551 db: &Database,
1552 num_peers: NumPeers,
1553 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1554 let mut peer_api_version_sets = BTreeMap::new();
1555
1556 let mut dbtx = db.begin_transaction_nc().await;
1557 for peer_id in num_peers.peer_ids() {
1558 if let Some(v) = dbtx
1559 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1560 .await
1561 {
1562 peer_api_version_sets.insert(peer_id, v.0);
1563 }
1564 }
1565 drop(dbtx);
1566 peer_api_version_sets
1567 }
1568
1569 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1572 self.db()
1573 .begin_transaction_nc()
1574 .await
1575 .find_by_prefix(&ApiAnnouncementPrefix)
1576 .await
1577 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1578 .collect()
1579 .await
1580 }
1581
1582 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1584 get_api_urls(&self.db, &self.config().await).await
1585 }
1586
1587 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1590 self.get_peer_urls()
1591 .await
1592 .into_iter()
1593 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1594 .map(|peer_url| {
1595 InviteCode::new(
1596 peer_url.clone(),
1597 peer,
1598 self.federation_id(),
1599 self.api_secret.clone(),
1600 )
1601 })
1602 }
1603
1604 pub async fn get_guardian_public_keys_blocking(
1608 &self,
1609 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1610 self.db
1611 .autocommit(
1612 |dbtx, _| {
1613 Box::pin(async move {
1614 let config = self.config().await;
1615
1616 let guardian_pub_keys = self
1617 .get_or_backfill_broadcast_public_keys(dbtx, config)
1618 .await;
1619
1620 Result::<_, ()>::Ok(guardian_pub_keys)
1621 })
1622 },
1623 None,
1624 )
1625 .await
1626 .expect("Will retry forever")
1627 }
1628
1629 async fn get_or_backfill_broadcast_public_keys(
1630 &self,
1631 dbtx: &mut DatabaseTransaction<'_>,
1632 config: ClientConfig,
1633 ) -> BTreeMap<PeerId, PublicKey> {
1634 match config.global.broadcast_public_keys {
1635 Some(guardian_pub_keys) => guardian_pub_keys,
1636 _ => {
1637 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1638
1639 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1640 *(self.config.write().await) = new_config;
1641 guardian_pub_keys
1642 }
1643 }
1644 }
1645
1646 async fn fetch_session_count(&self) -> FederationResult<u64> {
1647 self.api.session_count().await
1648 }
1649
1650 async fn fetch_and_update_config(
1651 &self,
1652 config: ClientConfig,
1653 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1654 let fetched_config = retry(
1655 "Fetching guardian public keys",
1656 backoff_util::background_backoff(),
1657 || async {
1658 Ok(self
1659 .api
1660 .request_current_consensus::<ClientConfig>(
1661 CLIENT_CONFIG_ENDPOINT.to_owned(),
1662 ApiRequestErased::default(),
1663 )
1664 .await?)
1665 },
1666 )
1667 .await
1668 .expect("Will never return on error");
1669
1670 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1671 warn!(
1672 target: LOG_CLIENT,
1673 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1674 );
1675 pending::<()>().await;
1676 unreachable!("Pending will never return");
1677 };
1678
1679 let new_config = ClientConfig {
1680 global: GlobalClientConfig {
1681 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1682 ..config.global
1683 },
1684 modules: config.modules,
1685 };
1686 (guardian_pub_keys, new_config)
1687 }
1688
1689 pub fn handle_global_rpc(
1690 &self,
1691 method: String,
1692 params: serde_json::Value,
1693 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1694 Box::pin(try_stream! {
1695 match method.as_str() {
1696 "get_balance" => {
1697 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1698 yield serde_json::to_value(balance)?;
1699 }
1700 "subscribe_balance_changes" => {
1701 let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1702 let mut stream = self.subscribe_balance_changes(req.unit).await;
1703 while let Some(balance) = stream.next().await {
1704 yield serde_json::to_value(balance)?;
1705 }
1706 }
1707 "get_config" => {
1708 let config = self.config().await;
1709 yield serde_json::to_value(config)?;
1710 }
1711 "get_federation_id" => {
1712 let federation_id = self.federation_id();
1713 yield serde_json::to_value(federation_id)?;
1714 }
1715 "get_invite_code" => {
1716 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1717 let invite_code = self.invite_code(req.peer).await;
1718 yield serde_json::to_value(invite_code)?;
1719 }
1720 "get_operation" => {
1721 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1722 let operation = self.operation_log().get_operation(req.operation_id).await;
1723 yield serde_json::to_value(operation)?;
1724 }
1725 "list_operations" => {
1726 let req: ListOperationsParams = serde_json::from_value(params)?;
1727 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1728 usize::MAX
1729 } else {
1730 req.limit.unwrap_or(usize::MAX)
1731 };
1732 let operations = self.operation_log()
1733 .paginate_operations_rev(limit, req.last_seen)
1734 .await;
1735 yield serde_json::to_value(operations)?;
1736 }
1737 "session_count" => {
1738 let count = self.fetch_session_count().await?;
1739 yield serde_json::to_value(count)?;
1740 }
1741 "has_pending_recoveries" => {
1742 let has_pending = self.has_pending_recoveries();
1743 yield serde_json::to_value(has_pending)?;
1744 }
1745 "wait_for_all_recoveries" => {
1746 self.wait_for_all_recoveries().await?;
1747 yield serde_json::Value::Null;
1748 }
1749 "subscribe_to_recovery_progress" => {
1750 let mut stream = self.subscribe_to_recovery_progress();
1751 while let Some((module_id, progress)) = stream.next().await {
1752 yield serde_json::json!({
1753 "module_id": module_id,
1754 "progress": progress
1755 });
1756 }
1757 }
1758 "backup_to_federation" => {
1759 let metadata = if params.is_null() {
1760 Metadata::from_json_serialized(serde_json::json!({}))
1761 } else {
1762 Metadata::from_json_serialized(params)
1763 };
1764 self.backup_to_federation(metadata).await?;
1765 yield serde_json::Value::Null;
1766 }
1767 _ => {
1768 Err(anyhow::format_err!("Unknown method: {}", method))?;
1769 unreachable!()
1770 },
1771 }
1772 })
1773 }
1774
1775 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1776 where
1777 E: Event + Send,
1778 {
1779 let mut dbtx = self.db.begin_transaction().await;
1780 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1781 dbtx.commit_tx().await;
1782 }
1783
1784 pub async fn log_event_dbtx<E, Cap>(
1785 &self,
1786 dbtx: &mut DatabaseTransaction<'_, Cap>,
1787 module_id: Option<ModuleInstanceId>,
1788 event: E,
1789 ) where
1790 E: Event + Send,
1791 Cap: Send,
1792 {
1793 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1794 .await;
1795 }
1796
1797 pub async fn log_event_raw_dbtx<Cap>(
1798 &self,
1799 dbtx: &mut DatabaseTransaction<'_, Cap>,
1800 kind: EventKind,
1801 module: Option<(ModuleKind, ModuleInstanceId)>,
1802 payload: Vec<u8>,
1803 persist: EventPersistence,
1804 ) where
1805 Cap: Send,
1806 {
1807 let module_id = module.as_ref().map(|m| m.1);
1808 let module_kind = module.map(|m| m.0);
1809 dbtx.log_event_raw(
1810 self.log_ordering_wakeup_tx.clone(),
1811 kind,
1812 module_kind,
1813 module_id,
1814 payload,
1815 persist,
1816 )
1817 .await;
1818 }
1819
1820 pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1832 struct BuiltInApplicationEventLogTracker;
1833
1834 #[apply(async_trait_maybe_send!)]
1835 impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1836 async fn store(
1838 &mut self,
1839 dbtx: &mut DatabaseTransaction<NonCommittable>,
1840 pos: EventLogTrimableId,
1841 ) -> anyhow::Result<()> {
1842 dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1843 .await;
1844 Ok(())
1845 }
1846
1847 async fn load(
1849 &mut self,
1850 dbtx: &mut DatabaseTransaction<NonCommittable>,
1851 ) -> anyhow::Result<Option<EventLogTrimableId>> {
1852 Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
1853 }
1854 }
1855 Box::new(BuiltInApplicationEventLogTracker)
1856 }
1857
1858 pub async fn handle_historical_events<F, R>(
1866 &self,
1867 tracker: fedimint_eventlog::DynEventLogTracker,
1868 handler_fn: F,
1869 ) -> anyhow::Result<()>
1870 where
1871 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1872 R: Future<Output = anyhow::Result<()>>,
1873 {
1874 fedimint_eventlog::handle_events(
1875 self.db.clone(),
1876 tracker,
1877 self.log_event_added_rx.clone(),
1878 handler_fn,
1879 )
1880 .await
1881 }
1882
1883 pub async fn handle_events<F, R>(
1902 &self,
1903 tracker: fedimint_eventlog::DynEventLogTrimableTracker,
1904 handler_fn: F,
1905 ) -> anyhow::Result<()>
1906 where
1907 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1908 R: Future<Output = anyhow::Result<()>>,
1909 {
1910 fedimint_eventlog::handle_trimable_events(
1911 self.db.clone(),
1912 tracker,
1913 self.log_event_added_rx.clone(),
1914 handler_fn,
1915 )
1916 .await
1917 }
1918
1919 pub async fn get_event_log(
1920 &self,
1921 pos: Option<EventLogId>,
1922 limit: u64,
1923 ) -> Vec<PersistedLogEntry> {
1924 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1925 .await
1926 }
1927
1928 pub async fn get_event_log_trimable(
1929 &self,
1930 pos: Option<EventLogTrimableId>,
1931 limit: u64,
1932 ) -> Vec<PersistedLogEntry> {
1933 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1934 .await
1935 }
1936
1937 pub async fn get_event_log_dbtx<Cap>(
1938 &self,
1939 dbtx: &mut DatabaseTransaction<'_, Cap>,
1940 pos: Option<EventLogId>,
1941 limit: u64,
1942 ) -> Vec<PersistedLogEntry>
1943 where
1944 Cap: Send,
1945 {
1946 dbtx.get_event_log(pos, limit).await
1947 }
1948
1949 pub async fn get_event_log_trimable_dbtx<Cap>(
1950 &self,
1951 dbtx: &mut DatabaseTransaction<'_, Cap>,
1952 pos: Option<EventLogTrimableId>,
1953 limit: u64,
1954 ) -> Vec<PersistedLogEntry>
1955 where
1956 Cap: Send,
1957 {
1958 dbtx.get_event_log_trimable(pos, limit).await
1959 }
1960
1961 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1963 self.log_event_added_transient_tx.subscribe()
1964 }
1965
1966 pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
1968 self.log_event_added_rx.clone()
1969 }
1970
1971 pub fn iroh_enable_dht(&self) -> bool {
1972 self.iroh_enable_dht
1973 }
1974
1975 pub(crate) async fn run_core_migrations(
1976 db_no_decoders: &Database,
1977 ) -> Result<(), anyhow::Error> {
1978 let mut dbtx = db_no_decoders.begin_transaction().await;
1979 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1980 .await?;
1981 if is_running_in_test_env() {
1982 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1983 }
1984 dbtx.commit_tx_result().await?;
1985 Ok(())
1986 }
1987
1988 fn primary_modules_for_unit(
1990 &self,
1991 unit: AmountUnit,
1992 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1993 self.primary_modules
1994 .iter()
1995 .flat_map(move |(_prio, candidates)| {
1996 candidates
1997 .specific
1998 .get(&unit)
1999 .into_iter()
2000 .flatten()
2001 .copied()
2002 .chain(candidates.wildcard.iter().copied())
2004 })
2005 .map(|id| (id, self.modules.get_expect(id)))
2006 }
2007
2008 pub fn primary_module_for_unit(
2012 &self,
2013 unit: AmountUnit,
2014 ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2015 self.primary_modules_for_unit(unit).next()
2016 }
2017
2018 pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2020 self.primary_module_for_unit(AmountUnit::BITCOIN)
2021 .expect("No primary module for Bitcoin")
2022 }
2023}
2024
// Implementation of the module-facing client context interface for `Client`.
//
// Most methods forward to the inherent `Client` method of the same name,
// spelled `Client::name(self, ..)` so that the call names the `Client` path
// explicitly.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    /// Forwards to `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to `Client::finalize_and_submit_transaction`, passing the
    /// boxed `operation_meta_gen` by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to `Client::finalize_and_submit_transaction_inner` using the
    /// caller's transaction.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_bitcoin_module_outputs`; note the
    /// inherent method name differs from the trait method name.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    /// Forwards to `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    /// Forwards to `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event whose payload is given as JSON.
    ///
    /// The payload is serialized to bytes and passed to
    /// `log_event_raw_dbtx`. The module pair is only attached when
    /// `module_kind` is `Some`.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx.ensure_global()` fails or if the payload cannot be
    /// serialized.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    /// Streams the active state entries found under
    /// `ActiveModuleOperationStateKeyPrefix` for the given operation and
    /// module, yielding the inner key (`k.0`) and its value.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    /// Streams the inactive state entries found under
    /// `InactiveModuleOperationStateKeyPrefix` for the given operation and
    /// module, yielding the inner key (`k.0`) and its value.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2164
2165impl fmt::Debug for Client {
2167 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2168 write!(f, "Client")
2169 }
2170}
2171
2172pub fn client_decoders<'a>(
2173 registry: &ModuleInitRegistry<DynClientModuleInit>,
2174 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2175) -> ModuleDecoderRegistry {
2176 let mut modules = BTreeMap::new();
2177 for (id, kind) in module_kinds {
2178 let Some(init) = registry.get(kind) else {
2179 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2180 continue;
2181 };
2182
2183 modules.insert(
2184 id,
2185 (
2186 kind.clone(),
2187 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2188 ),
2189 );
2190 }
2191 ModuleDecoderRegistry::from(modules)
2192}