1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51 SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::backoff_util::custom_backoff;
57use fedimint_core::util::{
58 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
59};
60use fedimint_core::{
61 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
62 maybe_add_send_sync, runtime,
63};
64use fedimint_derive_secret::DerivableSecret;
65use fedimint_eventlog::{
66 DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
67 EventPersistence, PersistedLogEntry,
68};
69use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
70use futures::stream::FuturesUnordered;
71use futures::{Stream, StreamExt as _};
72use global_ctx::ModuleGlobalClientContext;
73use serde::{Deserialize, Serialize};
74use tokio::sync::{broadcast, watch};
75use tokio_stream::wrappers::WatchStream;
76use tracing::{debug, info, warn};
77
78use crate::ClientBuilder;
79use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
80use crate::backup::Metadata;
81use crate::db::{
82 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
83 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
84 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
85 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
86 get_decoded_client_secret, verify_client_db_integrity_dbtx,
87};
88use crate::meta::MetaService;
89use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
90use crate::oplog::OperationLog;
91use crate::sm::executor::{
92 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
93 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
94};
95
96pub(crate) mod builder;
97pub(crate) mod global_ctx;
98pub(crate) mod handle;
99
/// Core API versions that this client implementation declares as supported.
///
/// Used by `Client::supported_api_versions_summary_static` to populate the
/// `api` field of `SupportedCoreApiVersions`.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
105
/// Main client state: configuration, database, module registry, state
/// machine executor, federation API handle and assorted notification
/// channels.
pub struct Client {
    final_client: FinalClientIface,
    // Current client config; read (and cloned) through `Client::config`.
    config: tokio::sync::RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    db: Database,
    federation_id: FederationId,
    // Key/value metadata exposed through `Client::get_config_meta`.
    federation_config_meta: BTreeMap<String, String>,
    // Instance id used by `Client::primary_module` to look up the primary module.
    primary_module_instance: ModuleInstanceId,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: Connector,

    task_group: TaskGroup,

    // Latest recovery progress per module instance; observed by the
    // `wait_for_*_recovery` and `subscribe_to_recovery_progress` methods.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // NOTE(review): the three channels below look event-log related
    // (ordering wake-up, "event added" notification, transient events) —
    // confirm against the event log code, which is not visible here.
    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}
154
/// Serializable parameters for an operation listing request.
///
/// NOTE(review): presumably `limit` caps the number of returned entries and
/// `last_seen` is a pagination cursor — confirm against the consumer, which
/// is not visible in this part of the file.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    limit: Option<usize>,
    last_seen: Option<ChronologicalOperationLogKey>,
}
160
/// Serializable request payload carrying a single [`OperationId`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
165
166impl Client {
    /// Creates a fresh [`ClientBuilder`].
    ///
    /// Currently always returns `Ok`; the `Result` is part of the signature.
    pub async fn builder() -> anyhow::Result<ClientBuilder> {
        Ok(ClientBuilder::new())
    }
172
    /// Borrows the federation API as a trait object.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }
176
    /// Returns an owned clone of the federation API handle.
    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }
180
    /// Borrows the [`TaskGroup`] owned by this client.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }
185
    /// Borrows the state machine [`Executor`]; hidden from the public docs.
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
191
    /// Reads the value stored under [`ClientConfigKey`], if any.
    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ClientConfigKey).await
    }
196
    /// Reads the value stored under [`PendingClientConfigKey`], if any.
    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&PendingClientConfigKey).await
    }
201
    /// Reads the value stored under [`ApiSecretKey`], if any.
    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ApiSecretKey).await
    }
206
207 pub async fn store_encodable_client_secret<T: Encodable>(
208 db: &Database,
209 secret: T,
210 ) -> anyhow::Result<()> {
211 let mut dbtx = db.begin_transaction().await;
212
213 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
215 bail!("Encoded client secret already exists, cannot overwrite")
216 }
217
218 let encoded_secret = T::consensus_encode_to_vec(&secret);
219 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
220 .await;
221 dbtx.commit_tx().await;
222 Ok(())
223 }
224
225 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
226 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
227 bail!("Encoded client secret not present in DB")
228 };
229
230 Ok(secret)
231 }
232 pub async fn load_decodable_client_secret_opt<T: Decodable>(
233 db: &Database,
234 ) -> anyhow::Result<Option<T>> {
235 let mut dbtx = db.begin_transaction_nc().await;
236
237 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
238
239 Ok(match client_secret {
240 Some(client_secret) => Some(
241 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
242 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
243 ),
244 None => None,
245 })
246 }
247
248 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
249 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
250 Ok(secret) => secret,
251 _ => {
252 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
253 Self::store_encodable_client_secret(db, secret)
254 .await
255 .expect("Storing client secret must work");
256 secret
257 }
258 };
259 Ok(client_secret)
260 }
261
262 pub async fn is_initialized(db: &Database) -> bool {
263 let mut dbtx = db.begin_transaction_nc().await;
264 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
265 .await
266 .expect("Unrecoverable error occurred while reading and entry from the database")
267 .is_some()
268 }
269
    /// Starts the state machine executor, handing it the context generator
    /// built by [`Self::context_gen`].
    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor",
        );
        self.executor.start_executor(self.context_gen());
    }
277
    /// Returns the id of the federation this client belongs to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
281
282 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
283 let client_inner = Arc::downgrade(self);
284 Arc::new(move |module_instance, operation| {
285 ModuleGlobalClientContext {
286 client: client_inner
287 .clone()
288 .upgrade()
289 .expect("ModuleGlobalContextGen called after client was dropped"),
290 module_instance_id: module_instance,
291 operation,
292 }
293 .into()
294 })
295 }
296
    /// Returns a clone of the current client config, taken under the read
    /// lock.
    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }
300
    /// Borrows the optional API secret held by this client.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
304
    /// Borrows the module decoder registry.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
308
    /// Looks up a module by instance id.
    ///
    /// # Panics
    ///
    /// Panics if no module is registered under `instance`; use
    /// [`Self::try_get_module`] for a non-panicking lookup.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }
314
315 fn try_get_module(
316 &self,
317 instance: ModuleInstanceId,
318 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
319 Some(self.modules.get(instance)?.as_ref())
320 }
321
    /// Reports whether a module is registered under `instance`.
    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }
325
326 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
332 let mut in_amount = Amount::ZERO;
334 let mut out_amount = Amount::ZERO;
335 let mut fee_amount = Amount::ZERO;
336
337 for input in builder.inputs() {
338 let module = self.get_module(input.input.module_instance_id());
339
340 let item_fee = module.input_fee(input.amount, &input.input).expect(
341 "We only build transactions with input versions that are supported by the module",
342 );
343
344 in_amount += input.amount;
345 fee_amount += item_fee;
346 }
347
348 for output in builder.outputs() {
349 let module = self.get_module(output.output.module_instance_id());
350
351 let item_fee = module.output_fee(output.amount, &output.output).expect(
352 "We only build transactions with output versions that are supported by the module",
353 );
354
355 out_amount += output.amount;
356 fee_amount += item_fee;
357 }
358
359 (in_amount, out_amount + fee_amount)
360 }
361
362 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
363 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
364 }
365
    /// Returns a copy of the federation config meta value stored under
    /// `key`, if present.
    pub fn get_config_meta(&self, key: &str) -> Option<String> {
        self.federation_config_meta.get(key).cloned()
    }
370
    /// Returns a clone of the client's root secret.
    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
374
    /// Hands `states` to the executor as part of the given database
    /// transaction.
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }
382
383 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
385 let active_states = self.executor.get_active_states().await;
386 let mut active_operations = HashSet::with_capacity(active_states.len());
387 let mut dbtx = self.db().begin_transaction_nc().await;
388 for (state, _) in active_states {
389 let operation_id = state.operation_id();
390 if dbtx
391 .get_value(&OperationLogKey { operation_id })
392 .await
393 .is_some()
394 {
395 active_operations.insert(operation_id);
396 }
397 }
398 active_operations
399 }
400
    /// Borrows the client's [`OperationLog`].
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
404
    /// Borrows the shared [`MetaService`] handle.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
409
410 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
412 let meta_service = self.meta_service();
413 let ts = meta_service
414 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
415 .await
416 .and_then(|v| v.value)?;
417 Some(UNIX_EPOCH + Duration::from_secs(ts))
418 }
419
420 async fn finalize_transaction(
422 &self,
423 dbtx: &mut DatabaseTransaction<'_>,
424 operation_id: OperationId,
425 mut partial_transaction: TransactionBuilder,
426 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
427 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
428
429 let (added_input_bundle, change_outputs) = self
430 .primary_module()
431 .create_final_inputs_and_outputs(
432 self.primary_module_instance,
433 dbtx,
434 operation_id,
435 input_amount,
436 output_amount,
437 )
438 .await?;
439
440 let change_range = Range {
444 start: partial_transaction.outputs().count() as u64,
445 end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
446 };
447
448 partial_transaction = partial_transaction
449 .with_inputs(added_input_bundle)
450 .with_outputs(change_outputs);
451
452 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
453
454 assert!(input_amount >= output_amount, "Transaction is underfunded");
455
456 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
457
458 Ok((tx, states, change_range))
459 }
460
461 pub async fn finalize_and_submit_transaction<F, M>(
473 &self,
474 operation_id: OperationId,
475 operation_type: &str,
476 operation_meta_gen: F,
477 tx_builder: TransactionBuilder,
478 ) -> anyhow::Result<OutPointRange>
479 where
480 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
481 M: serde::Serialize + MaybeSend,
482 {
483 let operation_type = operation_type.to_owned();
484
485 let autocommit_res = self
486 .db
487 .autocommit(
488 |dbtx, _| {
489 let operation_type = operation_type.clone();
490 let tx_builder = tx_builder.clone();
491 let operation_meta_gen = operation_meta_gen.clone();
492 Box::pin(async move {
493 if Client::operation_exists_dbtx(dbtx, operation_id).await {
494 bail!("There already exists an operation with id {operation_id:?}")
495 }
496
497 let out_point_range = self
498 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
499 .await?;
500
501 self.operation_log()
502 .add_operation_log_entry_dbtx(
503 dbtx,
504 operation_id,
505 &operation_type,
506 operation_meta_gen(out_point_range),
507 )
508 .await;
509
510 Ok(out_point_range)
511 })
512 },
513 Some(100), )
515 .await;
516
517 match autocommit_res {
518 Ok(txid) => Ok(txid),
519 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
520 Err(AutocommitError::CommitFailed {
521 attempts,
522 last_error,
523 }) => panic!(
524 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
525 ),
526 }
527 }
528
529 async fn finalize_and_submit_transaction_inner(
530 &self,
531 dbtx: &mut DatabaseTransaction<'_>,
532 operation_id: OperationId,
533 tx_builder: TransactionBuilder,
534 ) -> anyhow::Result<OutPointRange> {
535 let (transaction, mut states, change_range) = self
536 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
537 .await?;
538
539 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
540 let inputs = transaction
541 .inputs
542 .iter()
543 .map(DynInput::module_instance_id)
544 .collect::<Vec<_>>();
545 let outputs = transaction
546 .outputs
547 .iter()
548 .map(DynOutput::module_instance_id)
549 .collect::<Vec<_>>();
550 warn!(
551 target: LOG_CLIENT_NET_API,
552 size=%transaction.consensus_encode_to_vec().len(),
553 ?inputs,
554 ?outputs,
555 "Transaction too large",
556 );
557 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
558 bail!(
559 "The generated transaction would be rejected by the federation for being too large."
560 );
561 }
562
563 let txid = transaction.tx_hash();
564
565 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
566
567 let tx_submission_sm = DynState::from_typed(
568 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
569 TxSubmissionStatesSM {
570 operation_id,
571 state: TxSubmissionStates::Created(transaction),
572 },
573 );
574 states.push(tx_submission_sm);
575
576 self.executor.add_state_machines_dbtx(dbtx, states).await?;
577
578 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
579 .await;
580
581 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
582 }
583
584 async fn transaction_update_stream(
585 &self,
586 operation_id: OperationId,
587 ) -> BoxStream<'static, TxSubmissionStatesSM> {
588 self.executor
589 .notifier()
590 .module_notifier::<TxSubmissionStatesSM>(
591 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
592 self.final_client.clone(),
593 )
594 .subscribe(operation_id)
595 .await
596 }
597
    /// Reports whether any state machine, active or inactive, exists for
    /// `operation_id`; see [`Self::operation_exists_dbtx`].
    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
        let mut dbtx = self.db().begin_transaction_nc().await;

        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
    }
603
604 pub async fn operation_exists_dbtx(
605 dbtx: &mut DatabaseTransaction<'_>,
606 operation_id: OperationId,
607 ) -> bool {
608 let active_state_exists = dbtx
609 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
610 .await
611 .next()
612 .await
613 .is_some();
614
615 let inactive_state_exists = dbtx
616 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
617 .await
618 .next()
619 .await
620 .is_some();
621
622 active_state_exists || inactive_state_exists
623 }
624
625 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
626 self.db
627 .begin_transaction_nc()
628 .await
629 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
630 .await
631 .next()
632 .await
633 .is_some()
634 }
635
    /// Delegates to the primary module's `await_primary_module_output` for
    /// the given operation and out point.
    pub async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module()
            .await_primary_module_output(operation_id, out_point)
            .await
    }
647
648 pub fn get_first_module<M: ClientModule>(
650 &'_ self,
651 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
652 let module_kind = M::kind();
653 let id = self
654 .get_first_instance(&module_kind)
655 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
656 let module: &M = self
657 .try_get_module(id)
658 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
659 .as_any()
660 .downcast_ref::<M>()
661 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
662 let (db, _) = self.db().with_prefix_module_id(id);
663 Ok(ClientModuleInstance {
664 id,
665 db,
666 api: self.api().with_module(id),
667 module,
668 })
669 }
670
671 pub fn get_module_client_dyn(
672 &self,
673 instance_id: ModuleInstanceId,
674 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
675 self.try_get_module(instance_id)
676 .ok_or(anyhow!("Unknown module instance {}", instance_id))
677 }
678
    /// Borrows the client's [`Database`].
    pub fn db(&self) -> &Database {
        &self.db
    }
682
    /// Returns a [`TransactionUpdates`] wrapping the transaction submission
    /// update stream of `operation_id`.
    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        TransactionUpdates {
            update_stream: self.transaction_update_stream(operation_id).await,
        }
    }
690
691 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
695 if self
696 .modules
697 .get_with_kind(self.primary_module_instance)
698 .is_some_and(|(kind, _)| kind == module_kind)
699 {
700 return Some(self.primary_module_instance);
701 }
702
703 self.modules
704 .iter_modules()
705 .find(|(_, kind, _module)| *kind == module_kind)
706 .map(|(instance_id, _, _)| instance_id)
707 }
708
    /// Decodes the client secret stored in the database as `T`, via
    /// `get_decoded_client_secret`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
714
715 pub async fn await_primary_module_outputs(
718 &self,
719 operation_id: OperationId,
720 outputs: Vec<OutPoint>,
721 ) -> anyhow::Result<()> {
722 for out_point in outputs {
723 self.await_primary_module_output(operation_id, out_point)
724 .await?;
725 }
726
727 Ok(())
728 }
729
    /// Returns the current client config converted with `to_json`.
    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }
738
    /// Borrows the primary module.
    ///
    /// # Panics
    ///
    /// Panics if no module is registered under the primary module instance
    /// id.
    pub fn primary_module(&self) -> &DynClientModule {
        self.modules
            .get(self.primary_module_instance)
            .expect("primary module must be present")
    }
745
746 pub async fn get_balance(&self) -> Amount {
748 self.primary_module()
749 .get_balance(
750 self.primary_module_instance,
751 &mut self.db().begin_transaction_nc().await,
752 )
753 .await
754 }
755
756 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
759 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
760 let initial_balance = self.get_balance().await;
761 let db = self.db().clone();
762 let primary_module = self.primary_module().clone();
763 let primary_module_instance = self.primary_module_instance;
764
765 Box::pin(async_stream::stream! {
766 yield initial_balance;
767 let mut prev_balance = initial_balance;
768 while let Some(()) = balance_changes.next().await {
769 let mut dbtx = db.begin_transaction_nc().await;
770 let balance = primary_module
771 .get_balance(primary_module_instance, &mut dbtx)
772 .await;
773
774 if balance != prev_balance {
776 prev_balance = balance;
777 yield balance;
778 }
779 }
780 })
781 }
782
    /// Queries every peer for its supported API versions and stores the
    /// answers in the database.
    ///
    /// Each successful response is written under
    /// [`PeerLastApiVersionsSummaryKey`] right away. A failed request is
    /// retried only when no earlier response from that peer is stored; the
    /// retry delay comes from one backoff shared by all peers. Every peer
    /// that is finished (answered, or failed with an earlier response on
    /// record) bumps the counter published through `num_responses_sender`.
    ///
    /// Returns once no request is outstanding any more.
    pub async fn refresh_peers_api_versions(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff =
            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);

        // Single request to one peer after `delay`. Taking the delay as a
        // parameter gives the initial requests and the retries the same
        // future type, so both fit into one `FuturesUnordered`.
        async fn make_request(
            delay: Duration,
            peer_id: PeerId,
            api: &DynGlobalApi,
        ) -> (
            PeerId,
            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
        ) {
            runtime::sleep(delay).await;
            (
                peer_id,
                api.request_single_peer::<SupportedApiVersionsSummary>(
                    VERSION_ENDPOINT.to_owned(),
                    ApiRequestErased::default(),
                    peer_id,
                )
                .await,
            )
        }

        let mut requests = FuturesUnordered::new();

        // Initial round: ask every peer immediately.
        for peer_id in num_peers.peer_ids() {
            requests.push(make_request(Duration::ZERO, peer_id, &api));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    // An earlier stored answer is good enough; only keep
                    // retrying peers we know nothing about.
                    !has_previous_response
                }
                Ok(o) => {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(make_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
875
876 pub fn supported_api_versions_summary_static(
878 config: &ClientConfig,
879 client_module_init: &ClientModuleInitRegistry,
880 ) -> SupportedApiVersionsSummary {
881 SupportedApiVersionsSummary {
882 core: SupportedCoreApiVersions {
883 core_consensus: config.global.consensus_version,
884 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
885 .expect("must not have conflicting versions"),
886 },
887 modules: config
888 .modules
889 .iter()
890 .filter_map(|(&module_instance_id, module_config)| {
891 client_module_init
892 .get(module_config.kind())
893 .map(|module_init| {
894 (
895 module_instance_id,
896 SupportedModuleApiVersions {
897 core_consensus: config.global.consensus_version,
898 module_consensus: module_config.version,
899 api: module_init.supported_api_versions(),
900 },
901 )
902 })
903 })
904 .collect(),
905 }
906 }
907
908 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
909 Self::load_and_refresh_common_api_version_static(
910 &self.config().await,
911 &self.module_inits,
912 &self.api,
913 &self.db,
914 &self.task_group,
915 )
916 .await
917 }
918
919 async fn load_and_refresh_common_api_version_static(
925 config: &ClientConfig,
926 module_init: &ClientModuleInitRegistry,
927 api: &DynGlobalApi,
928 db: &Database,
929 task_group: &TaskGroup,
930 ) -> anyhow::Result<ApiVersionSet> {
931 if let Some(v) = db
932 .begin_transaction_nc()
933 .await
934 .get_value(&CachedApiVersionSetKey)
935 .await
936 {
937 debug!(
938 target: LOG_CLIENT,
939 "Found existing cached common api versions"
940 );
941 let config = config.clone();
942 let client_module_init = module_init.clone();
943 let api = api.clone();
944 let db = db.clone();
945 let task_group = task_group.clone();
946 task_group
949 .clone()
950 .spawn_cancellable("refresh_common_api_version_static", async move {
951 if let Err(error) = Self::refresh_common_api_version_static(
952 &config,
953 &client_module_init,
954 &api,
955 &db,
956 task_group,
957 false,
958 )
959 .await
960 {
961 warn!(
962 target: LOG_CLIENT,
963 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
964 );
965 }
966 });
967
968 return Ok(v.0);
969 }
970
971 info!(
972 target: LOG_CLIENT,
973 "Fetching initial API versions "
974 );
975 Self::refresh_common_api_version_static(
976 config,
977 module_init,
978 api,
979 db,
980 task_group.clone(),
981 true,
982 )
983 .await
984 }
985
986 async fn refresh_common_api_version_static(
987 config: &ClientConfig,
988 client_module_init: &ClientModuleInitRegistry,
989 api: &DynGlobalApi,
990 db: &Database,
991 task_group: TaskGroup,
992 block_until_ok: bool,
993 ) -> anyhow::Result<ApiVersionSet> {
994 debug!(
995 target: LOG_CLIENT,
996 "Refreshing common api versions"
997 );
998
999 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1000 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1001
1002 task_group.spawn_cancellable("refresh peers api versions", {
1003 Client::refresh_peers_api_versions(
1004 num_peers,
1005 api.clone(),
1006 db.clone(),
1007 num_responses_sender,
1008 )
1009 });
1010
1011 let common_api_versions = loop {
1012 let _: Result<_, Elapsed> = runtime::timeout(
1020 Duration::from_secs(30),
1021 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1022 )
1023 .await;
1024
1025 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1026
1027 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1028 &Self::supported_api_versions_summary_static(config, client_module_init),
1029 &peer_api_version_sets,
1030 ) {
1031 Ok(o) => break o,
1032 Err(err) if block_until_ok => {
1033 warn!(
1034 target: LOG_CLIENT,
1035 err = %err.fmt_compact_anyhow(),
1036 "Failed to discover API version to use. Retrying..."
1037 );
1038 continue;
1039 }
1040 Err(e) => return Err(e),
1041 }
1042 };
1043
1044 debug!(
1045 target: LOG_CLIENT,
1046 value = ?common_api_versions,
1047 "Updating the cached common api versions"
1048 );
1049 let mut dbtx = db.begin_transaction().await;
1050 let _ = dbtx
1051 .insert_entry(
1052 &CachedApiVersionSetKey,
1053 &CachedApiVersionSet(common_api_versions.clone()),
1054 )
1055 .await;
1056
1057 dbtx.commit_tx().await;
1058
1059 Ok(common_api_versions)
1060 }
1061
1062 pub async fn get_metadata(&self) -> Metadata {
1064 self.db
1065 .begin_transaction_nc()
1066 .await
1067 .get_value(&ClientMetadataKey)
1068 .await
1069 .unwrap_or_else(|| {
1070 warn!(
1071 target: LOG_CLIENT,
1072 "Missing existing metadata. This key should have been set on Client init"
1073 );
1074 Metadata::empty()
1075 })
1076 }
1077
    /// Stores `metadata` via [`Self::set_metadata_dbtx`] inside an
    /// auto-committing database transaction with no attempt limit (`None`).
    ///
    /// # Panics
    ///
    /// Panics if the auto-commit fails.
    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }
1093
1094 pub fn has_pending_recoveries(&self) -> bool {
1095 !self
1096 .client_recovery_progress_receiver
1097 .borrow()
1098 .iter()
1099 .all(|(_id, progress)| progress.is_done())
1100 }
1101
1102 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1110 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1111 recovery_receiver
1112 .wait_for(|in_progress| {
1113 in_progress
1114 .iter()
1115 .all(|(_id, progress)| progress.is_done())
1116 })
1117 .await
1118 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1119
1120 Ok(())
1121 }
1122
    /// Returns a stream of `(module instance id, progress)` pairs.
    ///
    /// Every snapshot observed on the recovery progress channel is flattened
    /// into one item per module instance it contains.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1133
1134 pub async fn wait_for_module_kind_recovery(
1135 &self,
1136 module_kind: ModuleKind,
1137 ) -> anyhow::Result<()> {
1138 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1139 let config = self.config().await;
1140 recovery_receiver
1141 .wait_for(|in_progress| {
1142 !in_progress
1143 .iter()
1144 .filter(|(module_instance_id, _progress)| {
1145 config.modules[module_instance_id].kind == module_kind
1146 })
1147 .any(|(_id, progress)| !progress.is_done())
1148 })
1149 .await
1150 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1151
1152 Ok(())
1153 }
1154
1155 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1156 loop {
1157 if self.executor.get_active_states().await.is_empty() {
1158 break;
1159 }
1160 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1161 }
1162 Ok(())
1163 }
1164
    /// Inserts `metadata` under [`ClientMetadataKey`] within `dbtx`, using
    /// `insert_new_entry`.
    ///
    /// NOTE(review): `insert_new_entry` presumably expects that no entry
    /// exists yet — confirm how it behaves when metadata is already stored.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1169
    /// Spawns the "module recoveries" task on the client's task group.
    ///
    /// The task runs [`Self::run_module_recoveries_task`] with clones of the
    /// client's database and log ordering wake-up sender, plus the given
    /// recovery futures and progress channels.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Clone what the task needs, so that it does not borrow from `self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1196
1197 async fn run_module_recoveries_task(
1198 db: Database,
1199 log_ordering_wakeup_tx: watch::Sender<()>,
1200 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1201 module_recoveries: BTreeMap<
1202 ModuleInstanceId,
1203 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1204 >,
1205 module_recovery_progress_receivers: BTreeMap<
1206 ModuleInstanceId,
1207 watch::Receiver<RecoveryProgress>,
1208 >,
1209 ) {
1210 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1211 let mut completed_stream = Vec::new();
1212 let progress_stream = futures::stream::FuturesUnordered::new();
1213
1214 for (module_instance_id, f) in module_recoveries {
1215 completed_stream.push(futures::stream::once(Box::pin(async move {
1216 match f.await {
1217 Ok(()) => (module_instance_id, None),
1218 Err(err) => {
1219 warn!(
1220 target: LOG_CLIENT,
1221 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1222 );
1223 futures::future::pending::<()>().await;
1227 unreachable!()
1228 }
1229 }
1230 })));
1231 }
1232
1233 for (module_instance_id, rx) in module_recovery_progress_receivers {
1234 progress_stream.push(
1235 tokio_stream::wrappers::WatchStream::new(rx)
1236 .fuse()
1237 .map(move |progress| (module_instance_id, Some(progress))),
1238 );
1239 }
1240
1241 let mut futures = futures::stream::select(
1242 futures::stream::select_all(progress_stream),
1243 futures::stream::select_all(completed_stream),
1244 );
1245
1246 while let Some((module_instance_id, progress)) = futures.next().await {
1247 let mut dbtx = db.begin_transaction().await;
1248
1249 let prev_progress = *recovery_sender
1250 .borrow()
1251 .get(&module_instance_id)
1252 .expect("existing progress must be present");
1253
1254 let progress = if prev_progress.is_done() {
1255 prev_progress
1257 } else if let Some(progress) = progress {
1258 progress
1259 } else {
1260 prev_progress.to_complete()
1261 };
1262
1263 if !prev_progress.is_done() && progress.is_done() {
1264 info!(
1265 target: LOG_CLIENT,
1266 module_instance_id,
1267 progress = format!("{}/{}", progress.complete, progress.total),
1268 "Recovery complete"
1269 );
1270 dbtx.log_event(
1271 log_ordering_wakeup_tx.clone(),
1272 None,
1273 ModuleRecoveryCompleted {
1274 module_id: module_instance_id,
1275 },
1276 )
1277 .await;
1278 } else {
1279 info!(
1280 target: LOG_CLIENT,
1281 module_instance_id,
1282 progress = format!("{}/{}", progress.complete, progress.total),
1283 "Recovery progress"
1284 );
1285 }
1286
1287 dbtx.insert_entry(
1288 &ClientModuleRecovery { module_instance_id },
1289 &ClientModuleRecoveryState { progress },
1290 )
1291 .await;
1292 dbtx.commit_tx().await;
1293
1294 recovery_sender.send_modify(|v| {
1295 v.insert(module_instance_id, progress);
1296 });
1297 }
1298 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1299 }
1300
1301 async fn load_peers_last_api_versions(
1302 db: &Database,
1303 num_peers: NumPeers,
1304 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1305 let mut peer_api_version_sets = BTreeMap::new();
1306
1307 let mut dbtx = db.begin_transaction_nc().await;
1308 for peer_id in num_peers.peer_ids() {
1309 if let Some(v) = dbtx
1310 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1311 .await
1312 {
1313 peer_api_version_sets.insert(peer_id, v.0);
1314 }
1315 }
1316 drop(dbtx);
1317 peer_api_version_sets
1318 }
1319
1320 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1323 self.db()
1324 .begin_transaction_nc()
1325 .await
1326 .find_by_prefix(&ApiAnnouncementPrefix)
1327 .await
1328 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1329 .collect()
1330 .await
1331 }
1332
1333 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1335 get_api_urls(&self.db, &self.config().await).await
1336 }
1337
1338 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1341 self.get_peer_urls()
1342 .await
1343 .into_iter()
1344 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1345 .map(|peer_url| {
1346 InviteCode::new(
1347 peer_url.clone(),
1348 peer,
1349 self.federation_id(),
1350 self.api_secret.clone(),
1351 )
1352 })
1353 }
1354
1355 pub async fn get_guardian_public_keys_blocking(
1359 &self,
1360 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1361 self.db
1362 .autocommit(
1363 |dbtx, _| {
1364 Box::pin(async move {
1365 let config = self.config().await;
1366
1367 let guardian_pub_keys = self
1368 .get_or_backfill_broadcast_public_keys(dbtx, config)
1369 .await;
1370
1371 Result::<_, ()>::Ok(guardian_pub_keys)
1372 })
1373 },
1374 None,
1375 )
1376 .await
1377 .expect("Will retry forever")
1378 }
1379
1380 async fn get_or_backfill_broadcast_public_keys(
1381 &self,
1382 dbtx: &mut DatabaseTransaction<'_>,
1383 config: ClientConfig,
1384 ) -> BTreeMap<PeerId, PublicKey> {
1385 match config.global.broadcast_public_keys {
1386 Some(guardian_pub_keys) => guardian_pub_keys,
1387 _ => {
1388 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1389
1390 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1391 *(self.config.write().await) = new_config;
1392 guardian_pub_keys
1393 }
1394 }
1395 }
1396
1397 async fn fetch_session_count(&self) -> FederationResult<u64> {
1398 self.api.session_count().await
1399 }
1400
1401 async fn fetch_and_update_config(
1402 &self,
1403 config: ClientConfig,
1404 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1405 let fetched_config = retry(
1406 "Fetching guardian public keys",
1407 backoff_util::background_backoff(),
1408 || async {
1409 Ok(self
1410 .api
1411 .request_current_consensus::<ClientConfig>(
1412 CLIENT_CONFIG_ENDPOINT.to_owned(),
1413 ApiRequestErased::default(),
1414 )
1415 .await?)
1416 },
1417 )
1418 .await
1419 .expect("Will never return on error");
1420
1421 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1422 warn!(
1423 target: LOG_CLIENT,
1424 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1425 );
1426 pending::<()>().await;
1427 unreachable!("Pending will never return");
1428 };
1429
1430 let new_config = ClientConfig {
1431 global: GlobalClientConfig {
1432 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1433 ..config.global
1434 },
1435 modules: config.modules,
1436 };
1437 (guardian_pub_keys, new_config)
1438 }
1439
1440 pub fn handle_global_rpc(
1441 &self,
1442 method: String,
1443 params: serde_json::Value,
1444 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1445 Box::pin(try_stream! {
1446 match method.as_str() {
1447 "get_balance" => {
1448 let balance = self.get_balance().await;
1449 yield serde_json::to_value(balance)?;
1450 }
1451 "subscribe_balance_changes" => {
1452 let mut stream = self.subscribe_balance_changes().await;
1453 while let Some(balance) = stream.next().await {
1454 yield serde_json::to_value(balance)?;
1455 }
1456 }
1457 "get_config" => {
1458 let config = self.config().await;
1459 yield serde_json::to_value(config)?;
1460 }
1461 "get_federation_id" => {
1462 let federation_id = self.federation_id();
1463 yield serde_json::to_value(federation_id)?;
1464 }
1465 "get_invite_code" => {
1466 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1467 let invite_code = self.invite_code(req.peer).await;
1468 yield serde_json::to_value(invite_code)?;
1469 }
1470 "get_operation" => {
1471 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1472 let operation = self.operation_log().get_operation(req.operation_id).await;
1473 yield serde_json::to_value(operation)?;
1474 }
1475 "list_operations" => {
1476 let req: ListOperationsParams = serde_json::from_value(params)?;
1477 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1478 usize::MAX
1479 } else {
1480 req.limit.unwrap_or(usize::MAX)
1481 };
1482 let operations = self.operation_log()
1483 .paginate_operations_rev(limit, req.last_seen)
1484 .await;
1485 yield serde_json::to_value(operations)?;
1486 }
1487 "session_count" => {
1488 let count = self.fetch_session_count().await?;
1489 yield serde_json::to_value(count)?;
1490 }
1491 "has_pending_recoveries" => {
1492 let has_pending = self.has_pending_recoveries();
1493 yield serde_json::to_value(has_pending)?;
1494 }
1495 "wait_for_all_recoveries" => {
1496 self.wait_for_all_recoveries().await?;
1497 yield serde_json::Value::Null;
1498 }
1499 "subscribe_to_recovery_progress" => {
1500 let mut stream = self.subscribe_to_recovery_progress();
1501 while let Some((module_id, progress)) = stream.next().await {
1502 yield serde_json::json!({
1503 "module_id": module_id,
1504 "progress": progress
1505 });
1506 }
1507 }
1508 "backup_to_federation" => {
1509 let metadata = if params.is_null() {
1510 Metadata::from_json_serialized(serde_json::json!({}))
1511 } else {
1512 Metadata::from_json_serialized(params)
1513 };
1514 self.backup_to_federation(metadata).await?;
1515 yield serde_json::Value::Null;
1516 }
1517 _ => {
1518 Err(anyhow::format_err!("Unknown method: {}", method))?;
1519 unreachable!()
1520 },
1521 }
1522 })
1523 }
1524
1525 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1526 where
1527 E: Event + Send,
1528 {
1529 let mut dbtx = self.db.begin_transaction().await;
1530 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1531 dbtx.commit_tx().await;
1532 }
1533
1534 pub async fn log_event_dbtx<E, Cap>(
1535 &self,
1536 dbtx: &mut DatabaseTransaction<'_, Cap>,
1537 module_id: Option<ModuleInstanceId>,
1538 event: E,
1539 ) where
1540 E: Event + Send,
1541 Cap: Send,
1542 {
1543 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1544 .await;
1545 }
1546
1547 pub async fn log_event_raw_dbtx<Cap>(
1548 &self,
1549 dbtx: &mut DatabaseTransaction<'_, Cap>,
1550 kind: EventKind,
1551 module: Option<(ModuleKind, ModuleInstanceId)>,
1552 payload: Vec<u8>,
1553 persist: EventPersistence,
1554 ) where
1555 Cap: Send,
1556 {
1557 let module_id = module.as_ref().map(|m| m.1);
1558 let module_kind = module.map(|m| m.0);
1559 dbtx.log_event_raw(
1560 self.log_ordering_wakeup_tx.clone(),
1561 kind,
1562 module_kind,
1563 module_id,
1564 payload,
1565 persist,
1566 )
1567 .await;
1568 }
1569
1570 pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1571 where
1572 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1573 K: DatabaseRecord<Value = EventLogId>,
1574 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1575 R: Future<Output = anyhow::Result<()>>,
1576 {
1577 fedimint_eventlog::handle_events(
1578 self.db.clone(),
1579 pos_key,
1580 self.log_event_added_rx.clone(),
1581 call_fn,
1582 )
1583 .await
1584 }
1585
1586 pub async fn handle_trimable_events<F, R, K>(
1587 &self,
1588 pos_key: &K,
1589 call_fn: F,
1590 ) -> anyhow::Result<()>
1591 where
1592 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1593 K: DatabaseRecord<Value = EventLogTrimableId>,
1594 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1595 R: Future<Output = anyhow::Result<()>>,
1596 {
1597 fedimint_eventlog::handle_trimable_events(
1598 self.db.clone(),
1599 pos_key,
1600 self.log_event_added_rx.clone(),
1601 call_fn,
1602 )
1603 .await
1604 }
1605
1606 pub async fn get_event_log(
1607 &self,
1608 pos: Option<EventLogId>,
1609 limit: u64,
1610 ) -> Vec<PersistedLogEntry> {
1611 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1612 .await
1613 }
1614
1615 pub async fn get_event_log_trimable(
1616 &self,
1617 pos: Option<EventLogTrimableId>,
1618 limit: u64,
1619 ) -> Vec<PersistedLogEntry> {
1620 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1621 .await
1622 }
1623
1624 pub async fn get_event_log_dbtx<Cap>(
1625 &self,
1626 dbtx: &mut DatabaseTransaction<'_, Cap>,
1627 pos: Option<EventLogId>,
1628 limit: u64,
1629 ) -> Vec<PersistedLogEntry>
1630 where
1631 Cap: Send,
1632 {
1633 dbtx.get_event_log(pos, limit).await
1634 }
1635
1636 pub async fn get_event_log_trimable_dbtx<Cap>(
1637 &self,
1638 dbtx: &mut DatabaseTransaction<'_, Cap>,
1639 pos: Option<EventLogTrimableId>,
1640 limit: u64,
1641 ) -> Vec<PersistedLogEntry>
1642 where
1643 Cap: Send,
1644 {
1645 dbtx.get_event_log_trimable(pos, limit).await
1646 }
1647
1648 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1650 self.log_event_added_transient_tx.subscribe()
1651 }
1652
1653 pub fn iroh_enable_dht(&self) -> bool {
1654 self.iroh_enable_dht
1655 }
1656
1657 pub(crate) async fn run_core_migrations(
1658 db_no_decoders: &Database,
1659 ) -> Result<(), anyhow::Error> {
1660 let mut dbtx = db_no_decoders.begin_transaction().await;
1661 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1662 .await?;
1663 if is_running_in_test_env() {
1664 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1665 }
1666 dbtx.commit_tx_result().await?;
1667 Ok(())
1668 }
1669}
1670
1671#[apply(async_trait_maybe_send!)]
1672impl ClientContextIface for Client {
1673 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1674 Client::get_module(self, instance)
1675 }
1676
1677 fn api_clone(&self) -> DynGlobalApi {
1678 Client::api_clone(self)
1679 }
1680 fn decoders(&self) -> &ModuleDecoderRegistry {
1681 Client::decoders(self)
1682 }
1683
1684 async fn finalize_and_submit_transaction(
1685 &self,
1686 operation_id: OperationId,
1687 operation_type: &str,
1688 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1689 tx_builder: TransactionBuilder,
1690 ) -> anyhow::Result<OutPointRange> {
1691 Client::finalize_and_submit_transaction(
1692 self,
1693 operation_id,
1694 operation_type,
1695 &operation_meta_gen,
1697 tx_builder,
1698 )
1699 .await
1700 }
1701
1702 async fn finalize_and_submit_transaction_inner(
1703 &self,
1704 dbtx: &mut DatabaseTransaction<'_>,
1705 operation_id: OperationId,
1706 tx_builder: TransactionBuilder,
1707 ) -> anyhow::Result<OutPointRange> {
1708 Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1709 }
1710
1711 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1712 Client::transaction_updates(self, operation_id).await
1713 }
1714
1715 async fn await_primary_module_outputs(
1716 &self,
1717 operation_id: OperationId,
1718 outputs: Vec<OutPoint>,
1720 ) -> anyhow::Result<()> {
1721 Client::await_primary_module_outputs(self, operation_id, outputs).await
1722 }
1723
1724 fn operation_log(&self) -> &dyn IOperationLog {
1725 Client::operation_log(self)
1726 }
1727
1728 async fn has_active_states(&self, operation_id: OperationId) -> bool {
1729 Client::has_active_states(self, operation_id).await
1730 }
1731
1732 async fn operation_exists(&self, operation_id: OperationId) -> bool {
1733 Client::operation_exists(self, operation_id).await
1734 }
1735
1736 async fn config(&self) -> ClientConfig {
1737 Client::config(self).await
1738 }
1739
1740 fn db(&self) -> &Database {
1741 Client::db(self)
1742 }
1743
1744 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1745 Client::executor(self)
1746 }
1747
1748 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1749 Client::invite_code(self, peer).await
1750 }
1751
1752 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1753 Client::get_internal_payment_markers(self)
1754 }
1755
1756 async fn log_event_json(
1757 &self,
1758 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1759 module_kind: Option<ModuleKind>,
1760 module_id: ModuleInstanceId,
1761 kind: EventKind,
1762 payload: serde_json::Value,
1763 persist: EventPersistence,
1764 ) {
1765 dbtx.ensure_global()
1766 .expect("Must be called with global dbtx");
1767 self.log_event_raw_dbtx(
1768 dbtx,
1769 kind,
1770 module_kind.map(|kind| (kind, module_id)),
1771 serde_json::to_vec(&payload).expect("Serialization can't fail"),
1772 persist,
1773 )
1774 .await;
1775 }
1776
1777 async fn read_operation_active_states<'dbtx>(
1778 &self,
1779 operation_id: OperationId,
1780 module_id: ModuleInstanceId,
1781 dbtx: &'dbtx mut DatabaseTransaction<'_>,
1782 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1783 {
1784 Box::pin(
1785 dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1786 operation_id,
1787 module_instance: module_id,
1788 })
1789 .await
1790 .map(move |(k, v)| (k.0, v)),
1791 )
1792 }
1793 async fn read_operation_inactive_states<'dbtx>(
1794 &self,
1795 operation_id: OperationId,
1796 module_id: ModuleInstanceId,
1797 dbtx: &'dbtx mut DatabaseTransaction<'_>,
1798 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1799 {
1800 Box::pin(
1801 dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1802 operation_id,
1803 module_instance: module_id,
1804 })
1805 .await
1806 .map(move |(k, v)| (k.0, v)),
1807 )
1808 }
1809}
1810
1811impl fmt::Debug for Client {
1813 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1814 write!(f, "Client")
1815 }
1816}
1817
1818pub fn client_decoders<'a>(
1819 registry: &ModuleInitRegistry<DynClientModuleInit>,
1820 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1821) -> ModuleDecoderRegistry {
1822 let mut modules = BTreeMap::new();
1823 for (id, kind) in module_kinds {
1824 let Some(init) = registry.get(kind) else {
1825 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1826 continue;
1827 };
1828
1829 modules.insert(
1830 id,
1831 (
1832 kind.clone(),
1833 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1834 ),
1835 );
1836 }
1837 ModuleDecoderRegistry::from(modules)
1838}