1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16 ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::{ConnectorRegistry, PeerStatus};
37use fedimint_core::config::{
38 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42 AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43 IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51 AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52 SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{
57 Elapsed, MaybeSend, MaybeSync, ShuttingDownError, TaskGroup, TaskHandle,
58};
59use fedimint_core::transaction::Transaction;
60use fedimint_core::util::backoff_util::custom_backoff;
61use fedimint_core::util::{
62 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
63};
64use fedimint_core::{
65 Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
66 maybe_add_send_sync, runtime,
67};
68use fedimint_derive_secret::DerivableSecret;
69use fedimint_eventlog::{
70 DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
71 EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
72};
73use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
74use futures::stream::FuturesUnordered;
75use futures::{Stream, StreamExt as _};
76use global_ctx::ModuleGlobalClientContext;
77use serde::{Deserialize, Serialize};
78use tokio::sync::{broadcast, oneshot, watch};
79use tokio_stream::wrappers::WatchStream;
80use tracing::{Span, debug, info, warn};
81
82use crate::ClientBuilder;
83use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
84use crate::backup::Metadata;
85use crate::client::event_log::DefaultApplicationEventLogKey;
86use crate::db::{
87 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
88 ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
89 ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
90 PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
91 get_decoded_client_secret, verify_client_db_integrity_dbtx,
92};
93use crate::meta::MetaService;
94use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
95use crate::oplog::OperationLog;
96use crate::sm::executor::{
97 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
98 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
99};
100
101pub(crate) mod builder;
102pub(crate) mod event_log;
103pub(crate) mod global_ctx;
104pub(crate) mod handle;
105
106const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
110 &[ApiVersion { major: 0, minor: 0 }];
111
112#[derive(Default)]
114pub(crate) struct PrimaryModuleCandidates {
115 specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
117 wildcard: Vec<ModuleInstanceId>,
119}
120
121pub struct Client {
135 final_client: FinalClientIface,
136 config: tokio::sync::RwLock<ClientConfig>,
137 api_secret: Option<String>,
138 decoders: ModuleDecoderRegistry,
139 connectors: ConnectorRegistry,
140 db: Database,
141 federation_id: FederationId,
142 federation_config_meta: BTreeMap<String, String>,
143 primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
144 pub(crate) modules: ClientModuleRegistry,
145 module_inits: ClientModuleInitRegistry,
146 executor: Executor,
147 pub(crate) api: DynGlobalApi,
148 root_secret: DerivableSecret,
149 operation_log: OperationLog,
150 secp_ctx: Secp256k1<secp256k1::All>,
151 meta_service: Arc<MetaService>,
152
153 task_group: TaskGroup,
154
155 client_span: Span,
159
160 client_recovery_progress_receiver:
162 watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
163
164 log_ordering_wakeup_tx: watch::Sender<()>,
167 log_event_added_rx: watch::Receiver<()>,
169 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
170 request_hook: ApiRequestHook,
171 iroh_enable_dht: bool,
172 iroh_enable_next: bool,
173 #[allow(dead_code)]
178 user_bitcoind_rpc: Option<DynBitcoindRpc>,
179 pub(crate) user_bitcoind_rpc_no_chain_id:
184 Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188struct ListOperationsParams {
189 limit: Option<usize>,
190 last_seen: Option<ChronologicalOperationLogKey>,
191}
192
193const DEFAULT_EVENT_LOG_PAGE_SIZE: u64 = 100;
194const MAX_EVENT_LOG_PAGE_SIZE: u64 = 10_000;
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
197struct GetEventLogRequest {
198 pos: Option<EventLogId>,
199 limit: Option<u64>,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct GetOperationIdRequest {
204 operation_id: OperationId,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct GetBalanceChangesRequest {
209 #[serde(default = "AmountUnit::bitcoin")]
210 unit: AmountUnit,
211}
212
213impl Client {
214 pub async fn builder() -> anyhow::Result<ClientBuilder> {
217 Ok(ClientBuilder::new())
218 }
219
220 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
221 self.api.as_ref()
222 }
223
224 pub fn api_clone(&self) -> DynGlobalApi {
225 self.api.clone()
226 }
227
228 pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, PeerStatus>> {
231 self.api.connection_status_stream()
232 }
233
234 pub fn federation_reconnect(&self) {
242 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
243
244 for peer_id in peers {
245 let api = self.api.clone();
246 self.spawn_cancellable(format!("federation-reconnect-once-{peer_id}"), async move {
247 if let Err(e) = api.get_peer_connection(peer_id).await {
248 debug!(
249 target: LOG_CLIENT_NET_API,
250 %peer_id,
251 err = %e.fmt_compact(),
252 "Failed to connect to peer"
253 );
254 }
255 });
256 }
257 }
258
259 pub fn spawn_federation_reconnect(&self) {
281 let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
282
283 for peer_id in peers {
284 let api = self.api.clone();
285 self.spawn_cancellable(format!("federation-reconnect-{peer_id}"), async move {
286 loop {
287 match api.get_peer_connection(peer_id).await {
288 Ok(conn) => {
289 conn.await_disconnection().await;
290 }
291 Err(e) => {
292 debug!(
295 target: LOG_CLIENT_NET_API,
296 %peer_id,
297 err = %e.fmt_compact(),
298 "Failed to connect to peer, will retry"
299 );
300 }
301 }
302 }
303 });
304 }
305 }
306
307 pub fn task_group(&self) -> &TaskGroup {
309 &self.task_group
310 }
311
312 pub(crate) fn make_client_span(federation_id: FederationId) -> Span {
319 tracing::info_span!(
320 target: LOG_CLIENT,
321 parent: None,
322 "client",
323 fed_id = %federation_id.to_prefix(),
324 )
325 }
326
327 pub(crate) fn spawn_cancellable<R>(
330 &self,
331 name: impl Into<String>,
332 future: impl Future<Output = R> + MaybeSend + 'static,
333 ) -> oneshot::Receiver<Result<R, ShuttingDownError>>
334 where
335 R: MaybeSend + 'static,
336 {
337 self.task_group
338 .spawn_cancellable_with_span(self.client_span.clone(), name, future)
339 }
340
341 pub(crate) fn spawn<Fut, R>(
345 &self,
346 name: impl Into<String>,
347 f: impl FnOnce(TaskHandle) -> Fut + MaybeSend + 'static,
348 ) -> oneshot::Receiver<R>
349 where
350 Fut: Future<Output = R> + MaybeSend + 'static,
351 R: MaybeSend + 'static,
352 {
353 self.task_group
354 .spawn_with_span(self.client_span.clone(), name, f)
355 }
356
357 pub fn get_metrics() -> anyhow::Result<String> {
362 fedimint_metrics::get_metrics()
363 }
364
365 #[doc(hidden)]
367 pub fn executor(&self) -> &Executor {
368 &self.executor
369 }
370
371 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
372 let mut dbtx = db.begin_transaction_nc().await;
373 dbtx.get_value(&ClientConfigKey).await
374 }
375
376 pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
377 let mut dbtx = db.begin_transaction_nc().await;
378 dbtx.get_value(&PendingClientConfigKey).await
379 }
380
381 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
382 let mut dbtx = db.begin_transaction_nc().await;
383 dbtx.get_value(&ApiSecretKey).await
384 }
385
386 pub async fn store_encodable_client_secret<T: Encodable>(
387 db: &Database,
388 secret: T,
389 ) -> anyhow::Result<()> {
390 let mut dbtx = db.begin_transaction().await;
391
392 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
394 bail!("Encoded client secret already exists, cannot overwrite")
395 }
396
397 let encoded_secret = T::consensus_encode_to_vec(&secret);
398 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
399 .await;
400 dbtx.commit_tx().await;
401 Ok(())
402 }
403
404 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
405 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
406 bail!("Encoded client secret not present in DB")
407 };
408
409 Ok(secret)
410 }
411 pub async fn load_decodable_client_secret_opt<T: Decodable>(
412 db: &Database,
413 ) -> anyhow::Result<Option<T>> {
414 let mut dbtx = db.begin_transaction_nc().await;
415
416 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
417
418 Ok(match client_secret {
419 Some(client_secret) => Some(
420 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
421 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
422 ),
423 None => None,
424 })
425 }
426
427 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
428 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
429 Ok(secret) => secret,
430 _ => {
431 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
432 Self::store_encodable_client_secret(db, secret)
433 .await
434 .expect("Storing client secret must work");
435 secret
436 }
437 };
438 Ok(client_secret)
439 }
440
441 pub async fn is_initialized(db: &Database) -> bool {
442 let mut dbtx = db.begin_transaction_nc().await;
443 dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
444 .await
445 .expect("Unrecoverable error occurred while reading and entry from the database")
446 .is_some()
447 }
448
449 pub fn start_executor(self: &Arc<Self>) {
450 self.client_span.in_scope(|| {
451 debug!(
452 target: LOG_CLIENT,
453 "Starting fedimint client executor",
454 );
455 });
456 self.executor
457 .start_executor(self.context_gen(), self.client_span.clone());
458 }
459
460 pub fn federation_id(&self) -> FederationId {
461 self.federation_id
462 }
463
464 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
465 let client_inner = Arc::downgrade(self);
466 Arc::new(move |module_instance, operation| {
467 ModuleGlobalClientContext {
468 client: client_inner
469 .clone()
470 .upgrade()
471 .expect("ModuleGlobalContextGen called after client was dropped"),
472 module_instance_id: module_instance,
473 operation,
474 }
475 .into()
476 })
477 }
478
479 pub async fn config(&self) -> ClientConfig {
480 self.config.read().await.clone()
481 }
482
483 pub fn api_secret(&self) -> &Option<String> {
485 &self.api_secret
486 }
487
488 pub async fn core_api_version(&self) -> ApiVersion {
494 self.db
497 .begin_transaction_nc()
498 .await
499 .get_value(&CachedApiVersionSetKey)
500 .await
501 .map(|cached: CachedApiVersionSet| cached.0.core)
502 .unwrap_or(ApiVersion { major: 0, minor: 0 })
503 }
504
505 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
512 if let Some(chain_id) = self
514 .db
515 .begin_transaction_nc()
516 .await
517 .get_value(&ChainIdKey)
518 .await
519 {
520 return Ok(chain_id);
521 }
522
523 let chain_id = self.api.chain_id().await?;
525
526 let mut dbtx = self.db.begin_transaction().await;
528 dbtx.insert_entry(&ChainIdKey, &chain_id).await;
529 dbtx.commit_tx().await;
530
531 Ok(chain_id)
532 }
533
534 pub fn decoders(&self) -> &ModuleDecoderRegistry {
535 &self.decoders
536 }
537
538 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
540 self.try_get_module(instance)
541 .expect("Module instance not found")
542 }
543
544 fn try_get_module(
545 &self,
546 instance: ModuleInstanceId,
547 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
548 Some(self.modules.get(instance)?.as_ref())
549 }
550
551 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
552 self.modules.get(instance).is_some()
553 }
554
555 fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
561 let mut in_amounts = Amounts::ZERO;
563 let mut out_amounts = Amounts::ZERO;
564 let mut fee_amounts = Amounts::ZERO;
565
566 for input in builder.inputs() {
567 let module = self.get_module(input.input.module_instance_id());
568
569 let item_fees = module.input_fee(&input.amounts, &input.input).expect(
570 "We only build transactions with input versions that are supported by the module",
571 );
572
573 in_amounts.checked_add_mut(&input.amounts);
574 fee_amounts.checked_add_mut(&item_fees);
575 }
576
577 for output in builder.outputs() {
578 let module = self.get_module(output.output.module_instance_id());
579
580 let item_fees = module.output_fee(&output.amounts, &output.output).expect(
581 "We only build transactions with output versions that are supported by the module",
582 );
583
584 out_amounts.checked_add_mut(&output.amounts);
585 fee_amounts.checked_add_mut(&item_fees);
586 }
587
588 out_amounts.checked_add_mut(&fee_amounts);
589 (in_amounts, out_amounts)
590 }
591
592 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
593 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
594 }
595
596 pub fn get_config_meta(&self, key: &str) -> Option<String> {
598 self.federation_config_meta.get(key).cloned()
599 }
600
601 pub(crate) fn root_secret(&self) -> DerivableSecret {
602 self.root_secret.clone()
603 }
604
605 pub async fn add_state_machines(
606 &self,
607 dbtx: &mut DatabaseTransaction<'_>,
608 states: Vec<DynState>,
609 ) -> AddStateMachinesResult {
610 self.executor.add_state_machines_dbtx(dbtx, states).await
611 }
612
613 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
615 let active_states = self.executor.get_active_states().await;
616 let mut active_operations = HashSet::with_capacity(active_states.len());
617 let mut dbtx = self.db().begin_transaction_nc().await;
618 for (state, _) in active_states {
619 let operation_id = state.operation_id();
620 if dbtx
621 .get_value(&OperationLogKey { operation_id })
622 .await
623 .is_some()
624 {
625 active_operations.insert(operation_id);
626 }
627 }
628 active_operations
629 }
630
631 pub fn operation_log(&self) -> &OperationLog {
632 &self.operation_log
633 }
634
635 pub fn meta_service(&self) -> &Arc<MetaService> {
637 &self.meta_service
638 }
639
640 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
642 let meta_service = self.meta_service();
643 let ts = meta_service
644 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
645 .await
646 .and_then(|v| v.value)?;
647 Some(UNIX_EPOCH + Duration::from_secs(ts))
648 }
649
650 async fn finalize_transaction(
652 &self,
653 dbtx: &mut DatabaseTransaction<'_>,
654 operation_id: OperationId,
655 mut partial_transaction: TransactionBuilder,
656 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
657 let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
658
659 let mut added_inputs_bundles = vec![];
660 let mut added_outputs_bundles = vec![];
661
662 for unit in in_amounts.units().union(&out_amounts.units()) {
673 let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
674 let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
675 if input_amount == output_amount {
676 continue;
677 }
678
679 let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
680 bail!("No module to balance a partial transaction (affected unit: {unit:?}");
681 };
682
683 let (added_input_bundle, added_output_bundle) = module
684 .create_final_inputs_and_outputs(
685 module_id,
686 dbtx,
687 operation_id,
688 *unit,
689 input_amount,
690 output_amount,
691 )
692 .await?;
693
694 added_inputs_bundles.push(added_input_bundle);
695 added_outputs_bundles.push(added_output_bundle);
696 }
697
698 let change_range = Range {
702 start: partial_transaction.outputs().count() as u64,
703 end: (partial_transaction.outputs().count() as u64
704 + added_outputs_bundles
705 .iter()
706 .map(|output| output.outputs().len() as u64)
707 .sum::<u64>()),
708 };
709
710 for added_inputs in added_inputs_bundles {
711 partial_transaction = partial_transaction.with_inputs(added_inputs);
712 }
713
714 for added_outputs in added_outputs_bundles {
715 partial_transaction = partial_transaction.with_outputs(added_outputs);
716 }
717
718 let (input_amounts, output_amounts) =
719 self.transaction_builder_get_balance(&partial_transaction);
720
721 for (unit, output_amount) in output_amounts {
722 let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
723
724 assert!(input_amount >= output_amount, "Transaction is underfunded");
725 }
726
727 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
728
729 Ok((tx, states, change_range))
730 }
731
732 pub async fn finalize_and_submit_transaction<F, M>(
744 &self,
745 operation_id: OperationId,
746 operation_type: &str,
747 operation_meta_gen: F,
748 tx_builder: TransactionBuilder,
749 ) -> anyhow::Result<OutPointRange>
750 where
751 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
752 M: serde::Serialize + MaybeSend,
753 {
754 let operation_type = operation_type.to_owned();
755
756 let autocommit_res = self
757 .db
758 .autocommit(
759 |dbtx, _| {
760 let operation_type = operation_type.clone();
761 let tx_builder = tx_builder.clone();
762 let operation_meta_gen = operation_meta_gen.clone();
763 Box::pin(async move {
764 self.finalize_and_submit_transaction_dbtx(
765 dbtx,
766 operation_id,
767 &operation_type,
768 operation_meta_gen,
769 tx_builder,
770 )
771 .await
772 })
773 },
774 Some(100), )
776 .await;
777
778 match autocommit_res {
779 Ok(txid) => Ok(txid),
780 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
781 Err(AutocommitError::CommitFailed {
782 attempts,
783 last_error,
784 }) => panic!(
785 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
786 ),
787 }
788 }
789
790 pub async fn finalize_and_submit_transaction_dbtx<F, M>(
793 &self,
794 dbtx: &mut DatabaseTransaction<'_>,
795 operation_id: OperationId,
796 operation_type: &str,
797 operation_meta_gen: F,
798 tx_builder: TransactionBuilder,
799 ) -> anyhow::Result<OutPointRange>
800 where
801 F: FnOnce(OutPointRange) -> M + MaybeSend,
802 M: serde::Serialize + MaybeSend,
803 {
804 if Client::operation_exists_dbtx(dbtx, operation_id).await {
805 bail!("There already exists an operation with id {operation_id:?}")
806 }
807
808 let out_point_range = self
809 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
810 .await?;
811
812 self.operation_log()
813 .add_operation_log_entry_dbtx(
814 dbtx,
815 operation_id,
816 operation_type,
817 operation_meta_gen(out_point_range),
818 )
819 .await;
820
821 Ok(out_point_range)
822 }
823
824 async fn finalize_and_submit_transaction_inner(
825 &self,
826 dbtx: &mut DatabaseTransaction<'_>,
827 operation_id: OperationId,
828 tx_builder: TransactionBuilder,
829 ) -> anyhow::Result<OutPointRange> {
830 let (transaction, mut states, change_range) = self
831 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
832 .await?;
833
834 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
835 let inputs = transaction
836 .inputs
837 .iter()
838 .map(DynInput::module_instance_id)
839 .collect::<Vec<_>>();
840 let outputs = transaction
841 .outputs
842 .iter()
843 .map(DynOutput::module_instance_id)
844 .collect::<Vec<_>>();
845 warn!(
846 target: LOG_CLIENT_NET_API,
847 size=%transaction.consensus_encode_to_vec().len(),
848 ?inputs,
849 ?outputs,
850 "Transaction too large",
851 );
852 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
853 bail!(
854 "The generated transaction would be rejected by the federation for being too large."
855 );
856 }
857
858 let txid = transaction.tx_hash();
859
860 debug!(
861 target: LOG_CLIENT_NET_API,
862 %txid,
863 operation_id = %operation_id.fmt_short(),
864 ?transaction,
865 "Finalized and submitting transaction",
866 );
867
868 let tx_submission_sm = DynState::from_typed(
869 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
870 TxSubmissionStatesSM {
871 operation_id,
872 state: TxSubmissionStates::Created(transaction),
873 },
874 );
875 states.push(tx_submission_sm);
876
877 self.executor.add_state_machines_dbtx(dbtx, states).await?;
878
879 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
880 .await;
881
882 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
883 }
884
885 async fn transaction_update_stream(
886 &self,
887 operation_id: OperationId,
888 ) -> BoxStream<'static, TxSubmissionStatesSM> {
889 self.executor
890 .notifier()
891 .module_notifier::<TxSubmissionStatesSM>(
892 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
893 self.final_client.clone(),
894 )
895 .subscribe(operation_id)
896 .await
897 }
898
899 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
900 let mut dbtx = self.db().begin_transaction_nc().await;
901
902 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
903 }
904
905 pub async fn operation_exists_dbtx(
906 dbtx: &mut DatabaseTransaction<'_>,
907 operation_id: OperationId,
908 ) -> bool {
909 let active_state_exists = dbtx
910 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
911 .await
912 .next()
913 .await
914 .is_some();
915
916 let inactive_state_exists = dbtx
917 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
918 .await
919 .next()
920 .await
921 .is_some();
922
923 active_state_exists || inactive_state_exists
924 }
925
926 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
927 self.db
928 .begin_transaction_nc()
929 .await
930 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
931 .await
932 .next()
933 .await
934 .is_some()
935 }
936
937 pub async fn await_primary_bitcoin_module_output(
940 &self,
941 operation_id: OperationId,
942 out_point: OutPoint,
943 ) -> anyhow::Result<()> {
944 self.primary_module_for_unit(AmountUnit::BITCOIN)
945 .ok_or_else(|| anyhow!("No primary module available"))?
946 .1
947 .await_primary_module_output(operation_id, out_point)
948 .await
949 }
950
951 pub fn get_first_module<M: ClientModule>(
953 &'_ self,
954 ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
955 let module_kind = M::kind();
956 let id = self
957 .get_first_instance(&module_kind)
958 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
959 let module: &M = self
960 .try_get_module(id)
961 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
962 .as_any()
963 .downcast_ref::<M>()
964 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
965 let (db, _) = self.db().with_prefix_module_id(id);
966 Ok(ClientModuleInstance {
967 id,
968 db,
969 api: self.api().with_module(id),
970 module,
971 })
972 }
973
974 pub fn get_module_client_dyn(
975 &self,
976 instance_id: ModuleInstanceId,
977 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
978 self.try_get_module(instance_id)
979 .ok_or(anyhow!("Unknown module instance {}", instance_id))
980 }
981
982 pub fn db(&self) -> &Database {
983 &self.db
984 }
985
986 pub fn endpoints(&self) -> &ConnectorRegistry {
987 &self.connectors
988 }
989
990 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
993 TransactionUpdates {
994 update_stream: self.transaction_update_stream(operation_id).await,
995 }
996 }
997
998 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1000 self.modules
1001 .iter_modules()
1002 .find(|(_, kind, _module)| *kind == module_kind)
1003 .map(|(instance_id, _, _)| instance_id)
1004 }
1005
1006 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
1009 get_decoded_client_secret::<T>(self.db()).await
1010 }
1011
1012 pub async fn await_primary_bitcoin_module_outputs(
1015 &self,
1016 operation_id: OperationId,
1017 outputs: Vec<OutPoint>,
1018 ) -> anyhow::Result<()> {
1019 for out_point in outputs {
1020 self.await_primary_bitcoin_module_output(operation_id, out_point)
1021 .await?;
1022 }
1023
1024 Ok(())
1025 }
1026
1027 pub async fn get_config_json(&self) -> JsonClientConfig {
1033 self.config().await.to_json()
1034 }
1035
1036 #[doc(hidden)]
1039 pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
1042 self.get_balance_for_unit(AmountUnit::BITCOIN).await
1043 }
1044
1045 pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
1046 let (id, module) = self
1047 .primary_module_for_unit(unit)
1048 .ok_or_else(|| anyhow!("Primary module not available"))?;
1049 Ok(module
1050 .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
1051 .await)
1052 }
1053
1054 pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
1057 let primary_module_things =
1058 if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
1059 let balance_changes = primary_module.subscribe_balance_changes().await;
1060 let initial_balance = self
1061 .get_balance_for_unit(unit)
1062 .await
1063 .expect("Primary is present");
1064
1065 Some((
1066 primary_module_id,
1067 primary_module.clone(),
1068 balance_changes,
1069 initial_balance,
1070 ))
1071 } else {
1072 None
1073 };
1074 let db = self.db().clone();
1075
1076 Box::pin(async_stream::stream! {
1077 let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
1078 pending().await
1081 };
1082
1083
1084 yield initial_balance;
1085 let mut prev_balance = initial_balance;
1086 while let Some(()) = balance_changes.next().await {
1087 let mut dbtx = db.begin_transaction_nc().await;
1088 let balance = primary_module
1089 .get_balance(primary_module_id, &mut dbtx, unit)
1090 .await;
1091
1092 if balance != prev_balance {
1094 prev_balance = balance;
1095 yield balance;
1096 }
1097 }
1098 })
1099 }
1100
1101 async fn make_api_version_request(
1106 delay: Duration,
1107 peer_id: PeerId,
1108 api: &DynGlobalApi,
1109 ) -> (
1110 PeerId,
1111 Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1112 ) {
1113 runtime::sleep(delay).await;
1114 (
1115 peer_id,
1116 api.request_single_peer::<SupportedApiVersionsSummary>(
1117 VERSION_ENDPOINT.to_owned(),
1118 ApiRequestErased::default(),
1119 peer_id,
1120 )
1121 .await,
1122 )
1123 }
1124
1125 fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1131 custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1132 }
1133
1134 pub async fn fetch_common_api_versions_from_all_peers(
1137 num_peers: NumPeers,
1138 api: DynGlobalApi,
1139 db: Database,
1140 num_responses_sender: watch::Sender<usize>,
1141 ) {
1142 let mut backoff = Self::create_api_version_backoff();
1143
1144 let mut requests = FuturesUnordered::new();
1147
1148 for peer_id in num_peers.peer_ids() {
1149 requests.push(Self::make_api_version_request(
1150 Duration::ZERO,
1151 peer_id,
1152 &api,
1153 ));
1154 }
1155
1156 let mut num_responses = 0;
1157
1158 while let Some((peer_id, response)) = requests.next().await {
1159 let retry = match response {
1160 Err(err) => {
1161 let has_previous_response = db
1162 .begin_transaction_nc()
1163 .await
1164 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1165 .await
1166 .is_some();
1167 debug!(
1168 target: LOG_CLIENT,
1169 %peer_id,
1170 err = %err.fmt_compact(),
1171 %has_previous_response,
1172 "Failed to refresh API versions of a peer"
1173 );
1174
1175 !has_previous_response
1176 }
1177 Ok(o) => {
1178 let mut dbtx = db.begin_transaction().await;
1181 dbtx.insert_entry(
1182 &PeerLastApiVersionsSummaryKey(peer_id),
1183 &PeerLastApiVersionsSummary(o),
1184 )
1185 .await;
1186 dbtx.commit_tx().await;
1187 false
1188 }
1189 };
1190
1191 if retry {
1192 requests.push(Self::make_api_version_request(
1193 backoff.next().expect("Keeps retrying"),
1194 peer_id,
1195 &api,
1196 ));
1197 } else {
1198 num_responses += 1;
1199 num_responses_sender.send_replace(num_responses);
1200 }
1201 }
1202 }
1203
1204 pub async fn fetch_peers_api_versions_from_threshold_of_peers(
1208 num_peers: NumPeers,
1209 api: DynGlobalApi,
1210 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1211 let mut backoff = Self::create_api_version_backoff();
1212
1213 let mut requests = FuturesUnordered::new();
1216
1217 for peer_id in num_peers.peer_ids() {
1218 requests.push(Self::make_api_version_request(
1219 Duration::ZERO,
1220 peer_id,
1221 &api,
1222 ));
1223 }
1224
1225 let mut successful_responses = BTreeMap::new();
1226
1227 while successful_responses.len() < num_peers.threshold()
1228 && let Some((peer_id, response)) = requests.next().await
1229 {
1230 let retry = match response {
1231 Err(err) => {
1232 debug!(
1233 target: LOG_CLIENT,
1234 %peer_id,
1235 err = %err.fmt_compact(),
1236 "Failed to fetch API versions from peer"
1237 );
1238 true
1239 }
1240 Ok(response) => {
1241 successful_responses.insert(peer_id, response);
1242 false
1243 }
1244 };
1245
1246 if retry {
1247 requests.push(Self::make_api_version_request(
1248 backoff.next().expect("Keeps retrying"),
1249 peer_id,
1250 &api,
1251 ));
1252 }
1253 }
1254
1255 successful_responses
1256 }
1257
1258 pub async fn fetch_common_api_versions(
1260 config: &ClientConfig,
1261 api: &DynGlobalApi,
1262 ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1263 debug!(
1264 target: LOG_CLIENT,
1265 "Fetching common api versions"
1266 );
1267
1268 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1269
1270 let peer_api_version_sets =
1271 Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1272
1273 Ok(peer_api_version_sets)
1274 }
1275
1276 pub async fn write_api_version_cache(
1280 dbtx: &mut DatabaseTransaction<'_>,
1281 api_version_set: ApiVersionSet,
1282 ) {
1283 debug!(
1284 target: LOG_CLIENT,
1285 value = ?api_version_set,
1286 "Writing API version set to cache"
1287 );
1288
1289 dbtx.insert_entry(
1290 &CachedApiVersionSetKey,
1291 &CachedApiVersionSet(api_version_set),
1292 )
1293 .await;
1294 }
1295
1296 pub async fn store_prefetched_api_versions(
1301 db: &Database,
1302 config: &ClientConfig,
1303 client_module_init: &ClientModuleInitRegistry,
1304 peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1305 ) {
1306 debug!(
1307 target: LOG_CLIENT,
1308 "Storing {} prefetched peer API version responses and calculating common version set",
1309 peer_api_versions.len()
1310 );
1311
1312 let mut dbtx = db.begin_transaction().await;
1313 let client_supported_versions =
1315 Self::supported_api_versions_summary_static(config, client_module_init);
1316 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1317 &client_supported_versions,
1318 peer_api_versions,
1319 ) {
1320 Ok(common_api_versions) => {
1321 Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1323 debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1324 }
1325 Err(err) => {
1326 debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1327 }
1328 }
1329
1330 for (peer_id, peer_api_versions) in peer_api_versions {
1332 dbtx.insert_entry(
1333 &PeerLastApiVersionsSummaryKey(*peer_id),
1334 &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1335 )
1336 .await;
1337 }
1338 dbtx.commit_tx().await;
1339 debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1340 }
1341
1342 pub fn supported_api_versions_summary_static(
1344 config: &ClientConfig,
1345 client_module_init: &ClientModuleInitRegistry,
1346 ) -> SupportedApiVersionsSummary {
1347 SupportedApiVersionsSummary {
1348 core: SupportedCoreApiVersions {
1349 core_consensus: config.global.consensus_version,
1350 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1351 .expect("must not have conflicting versions"),
1352 },
1353 modules: config
1354 .modules
1355 .iter()
1356 .filter_map(|(&module_instance_id, module_config)| {
1357 client_module_init
1358 .get(module_config.kind())
1359 .map(|module_init| {
1360 (
1361 module_instance_id,
1362 SupportedModuleApiVersions {
1363 core_consensus: config.global.consensus_version,
1364 module_consensus: module_config.version,
1365 api: module_init.supported_api_versions(),
1366 },
1367 )
1368 })
1369 })
1370 .collect(),
1371 }
1372 }
1373
1374 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1375 Self::load_and_refresh_common_api_version_static(
1376 &self.config().await,
1377 &self.module_inits,
1378 self.connectors.clone(),
1379 &self.api,
1380 &self.db,
1381 &self.task_group,
1382 &self.client_span,
1383 )
1384 .await
1385 }
1386
1387 pub async fn refresh_api_versions(&self) -> anyhow::Result<ApiVersionSet> {
1393 Self::refresh_common_api_version_static(
1394 &self.config().await,
1395 &self.module_inits,
1396 &self.api,
1397 &self.db,
1398 self.task_group.clone(),
1399 &self.client_span,
1400 true,
1401 )
1402 .await
1403 }
1404
1405 pub(crate) async fn load_and_refresh_common_api_version_static(
1411 config: &ClientConfig,
1412 module_init: &ClientModuleInitRegistry,
1413 connectors: ConnectorRegistry,
1414 api: &DynGlobalApi,
1415 db: &Database,
1416 task_group: &TaskGroup,
1417 client_span: &Span,
1418 ) -> anyhow::Result<ApiVersionSet> {
1419 if let Some(v) = db
1420 .begin_transaction_nc()
1421 .await
1422 .get_value(&CachedApiVersionSetKey)
1423 .await
1424 {
1425 client_span.in_scope(|| {
1426 debug!(
1427 target: LOG_CLIENT,
1428 "Found existing cached common api versions"
1429 );
1430 });
1431 let config = config.clone();
1432 let client_module_init = module_init.clone();
1433 let api = api.clone();
1434 let db = db.clone();
1435 let task_group = task_group.clone();
1436 let client_span_owned = client_span.clone();
1437 task_group.clone().spawn_cancellable_with_span(
1440 client_span.clone(),
1441 "refresh_common_api_version_static",
1442 async move {
1443 connectors.wait_for_initialized_connections().await;
1444
1445 if let Err(error) = Self::refresh_common_api_version_static(
1446 &config,
1447 &client_module_init,
1448 &api,
1449 &db,
1450 task_group,
1451 &client_span_owned,
1452 false,
1453 )
1454 .await
1455 {
1456 warn!(
1457 target: LOG_CLIENT,
1458 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1459 );
1460 }
1461 },
1462 );
1463
1464 return Ok(v.0);
1465 }
1466
1467 info!(
1468 target: LOG_CLIENT,
1469 "Fetching initial API versions "
1470 );
1471 Self::refresh_common_api_version_static(
1472 config,
1473 module_init,
1474 api,
1475 db,
1476 task_group.clone(),
1477 client_span,
1478 true,
1479 )
1480 .await
1481 }
1482
1483 async fn refresh_common_api_version_static(
1484 config: &ClientConfig,
1485 client_module_init: &ClientModuleInitRegistry,
1486 api: &DynGlobalApi,
1487 db: &Database,
1488 task_group: TaskGroup,
1489 client_span: &Span,
1490 block_until_ok: bool,
1491 ) -> anyhow::Result<ApiVersionSet> {
1492 debug!(
1493 target: LOG_CLIENT,
1494 "Refreshing common api versions"
1495 );
1496
1497 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1498 let num_peers = NumPeers::from(config.global.api_endpoints.len());
1499
1500 task_group.spawn_cancellable_with_span(
1501 client_span.clone(),
1502 "refresh peers api versions",
1503 Client::fetch_common_api_versions_from_all_peers(
1504 num_peers,
1505 api.clone(),
1506 db.clone(),
1507 num_responses_sender,
1508 ),
1509 );
1510
1511 let common_api_versions = loop {
1512 let _: Result<_, Elapsed> = runtime::timeout(
1520 Duration::from_secs(30),
1521 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1522 )
1523 .await;
1524
1525 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1526
1527 match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1528 &Self::supported_api_versions_summary_static(config, client_module_init),
1529 &peer_api_version_sets,
1530 ) {
1531 Ok(o) => break o,
1532 Err(err) if block_until_ok => {
1533 warn!(
1534 target: LOG_CLIENT,
1535 err = %err.fmt_compact_anyhow(),
1536 "Failed to discover API version to use. Retrying..."
1537 );
1538 continue;
1539 }
1540 Err(e) => return Err(e),
1541 }
1542 };
1543
1544 debug!(
1545 target: LOG_CLIENT,
1546 value = ?common_api_versions,
1547 "Updating the cached common api versions"
1548 );
1549 let mut dbtx = db.begin_transaction().await;
1550 let _ = dbtx
1551 .insert_entry(
1552 &CachedApiVersionSetKey,
1553 &CachedApiVersionSet(common_api_versions.clone()),
1554 )
1555 .await;
1556
1557 dbtx.commit_tx().await;
1558
1559 Ok(common_api_versions)
1560 }
1561
1562 pub async fn get_metadata(&self) -> Metadata {
1564 self.db
1565 .begin_transaction_nc()
1566 .await
1567 .get_value(&ClientMetadataKey)
1568 .await
1569 .unwrap_or_else(|| {
1570 warn!(
1571 target: LOG_CLIENT,
1572 "Missing existing metadata. This key should have been set on Client init"
1573 );
1574 Metadata::empty()
1575 })
1576 }
1577
1578 pub async fn set_metadata(&self, metadata: &Metadata) {
1580 self.db
1581 .autocommit::<_, _, anyhow::Error>(
1582 |dbtx, _| {
1583 Box::pin(async {
1584 Self::set_metadata_dbtx(dbtx, metadata).await;
1585 Ok(())
1586 })
1587 },
1588 None,
1589 )
1590 .await
1591 .expect("Failed to autocommit metadata");
1592 }
1593
1594 pub fn has_pending_recoveries(&self) -> bool {
1595 !self
1596 .client_recovery_progress_receiver
1597 .borrow()
1598 .iter()
1599 .all(|(_id, progress)| progress.is_done())
1600 }
1601
1602 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1610 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1611 recovery_receiver
1612 .wait_for(|in_progress| {
1613 in_progress
1614 .iter()
1615 .all(|(_id, progress)| progress.is_done())
1616 })
1617 .await
1618 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1619
1620 Ok(())
1621 }
1622
1623 pub fn subscribe_to_recovery_progress(
1628 &self,
1629 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1630 WatchStream::new(self.client_recovery_progress_receiver.clone())
1631 .flat_map(futures::stream::iter)
1632 }
1633
1634 pub async fn wait_for_module_kind_recovery(
1635 &self,
1636 module_kind: ModuleKind,
1637 ) -> anyhow::Result<()> {
1638 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1639 let config = self.config().await;
1640 recovery_receiver
1641 .wait_for(|in_progress| {
1642 !in_progress
1643 .iter()
1644 .filter(|(module_instance_id, _progress)| {
1645 config.modules[module_instance_id].kind == module_kind
1646 })
1647 .any(|(_id, progress)| !progress.is_done())
1648 })
1649 .await
1650 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1651
1652 Ok(())
1653 }
1654
1655 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1656 loop {
1657 if self.executor.get_active_states().await.is_empty() {
1658 break;
1659 }
1660 sleep(Duration::from_millis(100)).await;
1661 }
1662 Ok(())
1663 }
1664
1665 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1667 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1668 }
1669
1670 fn spawn_module_recoveries_task(
1671 &self,
1672 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1673 module_recoveries: BTreeMap<
1674 ModuleInstanceId,
1675 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1676 >,
1677 module_recovery_progress_receivers: BTreeMap<
1678 ModuleInstanceId,
1679 watch::Receiver<RecoveryProgress>,
1680 >,
1681 ) {
1682 let db = self.db.clone();
1683 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1684 let module_kinds: BTreeMap<ModuleInstanceId, String> = self
1685 .modules
1686 .iter_modules_id_kind()
1687 .map(|(id, kind)| (id, kind.to_string()))
1688 .collect();
1689 self.spawn("module recoveries", |_task_handle| async {
1690 Self::run_module_recoveries_task(
1691 db,
1692 log_ordering_wakeup_tx,
1693 recovery_sender,
1694 module_recoveries,
1695 module_recovery_progress_receivers,
1696 module_kinds,
1697 )
1698 .await;
1699 });
1700 }
1701
1702 async fn run_module_recoveries_task(
1703 db: Database,
1704 log_ordering_wakeup_tx: watch::Sender<()>,
1705 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1706 module_recoveries: BTreeMap<
1707 ModuleInstanceId,
1708 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1709 >,
1710 module_recovery_progress_receivers: BTreeMap<
1711 ModuleInstanceId,
1712 watch::Receiver<RecoveryProgress>,
1713 >,
1714 module_kinds: BTreeMap<ModuleInstanceId, String>,
1715 ) {
1716 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1717 let mut completed_stream = Vec::new();
1718 let progress_stream = futures::stream::FuturesUnordered::new();
1719
1720 for (module_instance_id, f) in module_recoveries {
1721 completed_stream.push(futures::stream::once(Box::pin(async move {
1722 match f.await {
1723 Ok(()) => (module_instance_id, None),
1724 Err(err) => {
1725 warn!(
1726 target: LOG_CLIENT,
1727 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1728 );
1729 futures::future::pending::<()>().await;
1733 unreachable!()
1734 }
1735 }
1736 })));
1737 }
1738
1739 for (module_instance_id, rx) in module_recovery_progress_receivers {
1740 progress_stream.push(
1741 tokio_stream::wrappers::WatchStream::new(rx)
1742 .fuse()
1743 .map(move |progress| (module_instance_id, Some(progress))),
1744 );
1745 }
1746
1747 let mut futures = futures::stream::select(
1748 futures::stream::select_all(progress_stream),
1749 futures::stream::select_all(completed_stream),
1750 );
1751
1752 while let Some((module_instance_id, progress)) = futures.next().await {
1753 let mut dbtx = db.begin_transaction().await;
1754
1755 let prev_progress = *recovery_sender
1756 .borrow()
1757 .get(&module_instance_id)
1758 .expect("existing progress must be present");
1759
1760 let progress = if prev_progress.is_done() {
1761 prev_progress
1763 } else if let Some(progress) = progress {
1764 progress
1765 } else {
1766 prev_progress.to_complete()
1767 };
1768
1769 if !prev_progress.is_done() && progress.is_done() {
1770 info!(
1771 target: LOG_CLIENT,
1772 module_instance_id,
1773 progress = format!("{}/{}", progress.complete, progress.total),
1774 "Recovery complete"
1775 );
1776 dbtx.log_event(
1777 log_ordering_wakeup_tx.clone(),
1778 None,
1779 ModuleRecoveryCompleted {
1780 module_id: module_instance_id,
1781 },
1782 )
1783 .await;
1784 } else {
1785 info!(
1786 target: LOG_CLIENT,
1787 module_instance_id,
1788 kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1789 progress = format!("{}/{}", progress.complete, progress.total),
1790 "Recovery progress"
1791 );
1792 }
1793
1794 dbtx.insert_entry(
1795 &ClientModuleRecovery { module_instance_id },
1796 &ClientModuleRecoveryState { progress },
1797 )
1798 .await;
1799 dbtx.commit_tx().await;
1800
1801 recovery_sender.send_modify(|v| {
1802 v.insert(module_instance_id, progress);
1803 });
1804 }
1805 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1806 }
1807
1808 async fn load_peers_last_api_versions(
1809 db: &Database,
1810 num_peers: NumPeers,
1811 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1812 let mut peer_api_version_sets = BTreeMap::new();
1813
1814 let mut dbtx = db.begin_transaction_nc().await;
1815 for peer_id in num_peers.peer_ids() {
1816 if let Some(v) = dbtx
1817 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1818 .await
1819 {
1820 peer_api_version_sets.insert(peer_id, v.0);
1821 }
1822 }
1823 drop(dbtx);
1824 peer_api_version_sets
1825 }
1826
1827 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1830 self.db()
1831 .begin_transaction_nc()
1832 .await
1833 .find_by_prefix(&ApiAnnouncementPrefix)
1834 .await
1835 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1836 .collect()
1837 .await
1838 }
1839
1840 pub async fn get_guardian_metadata(
1842 &self,
1843 ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1844 self.db()
1845 .begin_transaction_nc()
1846 .await
1847 .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1848 .await
1849 .map(|(key, metadata)| (key.0, metadata))
1850 .collect()
1851 .await
1852 }
1853
1854 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1856 get_api_urls(&self.db, &self.config().await).await
1857 }
1858
1859 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1862 self.get_peer_urls()
1863 .await
1864 .into_iter()
1865 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1866 .map(|peer_url| {
1867 InviteCode::new(
1868 peer_url.clone(),
1869 peer,
1870 self.federation_id(),
1871 self.api_secret.clone(),
1872 )
1873 })
1874 }
1875
1876 pub async fn get_guardian_public_keys_blocking(
1880 &self,
1881 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1882 self.db
1883 .autocommit(
1884 |dbtx, _| {
1885 Box::pin(async move {
1886 let config = self.config().await;
1887
1888 let guardian_pub_keys = self
1889 .get_or_backfill_broadcast_public_keys(dbtx, config)
1890 .await;
1891
1892 Result::<_, ()>::Ok(guardian_pub_keys)
1893 })
1894 },
1895 None,
1896 )
1897 .await
1898 .expect("Will retry forever")
1899 }
1900
1901 async fn get_or_backfill_broadcast_public_keys(
1902 &self,
1903 dbtx: &mut DatabaseTransaction<'_>,
1904 config: ClientConfig,
1905 ) -> BTreeMap<PeerId, PublicKey> {
1906 match config.global.broadcast_public_keys {
1907 Some(guardian_pub_keys) => guardian_pub_keys,
1908 _ => {
1909 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1910
1911 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1912 *(self.config.write().await) = new_config;
1913 guardian_pub_keys
1914 }
1915 }
1916 }
1917
1918 async fn fetch_session_count(&self) -> FederationResult<u64> {
1919 self.api.session_count().await
1920 }
1921
1922 async fn fetch_and_update_config(
1923 &self,
1924 config: ClientConfig,
1925 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1926 let fetched_config = retry(
1927 "Fetching guardian public keys",
1928 backoff_util::background_backoff(),
1929 || async {
1930 Ok(self
1931 .api
1932 .request_current_consensus::<ClientConfig>(
1933 CLIENT_CONFIG_ENDPOINT.to_owned(),
1934 ApiRequestErased::default(),
1935 )
1936 .await?)
1937 },
1938 )
1939 .await
1940 .expect("Will never return on error");
1941
1942 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1943 warn!(
1944 target: LOG_CLIENT,
1945 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1946 );
1947 pending::<()>().await;
1948 unreachable!("Pending will never return");
1949 };
1950
1951 let new_config = ClientConfig {
1952 global: GlobalClientConfig {
1953 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1954 ..config.global
1955 },
1956 modules: config.modules,
1957 };
1958 (guardian_pub_keys, new_config)
1959 }
1960
1961 pub fn handle_global_rpc(
1962 &self,
1963 method: String,
1964 params: serde_json::Value,
1965 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1966 Box::pin(try_stream! {
1967 match method.as_str() {
1968 "get_balance" => {
1969 let balance = self.get_balance_for_btc().await.unwrap_or_default();
1970 yield serde_json::to_value(balance)?;
1971 }
1972 "subscribe_balance_changes" => {
1973 let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1974 let mut stream = self.subscribe_balance_changes(req.unit).await;
1975 while let Some(balance) = stream.next().await {
1976 yield serde_json::to_value(balance)?;
1977 }
1978 }
1979 "get_config" => {
1980 let config = self.config().await;
1981 yield serde_json::to_value(config)?;
1982 }
1983 "get_federation_id" => {
1984 let federation_id = self.federation_id();
1985 yield serde_json::to_value(federation_id)?;
1986 }
1987 "get_invite_code" => {
1988 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1989 let invite_code = self.invite_code(req.peer).await;
1990 yield serde_json::to_value(invite_code)?;
1991 }
1992 "get_operation" => {
1993 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1994 let operation = self.operation_log().get_operation(req.operation_id).await;
1995 yield serde_json::to_value(operation)?;
1996 }
1997 "list_operations" => {
1998 let req: ListOperationsParams = serde_json::from_value(params)?;
1999 let limit = if req.limit.is_none() && req.last_seen.is_none() {
2000 usize::MAX
2001 } else {
2002 req.limit.unwrap_or(usize::MAX)
2003 };
2004 let operations = self.operation_log()
2005 .paginate_operations_rev(limit, req.last_seen)
2006 .await;
2007 yield serde_json::to_value(operations)?;
2008 }
2009 "get_event_log" => {
2010 let req: GetEventLogRequest = serde_json::from_value(params)?;
2011 let limit = req
2012 .limit
2013 .unwrap_or(DEFAULT_EVENT_LOG_PAGE_SIZE)
2014 .min(MAX_EVENT_LOG_PAGE_SIZE);
2015 let events = self.get_event_log(req.pos, limit).await;
2016 yield serde_json::to_value(events)?;
2017 }
2018 "session_count" => {
2019 let count = self.fetch_session_count().await?;
2020 yield serde_json::to_value(count)?;
2021 }
2022 "has_pending_recoveries" => {
2023 let has_pending = self.has_pending_recoveries();
2024 yield serde_json::to_value(has_pending)?;
2025 }
2026 "wait_for_all_recoveries" => {
2027 self.wait_for_all_recoveries().await?;
2028 yield serde_json::Value::Null;
2029 }
2030 "subscribe_to_recovery_progress" => {
2031 let mut stream = self.subscribe_to_recovery_progress();
2032 while let Some((module_id, progress)) = stream.next().await {
2033 yield serde_json::json!({
2034 "module_id": module_id,
2035 "progress": progress
2036 });
2037 }
2038 }
2039 #[allow(deprecated)]
2040 "backup_to_federation" => {
2041 let metadata = if params.is_null() {
2042 Metadata::from_json_serialized(serde_json::json!({}))
2043 } else {
2044 Metadata::from_json_serialized(params)
2045 };
2046 self.backup_to_federation(metadata).await?;
2047 yield serde_json::Value::Null;
2048 }
2049 _ => {
2050 Err(anyhow::format_err!("Unknown method: {}", method))?;
2051 unreachable!()
2052 },
2053 }
2054 })
2055 }
2056
2057 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
2058 where
2059 E: Event + Send,
2060 {
2061 let mut dbtx = self.db.begin_transaction().await;
2062 self.log_event_dbtx(&mut dbtx, module_id, event).await;
2063 dbtx.commit_tx().await;
2064 }
2065
2066 pub async fn log_event_dbtx<E, Cap>(
2067 &self,
2068 dbtx: &mut DatabaseTransaction<'_, Cap>,
2069 module_id: Option<ModuleInstanceId>,
2070 event: E,
2071 ) where
2072 E: Event + Send,
2073 Cap: Send,
2074 {
2075 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2076 .await;
2077 }
2078
2079 pub async fn log_event_raw_dbtx<Cap>(
2080 &self,
2081 dbtx: &mut DatabaseTransaction<'_, Cap>,
2082 kind: EventKind,
2083 module: Option<(ModuleKind, ModuleInstanceId)>,
2084 payload: Vec<u8>,
2085 persist: EventPersistence,
2086 ) where
2087 Cap: Send,
2088 {
2089 let module_id = module.as_ref().map(|m| m.1);
2090 let module_kind = module.map(|m| m.0);
2091 dbtx.log_event_raw(
2092 self.log_ordering_wakeup_tx.clone(),
2093 kind,
2094 module_kind,
2095 module_id,
2096 payload,
2097 persist,
2098 )
2099 .await;
2100 }
2101
2102 pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
2114 struct BuiltInApplicationEventLogTracker;
2115
2116 #[apply(async_trait_maybe_send!)]
2117 impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
2118 async fn store(
2120 &mut self,
2121 dbtx: &mut DatabaseTransaction<NonCommittable>,
2122 pos: EventLogTrimableId,
2123 ) -> anyhow::Result<()> {
2124 dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
2125 .await;
2126 Ok(())
2127 }
2128
2129 async fn load(
2131 &mut self,
2132 dbtx: &mut DatabaseTransaction<NonCommittable>,
2133 ) -> anyhow::Result<Option<EventLogTrimableId>> {
2134 Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
2135 }
2136 }
2137 Box::new(BuiltInApplicationEventLogTracker)
2138 }
2139
2140 pub async fn handle_historical_events<F, R>(
2148 &self,
2149 tracker: fedimint_eventlog::DynEventLogTracker,
2150 handler_fn: F,
2151 ) -> anyhow::Result<()>
2152 where
2153 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2154 R: Future<Output = anyhow::Result<()>>,
2155 {
2156 fedimint_eventlog::handle_events(
2157 self.db.clone(),
2158 tracker,
2159 self.log_event_added_rx.clone(),
2160 handler_fn,
2161 )
2162 .await
2163 }
2164
2165 pub async fn handle_events<F, R>(
2184 &self,
2185 tracker: fedimint_eventlog::DynEventLogTrimableTracker,
2186 handler_fn: F,
2187 ) -> anyhow::Result<()>
2188 where
2189 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2190 R: Future<Output = anyhow::Result<()>>,
2191 {
2192 fedimint_eventlog::handle_trimable_events(
2193 self.db.clone(),
2194 tracker,
2195 self.log_event_added_rx.clone(),
2196 handler_fn,
2197 )
2198 .await
2199 }
2200
2201 pub async fn get_event_log(
2202 &self,
2203 pos: Option<EventLogId>,
2204 limit: u64,
2205 ) -> Vec<PersistedLogEntry> {
2206 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2207 .await
2208 }
2209
2210 pub async fn get_event_log_trimable(
2211 &self,
2212 pos: Option<EventLogTrimableId>,
2213 limit: u64,
2214 ) -> Vec<PersistedLogEntry> {
2215 self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2216 .await
2217 }
2218
2219 pub async fn get_event_log_dbtx<Cap>(
2220 &self,
2221 dbtx: &mut DatabaseTransaction<'_, Cap>,
2222 pos: Option<EventLogId>,
2223 limit: u64,
2224 ) -> Vec<PersistedLogEntry>
2225 where
2226 Cap: Send,
2227 {
2228 dbtx.get_event_log(pos, limit).await
2229 }
2230
2231 pub async fn get_event_log_trimable_dbtx<Cap>(
2232 &self,
2233 dbtx: &mut DatabaseTransaction<'_, Cap>,
2234 pos: Option<EventLogTrimableId>,
2235 limit: u64,
2236 ) -> Vec<PersistedLogEntry>
2237 where
2238 Cap: Send,
2239 {
2240 dbtx.get_event_log_trimable(pos, limit).await
2241 }
2242
2243 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
2245 self.log_event_added_transient_tx.subscribe()
2246 }
2247
2248 pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
2250 self.log_event_added_rx.clone()
2251 }
2252
2253 pub fn iroh_enable_dht(&self) -> bool {
2254 self.iroh_enable_dht
2255 }
2256
2257 pub(crate) async fn run_core_migrations(
2258 db_no_decoders: &Database,
2259 ) -> Result<(), anyhow::Error> {
2260 let mut dbtx = db_no_decoders.begin_transaction().await;
2261 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
2262 .await?;
2263 if is_running_in_test_env() {
2264 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
2265 }
2266 dbtx.commit_tx_result().await?;
2267 Ok(())
2268 }
2269
2270 fn primary_modules_for_unit(
2272 &self,
2273 unit: AmountUnit,
2274 ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2275 self.primary_modules
2276 .iter()
2277 .flat_map(move |(_prio, candidates)| {
2278 candidates
2279 .specific
2280 .get(&unit)
2281 .into_iter()
2282 .flatten()
2283 .copied()
2284 .chain(candidates.wildcard.iter().copied())
2286 })
2287 .map(|id| (id, self.modules.get_expect(id)))
2288 }
2289
2290 pub fn primary_module_for_unit(
2294 &self,
2295 unit: AmountUnit,
2296 ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2297 self.primary_modules_for_unit(unit).next()
2298 }
2299
2300 pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2302 self.primary_module_for_unit(AmountUnit::BITCOIN)
2303 .expect("No primary module for Bitcoin")
2304 }
2305}
2306
2307#[apply(async_trait_maybe_send!)]
2308impl ClientContextIface for Client {
2309 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2310 Client::get_module(self, instance)
2311 }
2312
2313 fn api_clone(&self) -> DynGlobalApi {
2314 Client::api_clone(self)
2315 }
2316 fn decoders(&self) -> &ModuleDecoderRegistry {
2317 Client::decoders(self)
2318 }
2319
2320 async fn finalize_and_submit_transaction(
2321 &self,
2322 operation_id: OperationId,
2323 operation_type: &str,
2324 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2325 tx_builder: TransactionBuilder,
2326 ) -> anyhow::Result<OutPointRange> {
2327 Client::finalize_and_submit_transaction(
2328 self,
2329 operation_id,
2330 operation_type,
2331 &operation_meta_gen,
2333 tx_builder,
2334 )
2335 .await
2336 }
2337
2338 async fn finalize_and_submit_transaction_dbtx(
2339 &self,
2340 dbtx: &mut DatabaseTransaction<'_>,
2341 operation_id: OperationId,
2342 operation_type: &str,
2343 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2344 tx_builder: TransactionBuilder,
2345 ) -> anyhow::Result<OutPointRange> {
2346 Client::finalize_and_submit_transaction_dbtx(
2347 self,
2348 dbtx,
2349 operation_id,
2350 operation_type,
2351 &operation_meta_gen,
2352 tx_builder,
2353 )
2354 .await
2355 }
2356
2357 async fn finalize_and_submit_transaction_inner(
2358 &self,
2359 dbtx: &mut DatabaseTransaction<'_>,
2360 operation_id: OperationId,
2361 tx_builder: TransactionBuilder,
2362 ) -> anyhow::Result<OutPointRange> {
2363 Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2364 }
2365
2366 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2367 Client::transaction_updates(self, operation_id).await
2368 }
2369
2370 async fn await_primary_module_outputs(
2371 &self,
2372 operation_id: OperationId,
2373 outputs: Vec<OutPoint>,
2375 ) -> anyhow::Result<()> {
2376 Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2377 }
2378
2379 fn operation_log(&self) -> &dyn IOperationLog {
2380 Client::operation_log(self)
2381 }
2382
2383 async fn has_active_states(&self, operation_id: OperationId) -> bool {
2384 Client::has_active_states(self, operation_id).await
2385 }
2386
2387 async fn operation_exists(&self, operation_id: OperationId) -> bool {
2388 Client::operation_exists(self, operation_id).await
2389 }
2390
2391 async fn config(&self) -> ClientConfig {
2392 Client::config(self).await
2393 }
2394
2395 fn db(&self) -> &Database {
2396 Client::db(self)
2397 }
2398
2399 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2400 Client::executor(self)
2401 }
2402
2403 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2404 Client::invite_code(self, peer).await
2405 }
2406
2407 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2408 Client::get_internal_payment_markers(self)
2409 }
2410
2411 async fn log_event_json(
2412 &self,
2413 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2414 module_kind: Option<ModuleKind>,
2415 module_id: ModuleInstanceId,
2416 kind: EventKind,
2417 payload: serde_json::Value,
2418 persist: EventPersistence,
2419 ) {
2420 dbtx.ensure_global()
2421 .expect("Must be called with global dbtx");
2422 self.log_event_raw_dbtx(
2423 dbtx,
2424 kind,
2425 module_kind.map(|kind| (kind, module_id)),
2426 serde_json::to_vec(&payload).expect("Serialization can't fail"),
2427 persist,
2428 )
2429 .await;
2430 }
2431
2432 async fn read_operation_active_states<'dbtx>(
2433 &self,
2434 operation_id: OperationId,
2435 module_id: ModuleInstanceId,
2436 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2437 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2438 {
2439 Box::pin(
2440 dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2441 operation_id,
2442 module_instance: module_id,
2443 })
2444 .await
2445 .map(move |(k, v)| (k.0, v)),
2446 )
2447 }
2448 async fn read_operation_inactive_states<'dbtx>(
2449 &self,
2450 operation_id: OperationId,
2451 module_id: ModuleInstanceId,
2452 dbtx: &'dbtx mut DatabaseTransaction<'_>,
2453 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2454 {
2455 Box::pin(
2456 dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2457 operation_id,
2458 module_instance: module_id,
2459 })
2460 .await
2461 .map(move |(k, v)| (k.0, v)),
2462 )
2463 }
2464}
2465
2466impl fmt::Debug for Client {
2468 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2469 write!(f, "Client")
2470 }
2471}
2472
2473pub fn client_decoders<'a>(
2474 registry: &ModuleInitRegistry<DynClientModuleInit>,
2475 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2476) -> ModuleDecoderRegistry {
2477 let mut modules = BTreeMap::new();
2478 for (id, kind) in module_kinds {
2479 let Some(init) = registry.get(kind) else {
2480 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2481 continue;
2482 };
2483
2484 modules.insert(
2485 id,
2486 (
2487 kind.clone(),
2488 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2489 ),
2490 );
2491 }
2492 ModuleDecoderRegistry::from(modules)
2493}