1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51 SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61 fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65 DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
66 EventPersistence, PersistedLogEntry,
67};
68use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
69use futures::stream::FuturesUnordered;
70use futures::{Stream, StreamExt as _};
71use global_ctx::ModuleGlobalClientContext;
72use serde::{Deserialize, Serialize};
73use tokio::sync::{broadcast, watch};
74use tokio_stream::wrappers::WatchStream;
75use tracing::{debug, info, warn};
76
77use crate::ClientBuilder;
78use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
79use crate::backup::Metadata;
80use crate::db::{
81 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
82 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
83 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
84 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
85 get_decoded_client_secret, verify_client_db_integrity_dbtx,
86};
87use crate::meta::MetaService;
88use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
89use crate::oplog::OperationLog;
90use crate::sm::executor::{
91 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
92 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
93};
94
95pub(crate) mod builder;
96pub(crate) mod global_ctx;
97pub(crate) mod handle;
98
99const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
103 &[ApiVersion { major: 0, minor: 0 }];
104
/// Central state of a federation client: configuration, database, module
/// registry, state machine executor, API handle and background task group.
pub struct Client {
    // Handed to the executor's module notifier when subscribing to updates.
    final_client: FinalClientIface,
    // Current client config; readers get a clone via `config()`.
    config: tokio::sync::RwLock<ClientConfig>,
    // Optional API secret, exposed through `api_secret()`.
    api_secret: Option<String>,
    // Module decoder registry, exposed through `decoders()`.
    decoders: ModuleDecoderRegistry,
    // Client database handle.
    db: Database,
    // Id of the federation this client belongs to.
    federation_id: FederationId,
    // Key/value meta fields looked up by `get_config_meta()`.
    federation_config_meta: BTreeMap<String, String>,
    // Instance id of the primary module (funding, change and balance).
    primary_module_instance: ModuleInstanceId,
    // Registry of instantiated client modules, keyed by instance id.
    pub(crate) modules: ClientModuleRegistry,
    // Module initializers; consulted when computing supported API versions.
    module_inits: ClientModuleInitRegistry,
    // State machine executor.
    executor: Executor,
    // Federation API handle.
    pub(crate) api: DynGlobalApi,
    // Root secret, exposed crate-internally through `root_secret()`.
    root_secret: DerivableSecret,
    // Operation log, exposed through `operation_log()`.
    operation_log: OperationLog,
    // secp256k1 context used when building transactions and deriving the
    // federation's fake LN public key.
    secp_ctx: Secp256k1<secp256k1::All>,
    // Meta service, exposed through `meta_service()`.
    meta_service: Arc<MetaService>,
    // NOTE(review): usage not visible in this part of the file.
    connector: Connector,

    // Task group used to spawn background tasks.
    task_group: TaskGroup,

    // Latest recovery progress per module instance.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    // Passed along when logging events from the module recovery task.
    log_ordering_wakeup_tx: watch::Sender<()>,
    // NOTE(review): presumably signals that an event was added to the log —
    // usage not visible here, confirm.
    log_event_added_rx: watch::Receiver<()>,
    // NOTE(review): presumably broadcasts transient event log entries —
    // usage not visible here, confirm.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    // NOTE(review): usage not visible in this part of the file.
    request_hook: ApiRequestHook,
}
151
/// Serializable parameters for an operation listing request.
// NOTE(review): presumably used for paginated listing of the operation log;
// the consumer is not visible in this part of the file.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    // Optional maximum number of entries.
    limit: Option<usize>,
    // Optional chronological key of the last entry already seen.
    last_seen: Option<ChronologicalOperationLogKey>,
}
157
/// Serializable request payload that carries a single operation id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    // Id of the operation the request refers to.
    operation_id: OperationId,
}
162
163impl Client {
164 pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
167 let mut dbtx = db.begin_transaction().await;
168 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
169 .await?;
170 if is_running_in_test_env() {
171 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
172 }
173 dbtx.commit_tx_result().await?;
174
175 Ok(ClientBuilder::new(db))
176 }
177
178 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
179 self.api.as_ref()
180 }
181
182 pub fn api_clone(&self) -> DynGlobalApi {
183 self.api.clone()
184 }
185
186 pub fn task_group(&self) -> &TaskGroup {
188 &self.task_group
189 }
190
191 #[doc(hidden)]
193 pub fn executor(&self) -> &Executor {
194 &self.executor
195 }
196
197 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
198 let mut dbtx = db.begin_transaction_nc().await;
199 dbtx.get_value(&ClientConfigKey).await
200 }
201
202 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
203 let mut dbtx = db.begin_transaction_nc().await;
204 dbtx.get_value(&PendingClientConfigKey).await
205 }
206
207 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
208 let mut dbtx = db.begin_transaction_nc().await;
209 dbtx.get_value(&ApiSecretKey).await
210 }
211
212 pub async fn store_encodable_client_secret<T: Encodable>(
213 db: &Database,
214 secret: T,
215 ) -> anyhow::Result<()> {
216 let mut dbtx = db.begin_transaction().await;
217
218 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
220 bail!("Encoded client secret already exists, cannot overwrite")
221 }
222
223 let encoded_secret = T::consensus_encode_to_vec(&secret);
224 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
225 .await;
226 dbtx.commit_tx().await;
227 Ok(())
228 }
229
230 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
231 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
232 bail!("Encoded client secret not present in DB")
233 };
234
235 Ok(secret)
236 }
237 pub async fn load_decodable_client_secret_opt<T: Decodable>(
238 db: &Database,
239 ) -> anyhow::Result<Option<T>> {
240 let mut dbtx = db.begin_transaction_nc().await;
241
242 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
243
244 Ok(match client_secret {
245 Some(client_secret) => Some(
246 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
247 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
248 ),
249 None => None,
250 })
251 }
252
253 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
254 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
255 Ok(secret) => secret,
256 _ => {
257 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
258 Self::store_encodable_client_secret(db, secret)
259 .await
260 .expect("Storing client secret must work");
261 secret
262 }
263 };
264 Ok(client_secret)
265 }
266
267 pub async fn is_initialized(db: &Database) -> bool {
268 Self::get_config_from_db(db).await.is_some()
269 }
270
271 pub fn start_executor(self: &Arc<Self>) {
272 debug!(
273 target: LOG_CLIENT,
274 "Starting fedimint client executor (version: {})",
275 fedimint_build_code_version_env!()
276 );
277 self.executor.start_executor(self.context_gen());
278 }
279
280 pub fn federation_id(&self) -> FederationId {
281 self.federation_id
282 }
283
284 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
285 let client_inner = Arc::downgrade(self);
286 Arc::new(move |module_instance, operation| {
287 ModuleGlobalClientContext {
288 client: client_inner
289 .clone()
290 .upgrade()
291 .expect("ModuleGlobalContextGen called after client was dropped"),
292 module_instance_id: module_instance,
293 operation,
294 }
295 .into()
296 })
297 }
298
299 pub async fn config(&self) -> ClientConfig {
300 self.config.read().await.clone()
301 }
302
303 pub fn api_secret(&self) -> &Option<String> {
304 &self.api_secret
305 }
306
307 pub fn decoders(&self) -> &ModuleDecoderRegistry {
308 &self.decoders
309 }
310
311 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
313 self.try_get_module(instance)
314 .expect("Module instance not found")
315 }
316
317 fn try_get_module(
318 &self,
319 instance: ModuleInstanceId,
320 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
321 Some(self.modules.get(instance)?.as_ref())
322 }
323
324 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
325 self.modules.get(instance).is_some()
326 }
327
328 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
334 let mut in_amount = Amount::ZERO;
336 let mut out_amount = Amount::ZERO;
337 let mut fee_amount = Amount::ZERO;
338
339 for input in builder.inputs() {
340 let module = self.get_module(input.input.module_instance_id());
341
342 let item_fee = module.input_fee(input.amount, &input.input).expect(
343 "We only build transactions with input versions that are supported by the module",
344 );
345
346 in_amount += input.amount;
347 fee_amount += item_fee;
348 }
349
350 for output in builder.outputs() {
351 let module = self.get_module(output.output.module_instance_id());
352
353 let item_fee = module.output_fee(output.amount, &output.output).expect(
354 "We only build transactions with output versions that are supported by the module",
355 );
356
357 out_amount += output.amount;
358 fee_amount += item_fee;
359 }
360
361 (in_amount, out_amount + fee_amount)
362 }
363
364 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
365 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
366 }
367
368 pub fn get_config_meta(&self, key: &str) -> Option<String> {
370 self.federation_config_meta.get(key).cloned()
371 }
372
373 pub(crate) fn root_secret(&self) -> DerivableSecret {
374 self.root_secret.clone()
375 }
376
377 pub async fn add_state_machines(
378 &self,
379 dbtx: &mut DatabaseTransaction<'_>,
380 states: Vec<DynState>,
381 ) -> AddStateMachinesResult {
382 self.executor.add_state_machines_dbtx(dbtx, states).await
383 }
384
385 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
387 let active_states = self.executor.get_active_states().await;
388 let mut active_operations = HashSet::with_capacity(active_states.len());
389 let mut dbtx = self.db().begin_transaction_nc().await;
390 for (state, _) in active_states {
391 let operation_id = state.operation_id();
392 if dbtx
393 .get_value(&OperationLogKey { operation_id })
394 .await
395 .is_some()
396 {
397 active_operations.insert(operation_id);
398 }
399 }
400 active_operations
401 }
402
403 pub fn operation_log(&self) -> &OperationLog {
404 &self.operation_log
405 }
406
407 pub fn meta_service(&self) -> &Arc<MetaService> {
409 &self.meta_service
410 }
411
412 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
414 let meta_service = self.meta_service();
415 let ts = meta_service
416 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
417 .await
418 .and_then(|v| v.value)?;
419 Some(UNIX_EPOCH + Duration::from_secs(ts))
420 }
421
422 async fn finalize_transaction(
424 &self,
425 dbtx: &mut DatabaseTransaction<'_>,
426 operation_id: OperationId,
427 mut partial_transaction: TransactionBuilder,
428 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
429 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
430
431 let (added_input_bundle, change_outputs) = self
432 .primary_module()
433 .create_final_inputs_and_outputs(
434 self.primary_module_instance,
435 dbtx,
436 operation_id,
437 input_amount,
438 output_amount,
439 )
440 .await?;
441
442 let change_range = Range {
446 start: partial_transaction.outputs().count() as u64,
447 end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
448 };
449
450 partial_transaction = partial_transaction
451 .with_inputs(added_input_bundle)
452 .with_outputs(change_outputs);
453
454 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
455
456 assert!(input_amount >= output_amount, "Transaction is underfunded");
457
458 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
459
460 Ok((tx, states, change_range))
461 }
462
463 pub async fn finalize_and_submit_transaction<F, M>(
475 &self,
476 operation_id: OperationId,
477 operation_type: &str,
478 operation_meta_gen: F,
479 tx_builder: TransactionBuilder,
480 ) -> anyhow::Result<OutPointRange>
481 where
482 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
483 M: serde::Serialize + MaybeSend,
484 {
485 let operation_type = operation_type.to_owned();
486
487 let autocommit_res = self
488 .db
489 .autocommit(
490 |dbtx, _| {
491 let operation_type = operation_type.clone();
492 let tx_builder = tx_builder.clone();
493 let operation_meta_gen = operation_meta_gen.clone();
494 Box::pin(async move {
495 if Client::operation_exists_dbtx(dbtx, operation_id).await {
496 bail!("There already exists an operation with id {operation_id:?}")
497 }
498
499 let out_point_range = self
500 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
501 .await?;
502
503 self.operation_log()
504 .add_operation_log_entry_dbtx(
505 dbtx,
506 operation_id,
507 &operation_type,
508 operation_meta_gen(out_point_range),
509 )
510 .await;
511
512 Ok(out_point_range)
513 })
514 },
515 Some(100), )
517 .await;
518
519 match autocommit_res {
520 Ok(txid) => Ok(txid),
521 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
522 Err(AutocommitError::CommitFailed {
523 attempts,
524 last_error,
525 }) => panic!(
526 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
527 ),
528 }
529 }
530
531 async fn finalize_and_submit_transaction_inner(
532 &self,
533 dbtx: &mut DatabaseTransaction<'_>,
534 operation_id: OperationId,
535 tx_builder: TransactionBuilder,
536 ) -> anyhow::Result<OutPointRange> {
537 let (transaction, mut states, change_range) = self
538 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
539 .await?;
540
541 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
542 let inputs = transaction
543 .inputs
544 .iter()
545 .map(DynInput::module_instance_id)
546 .collect::<Vec<_>>();
547 let outputs = transaction
548 .outputs
549 .iter()
550 .map(DynOutput::module_instance_id)
551 .collect::<Vec<_>>();
552 warn!(
553 target: LOG_CLIENT_NET_API,
554 size=%transaction.consensus_encode_to_vec().len(),
555 ?inputs,
556 ?outputs,
557 "Transaction too large",
558 );
559 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
560 bail!(
561 "The generated transaction would be rejected by the federation for being too large."
562 );
563 }
564
565 let txid = transaction.tx_hash();
566
567 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
568
569 let tx_submission_sm = DynState::from_typed(
570 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
571 TxSubmissionStatesSM {
572 operation_id,
573 state: TxSubmissionStates::Created(transaction),
574 },
575 );
576 states.push(tx_submission_sm);
577
578 self.executor.add_state_machines_dbtx(dbtx, states).await?;
579
580 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
581 .await;
582
583 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
584 }
585
586 async fn transaction_update_stream(
587 &self,
588 operation_id: OperationId,
589 ) -> BoxStream<'static, TxSubmissionStatesSM> {
590 self.executor
591 .notifier()
592 .module_notifier::<TxSubmissionStatesSM>(
593 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
594 self.final_client.clone(),
595 )
596 .subscribe(operation_id)
597 .await
598 }
599
600 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
601 let mut dbtx = self.db().begin_transaction_nc().await;
602
603 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
604 }
605
606 pub async fn operation_exists_dbtx(
607 dbtx: &mut DatabaseTransaction<'_>,
608 operation_id: OperationId,
609 ) -> bool {
610 let active_state_exists = dbtx
611 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
612 .await
613 .next()
614 .await
615 .is_some();
616
617 let inactive_state_exists = dbtx
618 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
619 .await
620 .next()
621 .await
622 .is_some();
623
624 active_state_exists || inactive_state_exists
625 }
626
627 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
628 self.db
629 .begin_transaction_nc()
630 .await
631 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
632 .await
633 .next()
634 .await
635 .is_some()
636 }
637
638 pub async fn await_primary_module_output(
641 &self,
642 operation_id: OperationId,
643 out_point: OutPoint,
644 ) -> anyhow::Result<()> {
645 self.primary_module()
646 .await_primary_module_output(operation_id, out_point)
647 .await
648 }
649
650 pub fn get_first_module<M: ClientModule>(
652 &'_ self,
653 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
654 let module_kind = M::kind();
655 let id = self
656 .get_first_instance(&module_kind)
657 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
658 let module: &M = self
659 .try_get_module(id)
660 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
661 .as_any()
662 .downcast_ref::<M>()
663 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
664 let (db, _) = self.db().with_prefix_module_id(id);
665 Ok(ClientModuleInstance {
666 id,
667 db,
668 api: self.api().with_module(id),
669 module,
670 })
671 }
672
673 pub fn get_module_client_dyn(
674 &self,
675 instance_id: ModuleInstanceId,
676 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
677 self.try_get_module(instance_id)
678 .ok_or(anyhow!("Unknown module instance {}", instance_id))
679 }
680
681 pub fn db(&self) -> &Database {
682 &self.db
683 }
684
685 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
688 TransactionUpdates {
689 update_stream: self.transaction_update_stream(operation_id).await,
690 }
691 }
692
693 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
697 if self
698 .modules
699 .get_with_kind(self.primary_module_instance)
700 .is_some_and(|(kind, _)| kind == module_kind)
701 {
702 return Some(self.primary_module_instance);
703 }
704
705 self.modules
706 .iter_modules()
707 .find(|(_, kind, _module)| *kind == module_kind)
708 .map(|(instance_id, _, _)| instance_id)
709 }
710
711 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
714 get_decoded_client_secret::<T>(self.db()).await
715 }
716
717 pub async fn await_primary_module_outputs(
720 &self,
721 operation_id: OperationId,
722 outputs: Vec<OutPoint>,
723 ) -> anyhow::Result<()> {
724 for out_point in outputs {
725 self.await_primary_module_output(operation_id, out_point)
726 .await?;
727 }
728
729 Ok(())
730 }
731
732 pub async fn get_config_json(&self) -> JsonClientConfig {
738 self.config().await.to_json()
739 }
740
741 pub fn primary_module(&self) -> &DynClientModule {
743 self.modules
744 .get(self.primary_module_instance)
745 .expect("primary module must be present")
746 }
747
748 pub async fn get_balance(&self) -> Amount {
750 self.primary_module()
751 .get_balance(
752 self.primary_module_instance,
753 &mut self.db().begin_transaction_nc().await,
754 )
755 .await
756 }
757
758 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
761 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
762 let initial_balance = self.get_balance().await;
763 let db = self.db().clone();
764 let primary_module = self.primary_module().clone();
765 let primary_module_instance = self.primary_module_instance;
766
767 Box::pin(async_stream::stream! {
768 yield initial_balance;
769 let mut prev_balance = initial_balance;
770 while let Some(()) = balance_changes.next().await {
771 let mut dbtx = db.begin_transaction_nc().await;
772 let balance = primary_module
773 .get_balance(primary_module_instance, &mut dbtx)
774 .await;
775
776 if balance != prev_balance {
778 prev_balance = balance;
779 yield balance;
780 }
781 }
782 })
783 }
784
785 pub async fn refresh_peers_api_versions(
788 num_peers: NumPeers,
789 api: DynGlobalApi,
790 db: Database,
791 num_responses_sender: watch::Sender<usize>,
792 ) {
793 async fn make_request(
798 delay: Duration,
799 peer_id: PeerId,
800 api: &DynGlobalApi,
801 ) -> (
802 PeerId,
803 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
804 ) {
805 runtime::sleep(delay).await;
806 (
807 peer_id,
808 api.request_single_peer::<SupportedApiVersionsSummary>(
809 VERSION_ENDPOINT.to_owned(),
810 ApiRequestErased::default(),
811 peer_id,
812 )
813 .await,
814 )
815 }
816
817 let mut requests = FuturesUnordered::new();
820
821 for peer_id in num_peers.peer_ids() {
822 requests.push(make_request(Duration::ZERO, peer_id, &api));
823 }
824
825 let mut num_responses = 0;
826
827 while let Some((peer_id, response)) = requests.next().await {
828 match response {
829 Err(err) => {
830 if db
831 .begin_transaction_nc()
832 .await
833 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
834 .await
835 .is_some()
836 {
837 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
838 } else {
839 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
840 requests.push(make_request(Duration::from_secs(15), peer_id, &api));
841 }
842 }
843 Ok(o) => {
844 let mut dbtx = db.begin_transaction().await;
847 dbtx.insert_entry(
848 &PeerLastApiVersionsSummaryKey(peer_id),
849 &PeerLastApiVersionsSummary(o),
850 )
851 .await;
852 dbtx.commit_tx().await;
853 num_responses += 1;
854 num_responses_sender.send_replace(num_responses);
856 }
857 }
858 }
859 }
860
861 pub fn supported_api_versions_summary_static(
863 config: &ClientConfig,
864 client_module_init: &ClientModuleInitRegistry,
865 ) -> SupportedApiVersionsSummary {
866 SupportedApiVersionsSummary {
867 core: SupportedCoreApiVersions {
868 core_consensus: config.global.consensus_version,
869 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
870 .expect("must not have conflicting versions"),
871 },
872 modules: config
873 .modules
874 .iter()
875 .filter_map(|(&module_instance_id, module_config)| {
876 client_module_init
877 .get(module_config.kind())
878 .map(|module_init| {
879 (
880 module_instance_id,
881 SupportedModuleApiVersions {
882 core_consensus: config.global.consensus_version,
883 module_consensus: module_config.version,
884 api: module_init.supported_api_versions(),
885 },
886 )
887 })
888 })
889 .collect(),
890 }
891 }
892
893 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
894 Self::load_and_refresh_common_api_version_static(
895 &self.config().await,
896 &self.module_inits,
897 &self.api,
898 &self.db,
899 &self.task_group,
900 )
901 .await
902 }
903
904 async fn load_and_refresh_common_api_version_static(
910 config: &ClientConfig,
911 module_init: &ClientModuleInitRegistry,
912 api: &DynGlobalApi,
913 db: &Database,
914 task_group: &TaskGroup,
915 ) -> anyhow::Result<ApiVersionSet> {
916 if let Some(v) = db
917 .begin_transaction_nc()
918 .await
919 .get_value(&CachedApiVersionSetKey)
920 .await
921 {
922 debug!(
923 target: LOG_CLIENT,
924 "Found existing cached common api versions"
925 );
926 let config = config.clone();
927 let client_module_init = module_init.clone();
928 let api = api.clone();
929 let db = db.clone();
930 let task_group = task_group.clone();
931 task_group
934 .clone()
935 .spawn_cancellable("refresh_common_api_version_static", async move {
936 if let Err(error) = Self::refresh_common_api_version_static(
937 &config,
938 &client_module_init,
939 &api,
940 &db,
941 task_group,
942 )
943 .await
944 {
945 warn!(
946 target: LOG_CLIENT,
947 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
948 );
949 }
950 });
951
952 return Ok(v.0);
953 }
954
955 debug!(
956 target: LOG_CLIENT,
957 "No existing cached common api versions found, waiting for initial discovery"
958 );
959 Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
960 .await
961 }
962
963 async fn refresh_common_api_version_static(
964 config: &ClientConfig,
965 client_module_init: &ClientModuleInitRegistry,
966 api: &DynGlobalApi,
967 db: &Database,
968 task_group: TaskGroup,
969 ) -> anyhow::Result<ApiVersionSet> {
970 debug!(
971 target: LOG_CLIENT,
972 "Refreshing common api versions"
973 );
974
975 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
976 let num_peers = NumPeers::from(config.global.api_endpoints.len());
977
978 task_group.spawn_cancellable("refresh peers api versions", {
979 Client::refresh_peers_api_versions(
980 num_peers,
981 api.clone(),
982 db.clone(),
983 num_responses_sender,
984 )
985 });
986
987 let _: Result<_, Elapsed> = runtime::timeout(
995 Duration::from_secs(15),
996 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
997 )
998 .await;
999
1000 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1001
1002 let common_api_versions =
1003 fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1004 &Self::supported_api_versions_summary_static(config, client_module_init),
1005 &peer_api_version_sets,
1006 )?;
1007
1008 debug!(
1009 target: LOG_CLIENT,
1010 value = ?common_api_versions,
1011 "Updating the cached common api versions"
1012 );
1013 let mut dbtx = db.begin_transaction().await;
1014 let _ = dbtx
1015 .insert_entry(
1016 &CachedApiVersionSetKey,
1017 &CachedApiVersionSet(common_api_versions.clone()),
1018 )
1019 .await;
1020
1021 dbtx.commit_tx().await;
1022
1023 Ok(common_api_versions)
1024 }
1025
1026 pub async fn get_metadata(&self) -> Metadata {
1028 self.db
1029 .begin_transaction_nc()
1030 .await
1031 .get_value(&ClientMetadataKey)
1032 .await
1033 .unwrap_or_else(|| {
1034 warn!(
1035 target: LOG_CLIENT,
1036 "Missing existing metadata. This key should have been set on Client init"
1037 );
1038 Metadata::empty()
1039 })
1040 }
1041
1042 pub async fn set_metadata(&self, metadata: &Metadata) {
1044 self.db
1045 .autocommit::<_, _, anyhow::Error>(
1046 |dbtx, _| {
1047 Box::pin(async {
1048 Self::set_metadata_dbtx(dbtx, metadata).await;
1049 Ok(())
1050 })
1051 },
1052 None,
1053 )
1054 .await
1055 .expect("Failed to autocommit metadata");
1056 }
1057
1058 pub fn has_pending_recoveries(&self) -> bool {
1059 !self
1060 .client_recovery_progress_receiver
1061 .borrow()
1062 .iter()
1063 .all(|(_id, progress)| progress.is_done())
1064 }
1065
1066 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1074 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1075 recovery_receiver
1076 .wait_for(|in_progress| {
1077 in_progress
1078 .iter()
1079 .all(|(_id, progress)| progress.is_done())
1080 })
1081 .await
1082 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1083
1084 Ok(())
1085 }
1086
1087 pub fn subscribe_to_recovery_progress(
1092 &self,
1093 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1094 WatchStream::new(self.client_recovery_progress_receiver.clone())
1095 .flat_map(futures::stream::iter)
1096 }
1097
1098 pub async fn wait_for_module_kind_recovery(
1099 &self,
1100 module_kind: ModuleKind,
1101 ) -> anyhow::Result<()> {
1102 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1103 let config = self.config().await;
1104 recovery_receiver
1105 .wait_for(|in_progress| {
1106 !in_progress
1107 .iter()
1108 .filter(|(module_instance_id, _progress)| {
1109 config.modules[module_instance_id].kind == module_kind
1110 })
1111 .any(|(_id, progress)| !progress.is_done())
1112 })
1113 .await
1114 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1115
1116 Ok(())
1117 }
1118
1119 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1120 loop {
1121 if self.executor.get_active_states().await.is_empty() {
1122 break;
1123 }
1124 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1125 }
1126 Ok(())
1127 }
1128
1129 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1131 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1132 }
1133
1134 fn spawn_module_recoveries_task(
1135 &self,
1136 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1137 module_recoveries: BTreeMap<
1138 ModuleInstanceId,
1139 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1140 >,
1141 module_recovery_progress_receivers: BTreeMap<
1142 ModuleInstanceId,
1143 watch::Receiver<RecoveryProgress>,
1144 >,
1145 ) {
1146 let db = self.db.clone();
1147 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1148 self.task_group
1149 .spawn("module recoveries", |_task_handle| async {
1150 Self::run_module_recoveries_task(
1151 db,
1152 log_ordering_wakeup_tx,
1153 recovery_sender,
1154 module_recoveries,
1155 module_recovery_progress_receivers,
1156 )
1157 .await;
1158 });
1159 }
1160
/// Drives all module recoveries and records their progress.
///
/// Two kinds of updates are merged into a single stream of
/// `(module_instance_id, Option<RecoveryProgress>)` items:
///
/// * progress updates from `module_recovery_progress_receivers`, carried as
///   `Some(progress)`;
/// * a completion signal from each future in `module_recoveries`, carried as
///   `None`, emitted only when that future returns `Ok(())`.
///
/// For every item the resulting progress is written to the database under
/// [`ClientModuleRecovery`], the transaction is committed, and only then is
/// the value published through `recovery_sender`. On the transition from
/// "not done" to "done" a [`ModuleRecoveryCompleted`] event is logged in the
/// same transaction.
///
/// # Panics
///
/// Panics if `recovery_sender` holds no entry for a module that produces an
/// update.
async fn run_module_recoveries_task(
    db: Database,
    log_ordering_wakeup_tx: watch::Sender<()>,
    recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
    module_recoveries: BTreeMap<
        ModuleInstanceId,
        Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
    >,
    module_recovery_progress_receivers: BTreeMap<
        ModuleInstanceId,
        watch::Receiver<RecoveryProgress>,
    >,
) {
    // NOTE(review): the message below reads "Staring" — likely a typo for
    // "Starting"; left untouched here because it is a runtime string.
    debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
    let mut completed_stream = Vec::new();
    let progress_stream = futures::stream::FuturesUnordered::new();

    // Wrap every recovery future in a one-item stream that reports completion.
    for (module_instance_id, f) in module_recoveries {
        completed_stream.push(futures::stream::once(Box::pin(async move {
            match f.await {
                Ok(()) => (module_instance_id, None),
                Err(err) => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
                    );
                    // A failed recovery parks forever instead of yielding, so
                    // this stream never emits a completion for the module.
                    futures::future::pending::<()>().await;
                    unreachable!()
                }
            }
        })));
    }

    // Turn every progress receiver into a stream tagged with its module id.
    for (module_instance_id, rx) in module_recovery_progress_receivers {
        progress_stream.push(
            tokio_stream::wrappers::WatchStream::new(rx)
                .fuse()
                .map(move |progress| (module_instance_id, Some(progress))),
        );
    }

    // Merge progress updates and completion signals into one stream.
    let mut futures = futures::stream::select(
        futures::stream::select_all(progress_stream),
        futures::stream::select_all(completed_stream),
    );

    while let Some((module_instance_id, progress)) = futures.next().await {
        let mut dbtx = db.begin_transaction().await;

        // The previously published progress for this module.
        let prev_progress = *recovery_sender
            .borrow()
            .get(&module_instance_id)
            .expect("existing progress must be present");

        // Once done, progress is frozen; otherwise take the reported value, or
        // for a completion signal (`None`) mark the previous value complete.
        let progress = if prev_progress.is_done() {
            prev_progress
        } else if let Some(progress) = progress {
            progress
        } else {
            prev_progress.to_complete()
        };

        if !prev_progress.is_done() && progress.is_done() {
            info!(
                target: LOG_CLIENT,
                module_instance_id,
                progress = format!("{}/{}", progress.complete, progress.total),
                "Recovery complete"
            );
            // Logged inside `dbtx`, so the event is committed together with
            // the persisted progress below.
            dbtx.log_event(
                log_ordering_wakeup_tx.clone(),
                None,
                ModuleRecoveryCompleted {
                    module_id: module_instance_id,
                },
            )
            .await;
        } else {
            info!(
                target: LOG_CLIENT,
                module_instance_id,
                progress = format!("{}/{}", progress.complete, progress.total),
                "Recovery progress"
            );
        }

        dbtx.insert_entry(
            &ClientModuleRecovery { module_instance_id },
            &ClientModuleRecoveryState { progress },
        )
        .await;
        dbtx.commit_tx().await;

        // Publish to watchers only after the commit above.
        recovery_sender.send_modify(|v| {
            v.insert(module_instance_id, progress);
        });
    }
    debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
}
1264
1265 async fn load_peers_last_api_versions(
1266 db: &Database,
1267 num_peers: NumPeers,
1268 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1269 let mut peer_api_version_sets = BTreeMap::new();
1270
1271 let mut dbtx = db.begin_transaction_nc().await;
1272 for peer_id in num_peers.peer_ids() {
1273 if let Some(v) = dbtx
1274 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1275 .await
1276 {
1277 peer_api_version_sets.insert(peer_id, v.0);
1278 }
1279 }
1280 drop(dbtx);
1281 peer_api_version_sets
1282 }
1283
1284 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1287 self.db()
1288 .begin_transaction_nc()
1289 .await
1290 .find_by_prefix(&ApiAnnouncementPrefix)
1291 .await
1292 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1293 .collect()
1294 .await
1295 }
1296
/// Returns the API URL of each peer.
///
/// Delegates to `get_api_urls`, passing the client database and a snapshot
/// of the current client config.
pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
    get_api_urls(&self.db, &self.config().await).await
}
1301
1302 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1305 self.get_peer_urls()
1306 .await
1307 .into_iter()
1308 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1309 .map(|peer_url| {
1310 InviteCode::new(
1311 peer_url.clone(),
1312 peer,
1313 self.federation_id(),
1314 self.api_secret.clone(),
1315 )
1316 })
1317 }
1318
1319 pub async fn get_guardian_public_keys_blocking(
1323 &self,
1324 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1325 self.db
1326 .autocommit(
1327 |dbtx, _| {
1328 Box::pin(async move {
1329 let config = self.config().await;
1330
1331 let guardian_pub_keys = self
1332 .get_or_backfill_broadcast_public_keys(dbtx, config)
1333 .await;
1334
1335 Result::<_, ()>::Ok(guardian_pub_keys)
1336 })
1337 },
1338 None,
1339 )
1340 .await
1341 .expect("Will retry forever")
1342 }
1343
1344 async fn get_or_backfill_broadcast_public_keys(
1345 &self,
1346 dbtx: &mut DatabaseTransaction<'_>,
1347 config: ClientConfig,
1348 ) -> BTreeMap<PeerId, PublicKey> {
1349 match config.global.broadcast_public_keys {
1350 Some(guardian_pub_keys) => guardian_pub_keys,
1351 _ => {
1352 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1353
1354 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1355 *(self.config.write().await) = new_config;
1356 guardian_pub_keys
1357 }
1358 }
1359 }
1360
/// Asks the federation API for its session count.
///
/// Thin wrapper around `self.api.session_count()`; the API's result is
/// returned unchanged.
async fn fetch_session_count(&self) -> FederationResult<u64> {
    self.api.session_count().await
}
1364
1365 async fn fetch_and_update_config(
1366 &self,
1367 config: ClientConfig,
1368 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1369 let fetched_config = retry(
1370 "Fetching guardian public keys",
1371 backoff_util::background_backoff(),
1372 || async {
1373 Ok(self
1374 .api
1375 .request_current_consensus::<ClientConfig>(
1376 CLIENT_CONFIG_ENDPOINT.to_owned(),
1377 ApiRequestErased::default(),
1378 )
1379 .await?)
1380 },
1381 )
1382 .await
1383 .expect("Will never return on error");
1384
1385 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1386 warn!(
1387 target: LOG_CLIENT,
1388 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1389 );
1390 pending::<()>().await;
1391 unreachable!("Pending will never return");
1392 };
1393
1394 let new_config = ClientConfig {
1395 global: GlobalClientConfig {
1396 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1397 ..config.global
1398 },
1399 modules: config.modules,
1400 };
1401 (guardian_pub_keys, new_config)
1402 }
1403
1404 pub fn handle_global_rpc(
1405 &self,
1406 method: String,
1407 params: serde_json::Value,
1408 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1409 Box::pin(try_stream! {
1410 match method.as_str() {
1411 "get_balance" => {
1412 let balance = self.get_balance().await;
1413 yield serde_json::to_value(balance)?;
1414 }
1415 "subscribe_balance_changes" => {
1416 let mut stream = self.subscribe_balance_changes().await;
1417 while let Some(balance) = stream.next().await {
1418 yield serde_json::to_value(balance)?;
1419 }
1420 }
1421 "get_config" => {
1422 let config = self.config().await;
1423 yield serde_json::to_value(config)?;
1424 }
1425 "get_federation_id" => {
1426 let federation_id = self.federation_id();
1427 yield serde_json::to_value(federation_id)?;
1428 }
1429 "get_invite_code" => {
1430 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1431 let invite_code = self.invite_code(req.peer).await;
1432 yield serde_json::to_value(invite_code)?;
1433 }
1434 "get_operation" => {
1435 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1436 let operation = self.operation_log().get_operation(req.operation_id).await;
1437 yield serde_json::to_value(operation)?;
1438 }
1439 "list_operations" => {
1440 let req: ListOperationsParams = serde_json::from_value(params)?;
1441 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1442 usize::MAX
1443 } else {
1444 req.limit.unwrap_or(usize::MAX)
1445 };
1446 let operations = self.operation_log()
1447 .paginate_operations_rev(limit, req.last_seen)
1448 .await;
1449 yield serde_json::to_value(operations)?;
1450 }
1451 "session_count" => {
1452 let count = self.fetch_session_count().await?;
1453 yield serde_json::to_value(count)?;
1454 }
1455 "has_pending_recoveries" => {
1456 let has_pending = self.has_pending_recoveries();
1457 yield serde_json::to_value(has_pending)?;
1458 }
1459 "wait_for_all_recoveries" => {
1460 self.wait_for_all_recoveries().await?;
1461 yield serde_json::Value::Null;
1462 }
1463 "subscribe_to_recovery_progress" => {
1464 let mut stream = self.subscribe_to_recovery_progress();
1465 while let Some((module_id, progress)) = stream.next().await {
1466 yield serde_json::json!({
1467 "module_id": module_id,
1468 "progress": progress
1469 });
1470 }
1471 }
1472 "backup_to_federation" => {
1473 let metadata = if params.is_null() {
1474 Metadata::from_json_serialized(serde_json::json!({}))
1475 } else {
1476 Metadata::from_json_serialized(params)
1477 };
1478 self.backup_to_federation(metadata).await?;
1479 yield serde_json::Value::Null;
1480 }
1481 _ => {
1482 Err(anyhow::format_err!("Unknown method: {}", method))?;
1483 unreachable!()
1484 },
1485 }
1486 })
1487 }
1488
1489 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1490 where
1491 E: Event + Send,
1492 {
1493 let mut dbtx = self.db.begin_transaction().await;
1494 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1495 dbtx.commit_tx().await;
1496 }
1497
/// Logs `event` as part of the caller's transaction.
///
/// Hands the event to `dbtx.log_event` together with a clone of this
/// client's log-ordering wakeup sender. Nothing is committed here; the
/// caller owns the transaction.
pub async fn log_event_dbtx<E, Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    module_id: Option<ModuleInstanceId>,
    event: E,
) where
    E: Event + Send,
    Cap: Send,
{
    dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
        .await;
}
1510
1511 pub async fn log_event_raw_dbtx<Cap>(
1512 &self,
1513 dbtx: &mut DatabaseTransaction<'_, Cap>,
1514 kind: EventKind,
1515 module: Option<(ModuleKind, ModuleInstanceId)>,
1516 payload: Vec<u8>,
1517 persist: EventPersistence,
1518 ) where
1519 Cap: Send,
1520 {
1521 let module_id = module.as_ref().map(|m| m.1);
1522 let module_kind = module.map(|m| m.0);
1523 dbtx.log_event_raw(
1524 self.log_ordering_wakeup_tx.clone(),
1525 kind,
1526 module_kind,
1527 module_id,
1528 payload,
1529 persist,
1530 )
1531 .await;
1532 }
1533
/// Runs `call_fn` over event log entries, tracking the position under
/// `pos_key`.
///
/// Thin wrapper around `fedimint_eventlog::handle_events`, supplying this
/// client's database and a clone of its "event added" receiver. `pos_key`
/// must be a database key whose value type is [`EventLogId`].
///
/// NOTE(review): how long this runs and when `call_fn` is invoked are defined
/// by `fedimint_eventlog::handle_events` — see its documentation.
pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    fedimint_eventlog::handle_events(
        self.db.clone(),
        pos_key,
        self.log_event_added_rx.clone(),
        call_fn,
    )
    .await
}
1549
/// Runs `call_fn` over trimable event log entries, tracking the position
/// under `pos_key`.
///
/// Thin wrapper around `fedimint_eventlog::handle_trimable_events`, supplying
/// this client's database and a clone of its "event added" receiver.
/// `pos_key` must be a database key whose value type is
/// [`EventLogTrimableId`].
///
/// NOTE(review): how long this runs and when `call_fn` is invoked are defined
/// by `fedimint_eventlog::handle_trimable_events` — see its documentation.
pub async fn handle_trimable_events<F, R, K>(
    &self,
    pos_key: &K,
    call_fn: F,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogTrimableId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    fedimint_eventlog::handle_trimable_events(
        self.db.clone(),
        pos_key,
        self.log_event_added_rx.clone(),
        call_fn,
    )
    .await
}
1569
1570 pub async fn get_event_log(
1571 &self,
1572 pos: Option<EventLogId>,
1573 limit: u64,
1574 ) -> Vec<PersistedLogEntry> {
1575 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1576 .await
1577 }
1578
1579 pub async fn get_event_log_trimable(
1580 &self,
1581 pos: Option<EventLogTrimableId>,
1582 limit: u64,
1583 ) -> Vec<PersistedLogEntry> {
1584 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1585 .await
1586 }
1587
/// Reads entries from the event log through the caller's transaction.
///
/// Thin wrapper around `dbtx.get_event_log(pos, limit)`; `self` is not used
/// beyond providing the method namespace.
pub async fn get_event_log_dbtx<Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    pos: Option<EventLogId>,
    limit: u64,
) -> Vec<PersistedLogEntry>
where
    Cap: Send,
{
    dbtx.get_event_log(pos, limit).await
}
1599
/// Reads entries from the trimable event log through the caller's
/// transaction.
///
/// Thin wrapper around `dbtx.get_event_log_trimable(pos, limit)`; `self` is
/// not used beyond providing the method namespace.
pub async fn get_event_log_trimable_dbtx<Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    pos: Option<EventLogTrimableId>,
    limit: u64,
) -> Vec<PersistedLogEntry>
where
    Cap: Send,
{
    dbtx.get_event_log_trimable(pos, limit).await
}
1611
/// Returns a new broadcast receiver subscribed to
/// `log_event_added_transient_tx`.
///
/// Each call creates an independent subscription.
pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
    self.log_event_added_transient_tx.subscribe()
}
1616}
1617
// Implements `ClientContextIface` for `Client`.
//
// Almost every method forwards to a `Client::<name>` function of the same
// name, called in path form with `self` passed explicitly. The exceptions are
// `log_event_json` and the two `read_operation_*_states` methods, which carry
// a small amount of logic of their own.
//
// NOTE(review): the `Client::<name>(self, ..)` calls presumably resolve to
// inherent methods defined elsewhere in this file — confirm before changing
// them to method-call syntax.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    /// Forwards to `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to `Client::finalize_and_submit_transaction`.
    ///
    /// The boxed `operation_meta_gen` is passed on by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to `Client::finalize_and_submit_transaction_inner`, using the
    /// caller's transaction.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_module_outputs`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    /// Forwards to `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    /// Forwards to `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event whose payload is given as JSON.
    ///
    /// The payload is serialized to bytes and passed to
    /// `log_event_raw_dbtx`. Module information is attached only when
    /// `module_kind` is `Some`; otherwise `module_id` is not forwarded.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx.ensure_global()` reports an error, or if the payload
    /// fails to serialize.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    /// Streams the active states stored for `operation_id` in module
    /// `module_id`.
    ///
    /// Entries are read by prefix from `dbtx`; each item pairs the first
    /// field of the stored key with its value. The stream borrows `dbtx`
    /// for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    /// Streams the inactive states stored for `operation_id` in module
    /// `module_id`.
    ///
    /// Entries are read by prefix from `dbtx`; each item pairs the first
    /// field of the stored key with its value. The stream borrows `dbtx`
    /// for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
1757
1758impl fmt::Debug for Client {
1760 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1761 write!(f, "Client")
1762 }
1763}
1764
1765pub fn client_decoders<'a>(
1766 registry: &ModuleInitRegistry<DynClientModuleInit>,
1767 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1768) -> ModuleDecoderRegistry {
1769 let mut modules = BTreeMap::new();
1770 for (id, kind) in module_kinds {
1771 let Some(init) = registry.get(kind) else {
1772 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1773 continue;
1774 };
1775
1776 modules.insert(
1777 id,
1778 (
1779 kind.clone(),
1780 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1781 ),
1782 );
1783 }
1784 ModuleDecoderRegistry::from(modules)
1785}