fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::ConnectorType;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
51    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::backoff_util::custom_backoff;
57use fedimint_core::util::{
58    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
59};
60use fedimint_core::{
61    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
62    maybe_add_send_sync, runtime,
63};
64use fedimint_derive_secret::DerivableSecret;
65use fedimint_eventlog::{
66    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
67    EventPersistence, PersistedLogEntry,
68};
69use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
70use futures::stream::FuturesUnordered;
71use futures::{Stream, StreamExt as _};
72use global_ctx::ModuleGlobalClientContext;
73use serde::{Deserialize, Serialize};
74use tokio::sync::{broadcast, watch};
75use tokio_stream::wrappers::WatchStream;
76use tracing::{debug, info, warn};
77
78use crate::ClientBuilder;
79use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
80use crate::backup::Metadata;
81use crate::db::{
82    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
83    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
84    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
85    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
86    get_decoded_client_secret, verify_client_db_integrity_dbtx,
87};
88use crate::meta::MetaService;
89use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
90use crate::oplog::OperationLog;
91use crate::sm::executor::{
92    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
93    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
94};
95
96pub(crate) mod builder;
97pub(crate) mod global_ctx;
98pub(crate) mod handle;
99
100/// List of core api versions supported by the implementation.
101/// Notably, the `major` version is the one being supported, and the corresponding
102/// `minor` version is the one required (for the given `major` version).
103const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
104    &[ApiVersion { major: 0, minor: 0 }];
105
106/// Primary module candidates at a specific priority level
107#[derive(Default)]
108pub(crate) struct PrimaryModuleCandidates {
109    /// Modules that listed specific units they handle
110    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
111    /// Modules handling any unit
112    wildcard: Vec<ModuleInstanceId>,
113}
114
115/// Main client type
116///
117/// A handle and API for interacting with a single federation. End user
118/// applications that want to support interacting with multiple federations at
119/// the same time will need to instantiate and manage multiple instances of
120/// this struct.
121///
122/// Under the hood it starts and manages the service tasks, state machines,
123/// database and other resources required.
124///
125/// This type is shared externally and internally, and
126/// [`crate::ClientHandle`] is responsible for external lifecycle management
127/// and resource freeing of the [`Client`].
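///
/// # Example
///
/// A minimal lifecycle sketch. The `ClientBuilder` configuration and the
/// join/open step are elided because they depend on the modules and database
/// an application registers (see [`crate::ClientBuilder`]):
///
/// ```ignore
/// let builder = Client::builder().await?;
/// // ... register client modules and a database on `builder`, then join a
/// // federation (or open an existing client database) to obtain a
/// // `ClientHandle`, through which the `Client` methods below are used ...
/// client.wait_for_all_recoveries().await?;
/// let balance = client.get_balance_for_btc().await?;
/// ```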
128pub struct Client {
129    final_client: FinalClientIface,
130    config: tokio::sync::RwLock<ClientConfig>,
131    api_secret: Option<String>,
132    decoders: ModuleDecoderRegistry,
133    db: Database,
134    federation_id: FederationId,
135    federation_config_meta: BTreeMap<String, String>,
136    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
137    pub(crate) modules: ClientModuleRegistry,
138    module_inits: ClientModuleInitRegistry,
139    executor: Executor,
140    pub(crate) api: DynGlobalApi,
141    root_secret: DerivableSecret,
142    operation_log: OperationLog,
143    secp_ctx: Secp256k1<secp256k1::All>,
144    meta_service: Arc<MetaService>,
145    connector: ConnectorType,
146
147    task_group: TaskGroup,
148
149    /// Updates about client recovery progress
150    client_recovery_progress_receiver:
151        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
152
153    /// Internal client sender to wake up the log ordering task every time an
154    /// (unordered) log event is added.
155    log_ordering_wakeup_tx: watch::Sender<()>,
156    /// Receiver for events fired every time an (ordered) log event is added.
157    log_event_added_rx: watch::Receiver<()>,
158    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
159    request_hook: ApiRequestHook,
160    iroh_enable_dht: bool,
161    iroh_enable_next: bool,
162}
163
164#[derive(Debug, Serialize, Deserialize)]
165struct ListOperationsParams {
166    limit: Option<usize>,
167    last_seen: Option<ChronologicalOperationLogKey>,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct GetOperationIdRequest {
172    operation_id: OperationId,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GetBalanceChangesRequest {
177    #[serde(default = "AmountUnit::bitcoin")]
178    unit: AmountUnit,
179}
180
181impl Client {
182    /// Initialize a client builder that can be configured to create a new
183    /// client.
184    pub async fn builder() -> anyhow::Result<ClientBuilder> {
185        Ok(ClientBuilder::new())
186    }
187
188    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
189        self.api.as_ref()
190    }
191
192    pub fn api_clone(&self) -> DynGlobalApi {
193        self.api.clone()
194    }
195
196    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
197    pub fn task_group(&self) -> &TaskGroup {
198        &self.task_group
199    }
200
201    /// Useful for our CLI tooling, not meant for external use
202    #[doc(hidden)]
203    pub fn executor(&self) -> &Executor {
204        &self.executor
205    }
206
207    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
208        let mut dbtx = db.begin_transaction_nc().await;
209        dbtx.get_value(&ClientConfigKey).await
210    }
211
212    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
213        let mut dbtx = db.begin_transaction_nc().await;
214        dbtx.get_value(&PendingClientConfigKey).await
215    }
216
217    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
218        let mut dbtx = db.begin_transaction_nc().await;
219        dbtx.get_value(&ApiSecretKey).await
220    }
221
222    pub async fn store_encodable_client_secret<T: Encodable>(
223        db: &Database,
224        secret: T,
225    ) -> anyhow::Result<()> {
226        let mut dbtx = db.begin_transaction().await;
227
228        // Don't overwrite an existing secret
229        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
230            bail!("Encoded client secret already exists, cannot overwrite")
231        }
232
233        let encoded_secret = T::consensus_encode_to_vec(&secret);
234        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
235            .await;
236        dbtx.commit_tx().await;
237        Ok(())
238    }
239
240    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
241        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
242            bail!("Encoded client secret not present in DB")
243        };
244
245        Ok(secret)
246    }
247    pub async fn load_decodable_client_secret_opt<T: Decodable>(
248        db: &Database,
249    ) -> anyhow::Result<Option<T>> {
250        let mut dbtx = db.begin_transaction_nc().await;
251
252        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
253
254        Ok(match client_secret {
255            Some(client_secret) => Some(
256                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
257                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
258            ),
259            None => None,
260        })
261    }
262
263    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
264        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
265            Ok(secret) => secret,
266            _ => {
267                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
268                Self::store_encodable_client_secret(db, secret)
269                    .await
270                    .expect("Storing client secret must work");
271                secret
272            }
273        };
274        Ok(client_secret)
275    }
276
277    pub async fn is_initialized(db: &Database) -> bool {
278        let mut dbtx = db.begin_transaction_nc().await;
279        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
280            .await
281            .expect("Unrecoverable error occurred while reading an entry from the database")
282            .is_some()
283    }
284
285    pub fn start_executor(self: &Arc<Self>) {
286        debug!(
287            target: LOG_CLIENT,
288            "Starting fedimint client executor",
289        );
290        self.executor.start_executor(self.context_gen());
291    }
292
293    pub fn federation_id(&self) -> FederationId {
294        self.federation_id
295    }
296
297    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
298        let client_inner = Arc::downgrade(self);
299        Arc::new(move |module_instance, operation| {
300            ModuleGlobalClientContext {
301                client: client_inner
302                    .clone()
303                    .upgrade()
304                    .expect("ModuleGlobalContextGen called after client was dropped"),
305                module_instance_id: module_instance,
306                operation,
307            }
308            .into()
309        })
310    }
311
312    pub async fn config(&self) -> ClientConfig {
313        self.config.read().await.clone()
314    }
315
316    pub fn api_secret(&self) -> &Option<String> {
317        &self.api_secret
318    }
319
320    pub fn decoders(&self) -> &ModuleDecoderRegistry {
321        &self.decoders
322    }
323
324    /// Returns a reference to the module, panics if not found
325    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
326        self.try_get_module(instance)
327            .expect("Module instance not found")
328    }
329
330    fn try_get_module(
331        &self,
332        instance: ModuleInstanceId,
333    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
334        Some(self.modules.get(instance)?.as_ref())
335    }
336
337    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
338        self.modules.get(instance).is_some()
339    }
340
341    /// Returns the input amount and output amount of a transaction
342    ///
343    /// # Panics
344    /// If any of the input or output versions in the transaction builder are
345    /// unknown to the respective module.
346    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
347        // FIXME: prevent overflows, currently not suitable for untrusted input
348        let mut in_amounts = Amounts::ZERO;
349        let mut out_amounts = Amounts::ZERO;
350        let mut fee_amounts = Amounts::ZERO;
351
352        for input in builder.inputs() {
353            let module = self.get_module(input.input.module_instance_id());
354
355            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
356                "We only build transactions with input versions that are supported by the module",
357            );
358
359            in_amounts.checked_add_mut(&input.amounts);
360            fee_amounts.checked_add_mut(&item_fees);
361        }
362
363        for output in builder.outputs() {
364            let module = self.get_module(output.output.module_instance_id());
365
366            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
367                "We only build transactions with output versions that are supported by the module",
368            );
369
370            out_amounts.checked_add_mut(&output.amounts);
371            fee_amounts.checked_add_mut(&item_fees);
372        }
373
374        out_amounts.checked_add_mut(&fee_amounts);
375        (in_amounts, out_amounts)
376    }
377
378    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
379        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
380    }
381
382    /// Get a metadata value from the federation config itself
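    ///
    /// A minimal sketch, assuming the federation sets the conventional
    /// `federation_name` meta field:
    ///
    /// ```ignore
    /// let name = client.get_config_meta("federation_name");
    /// ```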
383    pub fn get_config_meta(&self, key: &str) -> Option<String> {
384        self.federation_config_meta.get(key).cloned()
385    }
386
387    pub(crate) fn root_secret(&self) -> DerivableSecret {
388        self.root_secret.clone()
389    }
390
391    pub async fn add_state_machines(
392        &self,
393        dbtx: &mut DatabaseTransaction<'_>,
394        states: Vec<DynState>,
395    ) -> AddStateMachinesResult {
396        self.executor.add_state_machines_dbtx(dbtx, states).await
397    }
398
399    // TODO: implement as part of [`OperationLog`]
400    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
401        let active_states = self.executor.get_active_states().await;
402        let mut active_operations = HashSet::with_capacity(active_states.len());
403        let mut dbtx = self.db().begin_transaction_nc().await;
404        for (state, _) in active_states {
405            let operation_id = state.operation_id();
406            if dbtx
407                .get_value(&OperationLogKey { operation_id })
408                .await
409                .is_some()
410            {
411                active_operations.insert(operation_id);
412            }
413        }
414        active_operations
415    }
416
417    pub fn operation_log(&self) -> &OperationLog {
418        &self.operation_log
419    }
420
421    /// Get the meta manager to read meta fields.
422    pub fn meta_service(&self) -> &Arc<MetaService> {
423        &self.meta_service
424    }
425
426    /// Get the federation expiration timestamp from the meta fields, if set.
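    ///
    /// A hedged usage sketch (the field name and semantics come from the
    /// `federation_expiry_timestamp` meta key read below):
    ///
    /// ```ignore
    /// if let Some(expires_at) = client.get_meta_expiration_timestamp().await {
    ///     if expires_at < std::time::SystemTime::now() {
    ///         // the federation has announced itself as expired
    ///     }
    /// }
    /// ```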
427    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
428        let meta_service = self.meta_service();
429        let ts = meta_service
430            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
431            .await
432            .and_then(|v| v.value)?;
433        Some(UNIX_EPOCH + Duration::from_secs(ts))
434    }
435
436    /// Adds funding to a transaction or removes over-funding via change.
437    async fn finalize_transaction(
438        &self,
439        dbtx: &mut DatabaseTransaction<'_>,
440        operation_id: OperationId,
441        mut partial_transaction: TransactionBuilder,
442    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
443        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
444
445        let mut added_inputs_bundles = vec![];
446        let mut added_outputs_bundles = vec![];
447
448        // The way things are currently implemented is OK for modules which can
449        // collect a fee relative to the one being used, but will break down in any
450        // fancier scenarios. Future TODOs:
451        //
452        // * create_final_inputs_and_outputs needs to get broken down, so we can use
453        //   primary modules using priorities (possibly separate prios for inputs and
454        //   outputs to be able to drain modules, etc.); we need the split "check if
455        //   possible" and "take" steps,
456        // * extra inputs and outputs adding fees needs to be taken into account,
457        //   possibly with some looping
458        for unit in in_amounts.units().union(&out_amounts.units()) {
459            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
460            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
461            if input_amount == output_amount {
462                continue;
463            }
464
465            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
466                bail!("No module to balance a partial transaction (affected unit: {unit:?})");
467            };
468
469            let (added_input_bundle, added_output_bundle) = module
470                .create_final_inputs_and_outputs(
471                    module_id,
472                    dbtx,
473                    operation_id,
474                    *unit,
475                    input_amount,
476                    output_amount,
477                )
478                .await?;
479
480            added_inputs_bundles.push(added_input_bundle);
481            added_outputs_bundles.push(added_output_bundle);
482        }
483
484        // This is the range of outputs that will be added to the transaction
485        // in order to balance it. Note that it may stay empty in case the transaction
486        // is already balanced.
487        let change_range = Range {
488            start: partial_transaction.outputs().count() as u64,
489            end: (partial_transaction.outputs().count() as u64
490                + added_outputs_bundles
491                    .iter()
492                    .map(|output| output.outputs().len() as u64)
493                    .sum::<u64>()),
494        };
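        // For example (illustrative numbers only): if the builder already
        // contained 2 outputs and the primary modules add 3 change outputs in
        // total, `change_range` is `2..5`, i.e. the indices the change outputs
        // will occupy in the final transaction.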
495
496        for added_inputs in added_inputs_bundles {
497            partial_transaction = partial_transaction.with_inputs(added_inputs);
498        }
499
500        for added_outputs in added_outputs_bundles {
501            partial_transaction = partial_transaction.with_outputs(added_outputs);
502        }
503
504        let (input_amounts, output_amounts) =
505            self.transaction_builder_get_balance(&partial_transaction);
506
507        for (unit, output_amount) in output_amounts {
508            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
509
510            assert!(input_amount >= output_amount, "Transaction is underfunded");
511        }
512
513        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
514
515        Ok((tx, states, change_range))
516    }
517
518    /// Add funding and/or change to the transaction builder as needed, finalize
519    /// the transaction and submit it to the federation.
520    ///
521    /// ## Errors
522    /// The function will return an error if an operation with the given ID already
523    /// exists.
524    ///
525    /// ## Panics
526    /// The function will panic if the database transaction repeatedly collides with
527    /// other transactions and keeps failing; this should not happen except in
528    /// excessively concurrent scenarios.
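    ///
    /// ## Example
    ///
    /// A hedged sketch; `tx_builder` and the serializable meta value are
    /// placeholders for whatever a module-specific operation supplies, and
    /// `OperationId::new_random()` is assumed to be available to the caller:
    ///
    /// ```ignore
    /// let out_point_range = client
    ///     .finalize_and_submit_transaction(
    ///         OperationId::new_random(),
    ///         "my-operation-type",
    ///         move |_range: OutPointRange| serde_json::json!({ "note": "example meta" }),
    ///         tx_builder,
    ///     )
    ///     .await?;
    /// ```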
529    pub async fn finalize_and_submit_transaction<F, M>(
530        &self,
531        operation_id: OperationId,
532        operation_type: &str,
533        operation_meta_gen: F,
534        tx_builder: TransactionBuilder,
535    ) -> anyhow::Result<OutPointRange>
536    where
537        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
538        M: serde::Serialize + MaybeSend,
539    {
540        let operation_type = operation_type.to_owned();
541
542        let autocommit_res = self
543            .db
544            .autocommit(
545                |dbtx, _| {
546                    let operation_type = operation_type.clone();
547                    let tx_builder = tx_builder.clone();
548                    let operation_meta_gen = operation_meta_gen.clone();
549                    Box::pin(async move {
550                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
551                            bail!("There already exists an operation with id {operation_id:?}")
552                        }
553
554                        let out_point_range = self
555                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
556                            .await?;
557
558                        self.operation_log()
559                            .add_operation_log_entry_dbtx(
560                                dbtx,
561                                operation_id,
562                                &operation_type,
563                                operation_meta_gen(out_point_range),
564                            )
565                            .await;
566
567                        Ok(out_point_range)
568                    })
569                },
570                Some(100), // TODO: handle what happens after 100 retries
571            )
572            .await;
573
574        match autocommit_res {
575            Ok(txid) => Ok(txid),
576            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
577            Err(AutocommitError::CommitFailed {
578                attempts,
579                last_error,
580            }) => panic!(
581                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
582            ),
583        }
584    }
585
586    async fn finalize_and_submit_transaction_inner(
587        &self,
588        dbtx: &mut DatabaseTransaction<'_>,
589        operation_id: OperationId,
590        tx_builder: TransactionBuilder,
591    ) -> anyhow::Result<OutPointRange> {
592        let (transaction, mut states, change_range) = self
593            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
594            .await?;
595
596        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
597            let inputs = transaction
598                .inputs
599                .iter()
600                .map(DynInput::module_instance_id)
601                .collect::<Vec<_>>();
602            let outputs = transaction
603                .outputs
604                .iter()
605                .map(DynOutput::module_instance_id)
606                .collect::<Vec<_>>();
607            warn!(
608                target: LOG_CLIENT_NET_API,
609                size=%transaction.consensus_encode_to_vec().len(),
610                ?inputs,
611                ?outputs,
612                "Transaction too large",
613            );
614            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
615            bail!(
616                "The generated transaction would be rejected by the federation for being too large."
617            );
618        }
619
620        let txid = transaction.tx_hash();
621
622        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
623
624        let tx_submission_sm = DynState::from_typed(
625            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
626            TxSubmissionStatesSM {
627                operation_id,
628                state: TxSubmissionStates::Created(transaction),
629            },
630        );
631        states.push(tx_submission_sm);
632
633        self.executor.add_state_machines_dbtx(dbtx, states).await?;
634
635        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
636            .await;
637
638        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
639    }
640
641    async fn transaction_update_stream(
642        &self,
643        operation_id: OperationId,
644    ) -> BoxStream<'static, TxSubmissionStatesSM> {
645        self.executor
646            .notifier()
647            .module_notifier::<TxSubmissionStatesSM>(
648                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
649                self.final_client.clone(),
650            )
651            .subscribe(operation_id)
652            .await
653    }
654
655    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
656        let mut dbtx = self.db().begin_transaction_nc().await;
657
658        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
659    }
660
661    pub async fn operation_exists_dbtx(
662        dbtx: &mut DatabaseTransaction<'_>,
663        operation_id: OperationId,
664    ) -> bool {
665        let active_state_exists = dbtx
666            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
667            .await
668            .next()
669            .await
670            .is_some();
671
672        let inactive_state_exists = dbtx
673            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
674            .await
675            .next()
676            .await
677            .is_some();
678
679        active_state_exists || inactive_state_exists
680    }
681
682    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
683        self.db
684            .begin_transaction_nc()
685            .await
686            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
687            .await
688            .next()
689            .await
690            .is_some()
691    }
692
693    /// Waits for an output from the primary module to reach its final
694    /// state.
695    pub async fn await_primary_bitcoin_module_output(
696        &self,
697        operation_id: OperationId,
698        out_point: OutPoint,
699    ) -> anyhow::Result<()> {
700        self.primary_module_for_unit(AmountUnit::BITCOIN)
701            .ok_or_else(|| anyhow!("No primary module available"))?
702            .1
703            .await_primary_module_output(operation_id, out_point)
704            .await
705    }
706
707    /// Returns a reference to a typed module client instance by kind
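    ///
    /// A hedged sketch; `MintClientModule` stands in for any concrete type
    /// implementing [`ClientModule`] that was registered with this client:
    ///
    /// ```ignore
    /// let mint = client.get_first_module::<MintClientModule>()?;
    /// let instance_id = mint.id;
    /// ```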
708    pub fn get_first_module<M: ClientModule>(
709        &'_ self,
710    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
711        let module_kind = M::kind();
712        let id = self
713            .get_first_instance(&module_kind)
714            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
715        let module: &M = self
716            .try_get_module(id)
717            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
718            .as_any()
719            .downcast_ref::<M>()
720            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
721        let (db, _) = self.db().with_prefix_module_id(id);
722        Ok(ClientModuleInstance {
723            id,
724            db,
725            api: self.api().with_module(id),
726            module,
727        })
728    }
729
730    pub fn get_module_client_dyn(
731        &self,
732        instance_id: ModuleInstanceId,
733    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
734        self.try_get_module(instance_id)
735            .ok_or(anyhow!("Unknown module instance {}", instance_id))
736    }
737
738    pub fn db(&self) -> &Database {
739        &self.db
740    }
741
742    /// Returns a stream of transaction updates for the given operation id that
743    /// can later be used to watch for a specific transaction being accepted.
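    ///
    /// A usage sketch; `await_tx_accepted` is assumed to be the helper exposed
    /// by [`TransactionUpdates`] in `fedimint_client_module`:
    ///
    /// ```ignore
    /// let updates = client.transaction_updates(operation_id).await;
    /// if let Err(reason) = updates.await_tx_accepted(txid).await {
    ///     // the federation rejected the transaction
    /// }
    /// ```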
744    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
745        TransactionUpdates {
746            update_stream: self.transaction_update_stream(operation_id).await,
747        }
748    }
749
750    /// Returns the instance id of the first module of the given kind.
751    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
752        self.modules
753            .iter_modules()
754            .find(|(_, kind, _module)| *kind == module_kind)
755            .map(|(instance_id, _, _)| instance_id)
756    }
757
758    /// Returns the data from which the client's root secret is derived (e.g.
759    /// BIP39 seed phrase struct).
760    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
761        get_decoded_client_secret::<T>(self.db()).await
762    }
763
764    /// Waits for outputs from the primary module to reach its final
765    /// state.
766    pub async fn await_primary_bitcoin_module_outputs(
767        &self,
768        operation_id: OperationId,
769        outputs: Vec<OutPoint>,
770    ) -> anyhow::Result<()> {
771        for out_point in outputs {
772            self.await_primary_bitcoin_module_output(operation_id, out_point)
773                .await?;
774        }
775
776        Ok(())
777    }
778
779    /// Returns the config of the client in JSON format.
780    ///
781    /// Compared to the consensus module format, where module configs are binary
782    /// encoded, this format cannot be cryptographically verified but is easier
783    /// to consume and to some degree human-readable.
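    ///
    /// A minimal sketch (assuming `serde_json` is available to the caller and
    /// `JsonClientConfig` implements `Serialize`):
    ///
    /// ```ignore
    /// let config_json = serde_json::to_string_pretty(&client.get_config_json().await)?;
    /// println!("{config_json}");
    /// ```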
784    pub async fn get_config_json(&self) -> JsonClientConfig {
785        self.config().await.to_json()
786    }
787
788    // Ideally this would not be in the API, but there are a lot of places where this
789    // makes things easier.
790    #[doc(hidden)]
791    /// Like [`Self::get_balance`] but returns an error if the primary module is
792    /// not available
793    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
794        self.get_balance_for_unit(AmountUnit::BITCOIN).await
795    }
796
797    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
798        let (id, module) = self
799            .primary_module_for_unit(unit)
800            .ok_or_else(|| anyhow!("Primary module not available"))?;
801        Ok(module
802            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
803            .await)
804    }
805
806    /// Returns a stream that yields the current client balance every time it
807    /// changes.
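    ///
    /// A minimal sketch (assuming `futures::StreamExt` is in scope for
    /// `next()`):
    ///
    /// ```ignore
    /// let mut balances = client.subscribe_balance_changes(AmountUnit::BITCOIN).await;
    /// while let Some(balance) = balances.next().await {
    ///     println!("new balance: {balance}");
    /// }
    /// ```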
808    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
809        let primary_module_things =
810            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
811                let balance_changes = primary_module.subscribe_balance_changes().await;
812                let initial_balance = self
813                    .get_balance_for_unit(unit)
814                    .await
815                    .expect("Primary is present");
816
817                Some((
818                    primary_module_id,
819                    primary_module.clone(),
820                    balance_changes,
821                    initial_balance,
822                ))
823            } else {
824                None
825            };
826        let db = self.db().clone();
827
828        Box::pin(async_stream::stream! {
829            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
830                // If there is no primary module, there will not be one until the client is
831                // restarted
832                pending().await
833            };
834
835
836            yield initial_balance;
837            let mut prev_balance = initial_balance;
838            while let Some(()) = balance_changes.next().await {
839                let mut dbtx = db.begin_transaction_nc().await;
840                let balance = primary_module
841                     .get_balance(primary_module_id, &mut dbtx, unit)
842                    .await;
843
844                // Deduplicate in case modules cannot always tell if the balance actually changed
845                if balance != prev_balance {
846                    prev_balance = balance;
847                    yield balance;
848                }
849            }
850        })
851    }
852
853    /// Make a single API version request to a peer after a delay.
854    ///
855    /// The delay is here to unify the future's type for both the initial request
856    /// and possible retries.
857    async fn make_api_version_request(
858        delay: Duration,
859        peer_id: PeerId,
860        api: &DynGlobalApi,
861    ) -> (
862        PeerId,
863        Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
864    ) {
865        runtime::sleep(delay).await;
866        (
867            peer_id,
868            api.request_single_peer::<SupportedApiVersionsSummary>(
869                VERSION_ENDPOINT.to_owned(),
870                ApiRequestErased::default(),
871                peer_id,
872            )
873            .await,
874        )
875    }
876
877    /// Create a backoff strategy for API version requests.
878    ///
879    /// Keep trying, initially somewhat aggressively, but after a while retry
880    /// very slowly, because the chances of getting a response are getting lower
881    /// and lower.
882    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
883        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
884    }
885
886    /// Query the federation for API version support and then calculate
887    /// the best API version to use (supported by most guardians).
888    pub async fn fetch_common_api_versions_from_all_peers(
889        num_peers: NumPeers,
890        api: DynGlobalApi,
891        db: Database,
892        num_responses_sender: watch::Sender<usize>,
893    ) {
894        let mut backoff = Self::create_api_version_backoff();
895
896        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
897        // and make a single async db write operation, it should be OK.
898        let mut requests = FuturesUnordered::new();
899
900        for peer_id in num_peers.peer_ids() {
901            requests.push(Self::make_api_version_request(
902                Duration::ZERO,
903                peer_id,
904                &api,
905            ));
906        }
907
908        let mut num_responses = 0;
909
910        while let Some((peer_id, response)) = requests.next().await {
911            let retry = match response {
912                Err(err) => {
913                    let has_previous_response = db
914                        .begin_transaction_nc()
915                        .await
916                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
917                        .await
918                        .is_some();
919                    debug!(
920                        target: LOG_CLIENT,
921                        %peer_id,
922                        err = %err.fmt_compact(),
923                        %has_previous_response,
924                        "Failed to refresh API versions of a peer"
925                    );
926
927                    !has_previous_response
928                }
929                Ok(o) => {
930                    // Save the response to the database right away, just to
931                    // not lose it
932                    let mut dbtx = db.begin_transaction().await;
933                    dbtx.insert_entry(
934                        &PeerLastApiVersionsSummaryKey(peer_id),
935                        &PeerLastApiVersionsSummary(o),
936                    )
937                    .await;
938                    dbtx.commit_tx().await;
939                    false
940                }
941            };
942
943            if retry {
944                requests.push(Self::make_api_version_request(
945                    backoff.next().expect("Keeps retrying"),
946                    peer_id,
947                    &api,
948                ));
949            } else {
950                num_responses += 1;
951                num_responses_sender.send_replace(num_responses);
952            }
953        }
954    }
955
956    /// Fetch API versions from peers, retrying until we get a threshold number of
957    /// successful responses. Returns the successful responses collected
958    /// from at least `num_peers.threshold()` peers.
959    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
960        num_peers: NumPeers,
961        api: DynGlobalApi,
962    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
963        let mut backoff = Self::create_api_version_backoff();
964
965        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
966        // and collect responses, it should be OK.
967        let mut requests = FuturesUnordered::new();
968
969        for peer_id in num_peers.peer_ids() {
970            requests.push(Self::make_api_version_request(
971                Duration::ZERO,
972                peer_id,
973                &api,
974            ));
975        }
976
977        let mut successful_responses = BTreeMap::new();
978
979        while successful_responses.len() < num_peers.threshold()
980            && let Some((peer_id, response)) = requests.next().await
981        {
982            let retry = match response {
983                Err(err) => {
984                    debug!(
985                        target: LOG_CLIENT,
986                        %peer_id,
987                        err = %err.fmt_compact(),
988                        "Failed to fetch API versions from peer"
989                    );
990                    true
991                }
992                Ok(response) => {
993                    successful_responses.insert(peer_id, response);
994                    false
995                }
996            };
997
998            if retry {
999                requests.push(Self::make_api_version_request(
1000                    backoff.next().expect("Keeps retrying"),
1001                    peer_id,
1002                    &api,
1003                ));
1004            }
1005        }
1006
1007        successful_responses
1008    }
1009
1010    /// Fetch supported API versions from a threshold of peers, used for discovering the common API versions to use.
1011    pub async fn fetch_common_api_versions(
1012        config: &ClientConfig,
1013        api: &DynGlobalApi,
1014    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1015        debug!(
1016            target: LOG_CLIENT,
1017            "Fetching common api versions"
1018        );
1019
1020        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1021
1022        let peer_api_version_sets =
1023            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1024
1025        Ok(peer_api_version_sets)
1026    }
1027
1028    /// Write API version set to database cache.
1029    /// Used when we have a pre-calculated API version set that should be stored
1030    /// for later use.
1031    pub async fn write_api_version_cache(
1032        dbtx: &mut DatabaseTransaction<'_>,
1033        api_version_set: ApiVersionSet,
1034    ) {
1035        debug!(
1036            target: LOG_CLIENT,
1037            value = ?api_version_set,
1038            "Writing API version set to cache"
1039        );
1040
1041        dbtx.insert_entry(
1042            &CachedApiVersionSetKey,
1043            &CachedApiVersionSet(api_version_set),
1044        )
1045        .await;
1046    }
1047
1048    /// Store prefetched peer API version responses and calculate/store common
1049    /// API version set. This processes the individual peer responses by
1050    /// storing them in the database and calculating the common API version
1051    /// set for caching.
1052    pub async fn store_prefetched_api_versions(
1053        db: &Database,
1054        config: &ClientConfig,
1055        client_module_init: &ClientModuleInitRegistry,
1056        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1057    ) {
1058        debug!(
1059            target: LOG_CLIENT,
1060            "Storing {} prefetched peer API version responses and calculating common version set",
1061            peer_api_versions.len()
1062        );
1063
1064        let mut dbtx = db.begin_transaction().await;
1065        // Calculate common API version set from individual responses
1066        let client_supported_versions =
1067            Self::supported_api_versions_summary_static(config, client_module_init);
1068        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1069            &client_supported_versions,
1070            peer_api_versions,
1071        ) {
1072            Ok(common_api_versions) => {
1073                // Write the calculated common API version set to database cache
1074                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1075                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1076            }
1077            Err(err) => {
1078                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1079            }
1080        }
1081
1082        // Store individual peer responses to database
1083        for (peer_id, peer_api_versions) in peer_api_versions {
1084            dbtx.insert_entry(
1085                &PeerLastApiVersionsSummaryKey(*peer_id),
1086                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1087            )
1088            .await;
1089        }
1090        dbtx.commit_tx().await;
1091        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1092    }
1093
1094    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1095    pub fn supported_api_versions_summary_static(
1096        config: &ClientConfig,
1097        client_module_init: &ClientModuleInitRegistry,
1098    ) -> SupportedApiVersionsSummary {
1099        SupportedApiVersionsSummary {
1100            core: SupportedCoreApiVersions {
1101                core_consensus: config.global.consensus_version,
1102                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1103                    .expect("must not have conflicting versions"),
1104            },
1105            modules: config
1106                .modules
1107                .iter()
1108                .filter_map(|(&module_instance_id, module_config)| {
1109                    client_module_init
1110                        .get(module_config.kind())
1111                        .map(|module_init| {
1112                            (
1113                                module_instance_id,
1114                                SupportedModuleApiVersions {
1115                                    core_consensus: config.global.consensus_version,
1116                                    module_consensus: module_config.version,
1117                                    api: module_init.supported_api_versions(),
1118                                },
1119                            )
1120                        })
1121                })
1122                .collect(),
1123        }
1124    }
1125
1126    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1127        Self::load_and_refresh_common_api_version_static(
1128            &self.config().await,
1129            &self.module_inits,
1130            &self.api,
1131            &self.db,
1132            &self.task_group,
1133        )
1134        .await
1135    }
1136
1137    /// Load the common api versions to use from cache and start a background
1138    /// process to refresh them.
1139    ///
1140    /// This is a compromise, so we do not have to wait for version discovery to
1141    /// complete every time a [`Client`] is being built.
1142    async fn load_and_refresh_common_api_version_static(
1143        config: &ClientConfig,
1144        module_init: &ClientModuleInitRegistry,
1145        api: &DynGlobalApi,
1146        db: &Database,
1147        task_group: &TaskGroup,
1148    ) -> anyhow::Result<ApiVersionSet> {
1149        if let Some(v) = db
1150            .begin_transaction_nc()
1151            .await
1152            .get_value(&CachedApiVersionSetKey)
1153            .await
1154        {
1155            debug!(
1156                target: LOG_CLIENT,
1157                "Found existing cached common api versions"
1158            );
1159            let config = config.clone();
1160            let client_module_init = module_init.clone();
1161            let api = api.clone();
1162            let db = db.clone();
1163            let task_group = task_group.clone();
1164            // Separate task group, because we actually don't want to be waiting for this to
1165            // finish, and it's just best effort.
1166            task_group
1167                .clone()
1168                .spawn_cancellable("refresh_common_api_version_static", async move {
1169                    if let Err(error) = Self::refresh_common_api_version_static(
1170                        &config,
1171                        &client_module_init,
1172                        &api,
1173                        &db,
1174                        task_group,
1175                        false,
1176                    )
1177                    .await
1178                    {
1179                        warn!(
1180                            target: LOG_CLIENT,
1181                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1182                        );
1183                    }
1184                });
1185
1186            return Ok(v.0);
1187        }
1188
1189        info!(
1190            target: LOG_CLIENT,
1191            "Fetching initial API versions"
1192        );
1193        Self::refresh_common_api_version_static(
1194            config,
1195            module_init,
1196            api,
1197            db,
1198            task_group.clone(),
1199            true,
1200        )
1201        .await
1202    }
1203
1204    async fn refresh_common_api_version_static(
1205        config: &ClientConfig,
1206        client_module_init: &ClientModuleInitRegistry,
1207        api: &DynGlobalApi,
1208        db: &Database,
1209        task_group: TaskGroup,
1210        block_until_ok: bool,
1211    ) -> anyhow::Result<ApiVersionSet> {
1212        debug!(
1213            target: LOG_CLIENT,
1214            "Refreshing common api versions"
1215        );
1216
1217        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1218        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1219
1220        task_group.spawn_cancellable("refresh peers api versions", {
1221            Client::fetch_common_api_versions_from_all_peers(
1222                num_peers,
1223                api.clone(),
1224                db.clone(),
1225                num_responses_sender,
1226            )
1227        });
1228
1229        let common_api_versions = loop {
1230            // Wait to collect enough answers before calculating a set of common api
1231            // versions to use. Note that all peers' individual responses from
1232            // previous attempts are still being used, and requests, or even
1233            // retries for peer responses, are not actually cancelled, as they
1234            // are happening on a separate task. This is all just to bound the
1235            // time the user can be waiting for the join operation to finish, at the
1236            // risk of picking the wrong version in very rare circumstances.
1237            let _: Result<_, Elapsed> = runtime::timeout(
1238                Duration::from_secs(30),
1239                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1240            )
1241            .await;
1242
1243            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1244
1245            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1246                &Self::supported_api_versions_summary_static(config, client_module_init),
1247                &peer_api_version_sets,
1248            ) {
1249                Ok(o) => break o,
1250                Err(err) if block_until_ok => {
1251                    warn!(
1252                        target: LOG_CLIENT,
1253                        err = %err.fmt_compact_anyhow(),
1254                        "Failed to discover API version to use. Retrying..."
1255                    );
1256                    continue;
1257                }
1258                Err(e) => return Err(e),
1259            }
1260        };
1261
1262        debug!(
1263            target: LOG_CLIENT,
1264            value = ?common_api_versions,
1265            "Updating the cached common api versions"
1266        );
1267        let mut dbtx = db.begin_transaction().await;
1268        let _ = dbtx
1269            .insert_entry(
1270                &CachedApiVersionSetKey,
1271                &CachedApiVersionSet(common_api_versions.clone()),
1272            )
1273            .await;
1274
1275        dbtx.commit_tx().await;
1276
1277        Ok(common_api_versions)
1278    }
1279
1280    /// Get the client [`Metadata`]
1281    pub async fn get_metadata(&self) -> Metadata {
1282        self.db
1283            .begin_transaction_nc()
1284            .await
1285            .get_value(&ClientMetadataKey)
1286            .await
1287            .unwrap_or_else(|| {
1288                warn!(
1289                    target: LOG_CLIENT,
1290                    "Missing existing metadata. This key should have been set on Client init"
1291                );
1292                Metadata::empty()
1293            })
1294    }
1295
1296    /// Set the client [`Metadata`]
1297    pub async fn set_metadata(&self, metadata: &Metadata) {
1298        self.db
1299            .autocommit::<_, _, anyhow::Error>(
1300                |dbtx, _| {
1301                    Box::pin(async {
1302                        Self::set_metadata_dbtx(dbtx, metadata).await;
1303                        Ok(())
1304                    })
1305                },
1306                None,
1307            )
1308            .await
1309            .expect("Failed to autocommit metadata");
1310    }
1311
1312    pub fn has_pending_recoveries(&self) -> bool {
1313        !self
1314            .client_recovery_progress_receiver
1315            .borrow()
1316            .iter()
1317            .all(|(_id, progress)| progress.is_done())
1318    }
1319
1320    /// Wait for all module recoveries to finish
1321    ///
1322    /// This will block until the recovery task is done with recoveries.
1323    /// Returns success if all recovery tasks are complete,
1324    /// or an error if some modules could not complete their recovery at the time.
1325    ///
1326    /// A bit of a heavy approach.
1327    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1328        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1329        recovery_receiver
1330            .wait_for(|in_progress| {
1331                in_progress
1332                    .iter()
1333                    .all(|(_id, progress)| progress.is_done())
1334            })
1335            .await
1336            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1337
1338        Ok(())
1339    }
1340
1341    /// Subscribe to recovery progress for all the modules.
1342    ///
1343    /// This stream can contain duplicate progress for a module.
1344    /// Don't use this stream for detecting completion of recovery.
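    ///
    /// A minimal sketch (assuming `futures::StreamExt` is in scope for
    /// `next()`; `complete` and `total` are the fields of [`RecoveryProgress`]):
    ///
    /// ```ignore
    /// let mut updates = client.subscribe_to_recovery_progress();
    /// while let Some((module_instance_id, progress)) = updates.next().await {
    ///     println!("module {module_instance_id}: {}/{}", progress.complete, progress.total);
    /// }
    /// ```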
1345    pub fn subscribe_to_recovery_progress(
1346        &self,
1347    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1348        WatchStream::new(self.client_recovery_progress_receiver.clone())
1349            .flat_map(futures::stream::iter)
1350    }
1351
1352    pub async fn wait_for_module_kind_recovery(
1353        &self,
1354        module_kind: ModuleKind,
1355    ) -> anyhow::Result<()> {
1356        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1357        let config = self.config().await;
1358        recovery_receiver
1359            .wait_for(|in_progress| {
1360                !in_progress
1361                    .iter()
1362                    .filter(|(module_instance_id, _progress)| {
1363                        config.modules[module_instance_id].kind == module_kind
1364                    })
1365                    .any(|(_id, progress)| !progress.is_done())
1366            })
1367            .await
1368            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1369
1370        Ok(())
1371    }
1372
1373    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1374        loop {
1375            if self.executor.get_active_states().await.is_empty() {
1376                break;
1377            }
1378            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1379        }
1380        Ok(())
1381    }
1382
1383    /// Set the client [`Metadata`]
1384    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1385        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1386    }
1387
1388    fn spawn_module_recoveries_task(
1389        &self,
1390        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1391        module_recoveries: BTreeMap<
1392            ModuleInstanceId,
1393            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1394        >,
1395        module_recovery_progress_receivers: BTreeMap<
1396            ModuleInstanceId,
1397            watch::Receiver<RecoveryProgress>,
1398        >,
1399    ) {
1400        let db = self.db.clone();
1401        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1402        self.task_group
1403            .spawn("module recoveries", |_task_handle| async {
1404                Self::run_module_recoveries_task(
1405                    db,
1406                    log_ordering_wakeup_tx,
1407                    recovery_sender,
1408                    module_recoveries,
1409                    module_recovery_progress_receivers,
1410                )
1411                .await;
1412            });
1413    }
1414
1415    async fn run_module_recoveries_task(
1416        db: Database,
1417        log_ordering_wakeup_tx: watch::Sender<()>,
1418        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1419        module_recoveries: BTreeMap<
1420            ModuleInstanceId,
1421            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1422        >,
1423        module_recovery_progress_receivers: BTreeMap<
1424            ModuleInstanceId,
1425            watch::Receiver<RecoveryProgress>,
1426        >,
1427    ) {
1428        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
1429        let mut completed_stream = Vec::new();
1430        let progress_stream = futures::stream::FuturesUnordered::new();
1431
1432        for (module_instance_id, f) in module_recoveries {
1433            completed_stream.push(futures::stream::once(Box::pin(async move {
1434                match f.await {
1435                    Ok(()) => (module_instance_id, None),
1436                    Err(err) => {
1437                        warn!(
1438                            target: LOG_CLIENT,
1439                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1440                        );
1441                        // a module recovery that failed reports an error and
1442                        // just never finishes, so we don't need a separate state
1443                        // for it
1444                        futures::future::pending::<()>().await;
1445                        unreachable!()
1446                    }
1447                }
1448            })));
1449        }
1450
1451        for (module_instance_id, rx) in module_recovery_progress_receivers {
1452            progress_stream.push(
1453                tokio_stream::wrappers::WatchStream::new(rx)
1454                    .fuse()
1455                    .map(move |progress| (module_instance_id, Some(progress))),
1456            );
1457        }
1458
1459        let mut futures = futures::stream::select(
1460            futures::stream::select_all(progress_stream),
1461            futures::stream::select_all(completed_stream),
1462        );
1463
1464        while let Some((module_instance_id, progress)) = futures.next().await {
1465            let mut dbtx = db.begin_transaction().await;
1466
1467            let prev_progress = *recovery_sender
1468                .borrow()
1469                .get(&module_instance_id)
1470                .expect("existing progress must be present");
1471
1472            let progress = if prev_progress.is_done() {
1473                // since updates might be out of order, once done, stick with it
1474                prev_progress
1475            } else if let Some(progress) = progress {
1476                progress
1477            } else {
1478                prev_progress.to_complete()
1479            };
1480
1481            if !prev_progress.is_done() && progress.is_done() {
1482                info!(
1483                    target: LOG_CLIENT,
1484                    module_instance_id,
1485                    progress = format!("{}/{}", progress.complete, progress.total),
1486                    "Recovery complete"
1487                );
1488                dbtx.log_event(
1489                    log_ordering_wakeup_tx.clone(),
1490                    None,
1491                    ModuleRecoveryCompleted {
1492                        module_id: module_instance_id,
1493                    },
1494                )
1495                .await;
1496            } else {
1497                info!(
1498                    target: LOG_CLIENT,
1499                    module_instance_id,
1500                    progress = format!("{}/{}", progress.complete, progress.total),
1501                    "Recovery progress"
1502                );
1503            }
1504
1505            dbtx.insert_entry(
1506                &ClientModuleRecovery { module_instance_id },
1507                &ClientModuleRecoveryState { progress },
1508            )
1509            .await;
1510            dbtx.commit_tx().await;
1511
1512            recovery_sender.send_modify(|v| {
1513                v.insert(module_instance_id, progress);
1514            });
1515        }
1516        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1517    }
1518
1519    async fn load_peers_last_api_versions(
1520        db: &Database,
1521        num_peers: NumPeers,
1522    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1523        let mut peer_api_version_sets = BTreeMap::new();
1524
1525        let mut dbtx = db.begin_transaction_nc().await;
1526        for peer_id in num_peers.peer_ids() {
1527            if let Some(v) = dbtx
1528                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1529                .await
1530            {
1531                peer_api_version_sets.insert(peer_id, v.0);
1532            }
1533        }
1534        drop(dbtx);
1535        peer_api_version_sets
1536    }
1537
1538    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1539    /// only the announcements and doesn't use the config as a fallback.
1540    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1541        self.db()
1542            .begin_transaction_nc()
1543            .await
1544            .find_by_prefix(&ApiAnnouncementPrefix)
1545            .await
1546            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1547            .collect()
1548            .await
1549    }
1550
1551    /// Returns a list of guardian API URLs
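    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// for (peer_id, url) in client.get_peer_urls().await {
    ///     println!("{peer_id}: {url}");
    /// }
    /// ```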
1552    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1553        get_api_urls(&self.db, &self.config().await).await
1554    }
1555
1556    /// Create an invite code with the API endpoint of the given peer, which can
1557    /// be used to download this client config.
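    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client` and
    /// that peer `0` has a known API URL:
    ///
    /// ```ignore
    /// if let Some(invite) = client.invite_code(PeerId::from(0)).await {
    ///     println!("{invite}");
    /// }
    /// ```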
1558    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1559        self.get_peer_urls()
1560            .await
1561            .into_iter()
1562            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1563            .map(|peer_url| {
1564                InviteCode::new(
1565                    peer_url.clone(),
1566                    peer,
1567                    self.federation_id(),
1568                    self.api_secret.clone(),
1569                )
1570            })
1571    }
1572
1573    /// Blocks until the client has synced the guardian public key set
1574    /// (introduced in version 0.4) and returns it. Once it has been fetched,
1575    /// this function is guaranteed to return immediately.
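    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// let guardian_keys = client.get_guardian_public_keys_blocking().await;
    /// for (peer_id, public_key) in &guardian_keys {
    ///     println!("{peer_id}: {public_key}");
    /// }
    /// ```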
1576    pub async fn get_guardian_public_keys_blocking(
1577        &self,
1578    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1579        self.db
1580            .autocommit(
1581                |dbtx, _| {
1582                    Box::pin(async move {
1583                        let config = self.config().await;
1584
1585                        let guardian_pub_keys = self
1586                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1587                            .await;
1588
1589                        Result::<_, ()>::Ok(guardian_pub_keys)
1590                    })
1591                },
1592                None,
1593            )
1594            .await
1595            .expect("Will retry forever")
1596    }
1597
1598    async fn get_or_backfill_broadcast_public_keys(
1599        &self,
1600        dbtx: &mut DatabaseTransaction<'_>,
1601        config: ClientConfig,
1602    ) -> BTreeMap<PeerId, PublicKey> {
1603        match config.global.broadcast_public_keys {
1604            Some(guardian_pub_keys) => guardian_pub_keys,
1605            _ => {
1606                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1607
1608                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1609                *(self.config.write().await) = new_config;
1610                guardian_pub_keys
1611            }
1612        }
1613    }
1614
1615    async fn fetch_session_count(&self) -> FederationResult<u64> {
1616        self.api.session_count().await
1617    }
1618
1619    async fn fetch_and_update_config(
1620        &self,
1621        config: ClientConfig,
1622    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1623        let fetched_config = retry(
1624            "Fetching guardian public keys",
1625            backoff_util::background_backoff(),
1626            || async {
1627                Ok(self
1628                    .api
1629                    .request_current_consensus::<ClientConfig>(
1630                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1631                        ApiRequestErased::default(),
1632                    )
1633                    .await?)
1634            },
1635        )
1636        .await
1637        .expect("Will never return on error");
1638
1639        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1640            warn!(
1641                target: LOG_CLIENT,
1642                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1643            );
1644            pending::<()>().await;
1645            unreachable!("Pending will never return");
1646        };
1647
1648        let new_config = ClientConfig {
1649            global: GlobalClientConfig {
1650                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1651                ..config.global
1652            },
1653            modules: config.modules,
1654        };
1655        (guardian_pub_keys, new_config)
1656    }
1657
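    /// Handle a client-global RPC call by method name, returning a stream of
    /// JSON results: single-shot methods yield exactly one item, while
    /// subscription methods keep yielding updates.
    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// use futures::StreamExt as _;
    ///
    /// let mut responses =
    ///     client.handle_global_rpc("get_federation_id".to_owned(), serde_json::Value::Null);
    /// while let Some(response) = responses.next().await {
    ///     println!("{}", response?);
    /// }
    /// ```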
1658    pub fn handle_global_rpc(
1659        &self,
1660        method: String,
1661        params: serde_json::Value,
1662    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1663        Box::pin(try_stream! {
1664            match method.as_str() {
1665                "get_balance" => {
1666                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1667                    yield serde_json::to_value(balance)?;
1668                }
1669                "subscribe_balance_changes" => {
1670                    let req: GetBalanceChangesRequest = serde_json::from_value(params)?;
1671                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1672                    while let Some(balance) = stream.next().await {
1673                        yield serde_json::to_value(balance)?;
1674                    }
1675                }
1676                "get_config" => {
1677                    let config = self.config().await;
1678                    yield serde_json::to_value(config)?;
1679                }
1680                "get_federation_id" => {
1681                    let federation_id = self.federation_id();
1682                    yield serde_json::to_value(federation_id)?;
1683                }
1684                "get_invite_code" => {
1685                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1686                    let invite_code = self.invite_code(req.peer).await;
1687                    yield serde_json::to_value(invite_code)?;
1688                }
1689                "get_operation" => {
1690                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1691                    let operation = self.operation_log().get_operation(req.operation_id).await;
1692                    yield serde_json::to_value(operation)?;
1693                }
1694                "list_operations" => {
1695                    let req: ListOperationsParams = serde_json::from_value(params)?;
1696                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1697                        usize::MAX
1698                    } else {
1699                        req.limit.unwrap_or(usize::MAX)
1700                    };
1701                    let operations = self.operation_log()
1702                        .paginate_operations_rev(limit, req.last_seen)
1703                        .await;
1704                    yield serde_json::to_value(operations)?;
1705                }
1706                "session_count" => {
1707                    let count = self.fetch_session_count().await?;
1708                    yield serde_json::to_value(count)?;
1709                }
1710                "has_pending_recoveries" => {
1711                    let has_pending = self.has_pending_recoveries();
1712                    yield serde_json::to_value(has_pending)?;
1713                }
1714                "wait_for_all_recoveries" => {
1715                    self.wait_for_all_recoveries().await?;
1716                    yield serde_json::Value::Null;
1717                }
1718                "subscribe_to_recovery_progress" => {
1719                    let mut stream = self.subscribe_to_recovery_progress();
1720                    while let Some((module_id, progress)) = stream.next().await {
1721                        yield serde_json::json!({
1722                            "module_id": module_id,
1723                            "progress": progress
1724                        });
1725                    }
1726                }
1727                "backup_to_federation" => {
1728                    let metadata = if params.is_null() {
1729                        Metadata::from_json_serialized(serde_json::json!({}))
1730                    } else {
1731                        Metadata::from_json_serialized(params)
1732                    };
1733                    self.backup_to_federation(metadata).await?;
1734                    yield serde_json::Value::Null;
1735                }
1736                _ => {
1737                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1738                    unreachable!()
1739                },
1740            }
1741        })
1742    }
1743
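    /// Log a single event to the client event log in its own database
    /// transaction.
    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client` and
    /// using [`ModuleRecoveryCompleted`] purely as an illustrative event type:
    ///
    /// ```ignore
    /// client
    ///     .log_event(None, ModuleRecoveryCompleted { module_id: 0 })
    ///     .await;
    /// ```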
1744    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1745    where
1746        E: Event + Send,
1747    {
1748        let mut dbtx = self.db.begin_transaction().await;
1749        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1750        dbtx.commit_tx().await;
1751    }
1752
1753    pub async fn log_event_dbtx<E, Cap>(
1754        &self,
1755        dbtx: &mut DatabaseTransaction<'_, Cap>,
1756        module_id: Option<ModuleInstanceId>,
1757        event: E,
1758    ) where
1759        E: Event + Send,
1760        Cap: Send,
1761    {
1762        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1763            .await;
1764    }
1765
1766    pub async fn log_event_raw_dbtx<Cap>(
1767        &self,
1768        dbtx: &mut DatabaseTransaction<'_, Cap>,
1769        kind: EventKind,
1770        module: Option<(ModuleKind, ModuleInstanceId)>,
1771        payload: Vec<u8>,
1772        persist: EventPersistence,
1773    ) where
1774        Cap: Send,
1775    {
1776        let module_id = module.as_ref().map(|m| m.1);
1777        let module_kind = module.map(|m| m.0);
1778        dbtx.log_event_raw(
1779            self.log_ordering_wakeup_tx.clone(),
1780            kind,
1781            module_kind,
1782            module_id,
1783            payload,
1784            persist,
1785        )
1786        .await;
1787    }
1788
1789    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1790    where
1791        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1792        K: DatabaseRecord<Value = EventLogId>,
1793        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1794        R: Future<Output = anyhow::Result<()>>,
1795    {
1796        fedimint_eventlog::handle_events(
1797            self.db.clone(),
1798            pos_key,
1799            self.log_event_added_rx.clone(),
1800            call_fn,
1801        )
1802        .await
1803    }
1804
1805    pub async fn handle_trimable_events<F, R, K>(
1806        &self,
1807        pos_key: &K,
1808        call_fn: F,
1809    ) -> anyhow::Result<()>
1810    where
1811        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1812        K: DatabaseRecord<Value = EventLogTrimableId>,
1813        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1814        R: Future<Output = anyhow::Result<()>>,
1815    {
1816        fedimint_eventlog::handle_trimable_events(
1817            self.db.clone(),
1818            pos_key,
1819            self.log_event_added_rx.clone(),
1820            call_fn,
1821        )
1822        .await
1823    }
1824
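    /// Read up to `limit` persisted event log entries from the given position
    /// (from the beginning when `pos` is `None`).
    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// let entries = client.get_event_log(None, 100).await;
    /// println!("fetched {} event log entries", entries.len());
    /// ```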
1825    pub async fn get_event_log(
1826        &self,
1827        pos: Option<EventLogId>,
1828        limit: u64,
1829    ) -> Vec<PersistedLogEntry> {
1830        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1831            .await
1832    }
1833
1834    pub async fn get_event_log_trimable(
1835        &self,
1836        pos: Option<EventLogTrimableId>,
1837        limit: u64,
1838    ) -> Vec<PersistedLogEntry> {
1839        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1840            .await
1841    }
1842
1843    pub async fn get_event_log_dbtx<Cap>(
1844        &self,
1845        dbtx: &mut DatabaseTransaction<'_, Cap>,
1846        pos: Option<EventLogId>,
1847        limit: u64,
1848    ) -> Vec<PersistedLogEntry>
1849    where
1850        Cap: Send,
1851    {
1852        dbtx.get_event_log(pos, limit).await
1853    }
1854
1855    pub async fn get_event_log_trimable_dbtx<Cap>(
1856        &self,
1857        dbtx: &mut DatabaseTransaction<'_, Cap>,
1858        pos: Option<EventLogTrimableId>,
1859        limit: u64,
1860    ) -> Vec<PersistedLogEntry>
1861    where
1862        Cap: Send,
1863    {
1864        dbtx.get_event_log_trimable(pos, limit).await
1865    }
1866
1867    /// Register a receiver for all new transient (unpersisted) events.
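    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// let mut transient_rx = client.get_event_log_transient_receiver();
    /// while let Ok(entry) = transient_rx.recv().await {
    ///     // react to the transient `entry` here
    /// }
    /// ```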
1868    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1869        self.log_event_added_transient_tx.subscribe()
1870    }
1871
1872    pub fn iroh_enable_dht(&self) -> bool {
1873        self.iroh_enable_dht
1874    }
1875
1876    pub(crate) async fn run_core_migrations(
1877        db_no_decoders: &Database,
1878    ) -> Result<(), anyhow::Error> {
1879        let mut dbtx = db_no_decoders.begin_transaction().await;
1880        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1881            .await?;
1882        if is_running_in_test_env() {
1883            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1884        }
1885        dbtx.commit_tx_result().await?;
1886        Ok(())
1887    }
1888
1889    /// Iterator over primary modules for a given `unit`
1890    fn primary_modules_for_unit(
1891        &self,
1892        unit: AmountUnit,
1893    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1894        self.primary_modules
1895            .iter()
1896            .flat_map(move |(_prio, candidates)| {
1897                candidates
1898                    .specific
1899                    .get(&unit)
1900                    .into_iter()
1901                    .flatten()
1902                    .copied()
1903                    // within same priority, wildcard matches come last
1904                    .chain(candidates.wildcard.iter().copied())
1905            })
1906            .map(|id| (id, self.modules.get_expect(id)))
1907    }
1908
1909    /// Primary module to use for `unit`
1910    ///
1911    /// Currently, this just picks the first (highest-priority) match.
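    ///
    /// A minimal usage sketch, assuming an initialized `client: &Client`:
    ///
    /// ```ignore
    /// if let Some((instance_id, _module)) = client.primary_module_for_unit(AmountUnit::BITCOIN) {
    ///     println!("primary bitcoin module instance: {instance_id}");
    /// }
    /// ```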
1912    pub fn primary_module_for_unit(
1913        &self,
1914        unit: AmountUnit,
1915    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
1916        self.primary_modules_for_unit(unit).next()
1917    }
1918
1919    /// [`Self::primary_module_for_unit`] for Bitcoin
1920    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
1921        self.primary_module_for_unit(AmountUnit::BITCOIN)
1922            .expect("No primary module for Bitcoin")
1923    }
1924}
1925
1926#[apply(async_trait_maybe_send!)]
1927impl ClientContextIface for Client {
1928    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1929        Client::get_module(self, instance)
1930    }
1931
1932    fn api_clone(&self) -> DynGlobalApi {
1933        Client::api_clone(self)
1934    }
1935    fn decoders(&self) -> &ModuleDecoderRegistry {
1936        Client::decoders(self)
1937    }
1938
1939    async fn finalize_and_submit_transaction(
1940        &self,
1941        operation_id: OperationId,
1942        operation_type: &str,
1943        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1944        tx_builder: TransactionBuilder,
1945    ) -> anyhow::Result<OutPointRange> {
1946        Client::finalize_and_submit_transaction(
1947            self,
1948            operation_id,
1949            operation_type,
1951            &operation_meta_gen,
1952            tx_builder,
1953        )
1954        .await
1955    }
1956
1957    async fn finalize_and_submit_transaction_inner(
1958        &self,
1959        dbtx: &mut DatabaseTransaction<'_>,
1960        operation_id: OperationId,
1961        tx_builder: TransactionBuilder,
1962    ) -> anyhow::Result<OutPointRange> {
1963        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1964    }
1965
1966    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1967        Client::transaction_updates(self, operation_id).await
1968    }
1969
1970    async fn await_primary_module_outputs(
1971        &self,
1972        operation_id: OperationId,
1973        // TODO: make `impl Iterator<Item = ...>`
1974        outputs: Vec<OutPoint>,
1975    ) -> anyhow::Result<()> {
1976        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
1977    }
1978
1979    fn operation_log(&self) -> &dyn IOperationLog {
1980        Client::operation_log(self)
1981    }
1982
1983    async fn has_active_states(&self, operation_id: OperationId) -> bool {
1984        Client::has_active_states(self, operation_id).await
1985    }
1986
1987    async fn operation_exists(&self, operation_id: OperationId) -> bool {
1988        Client::operation_exists(self, operation_id).await
1989    }
1990
1991    async fn config(&self) -> ClientConfig {
1992        Client::config(self).await
1993    }
1994
1995    fn db(&self) -> &Database {
1996        Client::db(self)
1997    }
1998
1999    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2000        Client::executor(self)
2001    }
2002
2003    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2004        Client::invite_code(self, peer).await
2005    }
2006
2007    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2008        Client::get_internal_payment_markers(self)
2009    }
2010
2011    async fn log_event_json(
2012        &self,
2013        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2014        module_kind: Option<ModuleKind>,
2015        module_id: ModuleInstanceId,
2016        kind: EventKind,
2017        payload: serde_json::Value,
2018        persist: EventPersistence,
2019    ) {
2020        dbtx.ensure_global()
2021            .expect("Must be called with global dbtx");
2022        self.log_event_raw_dbtx(
2023            dbtx,
2024            kind,
2025            module_kind.map(|kind| (kind, module_id)),
2026            serde_json::to_vec(&payload).expect("Serialization can't fail"),
2027            persist,
2028        )
2029        .await;
2030    }
2031
2032    async fn read_operation_active_states<'dbtx>(
2033        &self,
2034        operation_id: OperationId,
2035        module_id: ModuleInstanceId,
2036        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2037    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2038    {
2039        Box::pin(
2040            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2041                operation_id,
2042                module_instance: module_id,
2043            })
2044            .await
2045            .map(move |(k, v)| (k.0, v)),
2046        )
2047    }
2048    async fn read_operation_inactive_states<'dbtx>(
2049        &self,
2050        operation_id: OperationId,
2051        module_id: ModuleInstanceId,
2052        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2053    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2054    {
2055        Box::pin(
2056            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2057                operation_id,
2058                module_instance: module_id,
2059            })
2060            .await
2061            .map(move |(k, v)| (k.0, v)),
2062        )
2063    }
2064}
2065
2066// TODO: impl `Debug` for `Client` and derive here
2067impl fmt::Debug for Client {
2068    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2069        write!(f, "Client")
2070    }
2071}
2072
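/// Build a [`ModuleDecoderRegistry`] for the given module instances, skipping
/// any module kind that has no registered client module init.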
2073pub fn client_decoders<'a>(
2074    registry: &ModuleInitRegistry<DynClientModuleInit>,
2075    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2076) -> ModuleDecoderRegistry {
2077    let mut modules = BTreeMap::new();
2078    for (id, kind) in module_kinds {
2079        let Some(init) = registry.get(kind) else {
2080            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2081            continue;
2082        };
2083
2084        modules.insert(
2085            id,
2086            (
2087                kind.clone(),
2088                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2089            ),
2090        );
2091    }
2092    ModuleDecoderRegistry::from(modules)
2093}