fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::ConnectorType;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
86    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
87    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
/// List of core api versions supported by the implementation.
/// Notably `major` version is the one being supported, and corresponding
/// `minor` version is the one required (for given `major` version).
///
/// Currently the list holds a single entry: major `0`, minor `0`.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
109
/// Primary module candidates at specific priority level
///
/// Candidates are split into modules that named the units they handle and
/// modules that accept any unit.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Modules that listed specific units they handle, keyed by unit
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Modules handling any unit
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Main client type
///
/// A handle and API for interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time, will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it is starting and managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`crate::ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    final_client: FinalClientIface,
    config: tokio::sync::RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    db: Database,
    federation_id: FederationId,
    federation_config_meta: BTreeMap<String, String>,
    // Primary module candidates grouped by priority level.
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: ConnectorType,

    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Internal client sender to wake up log ordering task every time a
    /// (unordered) log event is added.
    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time (ordered) log event is added.
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}
168
/// Serializable parameters for an operation listing request.
///
/// NOTE(review): presumably used for paginated listing of the operation log;
/// the consumer is not visible in this part of the file — confirm.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    /// Optional cap on the number of entries (presumably; confirm at use site)
    limit: Option<usize>,
    /// Optional chronological key of the last entry already seen
    last_seen: Option<ChronologicalOperationLogKey>,
}
174
/// Serializable request carrying a single [`OperationId`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    /// Id of the operation the request refers to
    operation_id: OperationId,
}
179
/// Serializable request selecting the amount unit for balance changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    /// Unit to report on; when the field is missing during deserialization,
    /// `AmountUnit::bitcoin` supplies the value.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
185
186impl Client {
187    /// Initialize a client builder that can be configured to create a new
188    /// client.
189    pub async fn builder() -> anyhow::Result<ClientBuilder> {
190        Ok(ClientBuilder::new())
191    }
192
193    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
194        self.api.as_ref()
195    }
196
197    pub fn api_clone(&self) -> DynGlobalApi {
198        self.api.clone()
199    }
200
201    /// Returns a stream that emits the current connection status of all peers
202    /// whenever any peer's status changes. Emits initial state immediately.
203    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
204        self.api.connection_status_stream()
205    }
206
207    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
208    pub fn task_group(&self) -> &TaskGroup {
209        &self.task_group
210    }
211
212    /// Useful for our CLI tooling, not meant for external use
213    #[doc(hidden)]
214    pub fn executor(&self) -> &Executor {
215        &self.executor
216    }
217
218    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
219        let mut dbtx = db.begin_transaction_nc().await;
220        dbtx.get_value(&ClientConfigKey).await
221    }
222
223    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
224        let mut dbtx = db.begin_transaction_nc().await;
225        dbtx.get_value(&PendingClientConfigKey).await
226    }
227
228    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
229        let mut dbtx = db.begin_transaction_nc().await;
230        dbtx.get_value(&ApiSecretKey).await
231    }
232
233    pub async fn store_encodable_client_secret<T: Encodable>(
234        db: &Database,
235        secret: T,
236    ) -> anyhow::Result<()> {
237        let mut dbtx = db.begin_transaction().await;
238
239        // Don't overwrite an existing secret
240        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
241            bail!("Encoded client secret already exists, cannot overwrite")
242        }
243
244        let encoded_secret = T::consensus_encode_to_vec(&secret);
245        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
246            .await;
247        dbtx.commit_tx().await;
248        Ok(())
249    }
250
251    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
252        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
253            bail!("Encoded client secret not present in DB")
254        };
255
256        Ok(secret)
257    }
258    pub async fn load_decodable_client_secret_opt<T: Decodable>(
259        db: &Database,
260    ) -> anyhow::Result<Option<T>> {
261        let mut dbtx = db.begin_transaction_nc().await;
262
263        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
264
265        Ok(match client_secret {
266            Some(client_secret) => Some(
267                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
268                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
269            ),
270            None => None,
271        })
272    }
273
274    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
275        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
276            Ok(secret) => secret,
277            _ => {
278                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
279                Self::store_encodable_client_secret(db, secret)
280                    .await
281                    .expect("Storing client secret must work");
282                secret
283            }
284        };
285        Ok(client_secret)
286    }
287
288    pub async fn is_initialized(db: &Database) -> bool {
289        let mut dbtx = db.begin_transaction_nc().await;
290        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
291            .await
292            .expect("Unrecoverable error occurred while reading and entry from the database")
293            .is_some()
294    }
295
296    pub fn start_executor(self: &Arc<Self>) {
297        debug!(
298            target: LOG_CLIENT,
299            "Starting fedimint client executor",
300        );
301        self.executor.start_executor(self.context_gen());
302    }
303
304    pub fn federation_id(&self) -> FederationId {
305        self.federation_id
306    }
307
308    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
309        let client_inner = Arc::downgrade(self);
310        Arc::new(move |module_instance, operation| {
311            ModuleGlobalClientContext {
312                client: client_inner
313                    .clone()
314                    .upgrade()
315                    .expect("ModuleGlobalContextGen called after client was dropped"),
316                module_instance_id: module_instance,
317                operation,
318            }
319            .into()
320        })
321    }
322
323    pub async fn config(&self) -> ClientConfig {
324        self.config.read().await.clone()
325    }
326
327    // TODO: change to `-> Option<&str>`
328    pub fn api_secret(&self) -> &Option<String> {
329        &self.api_secret
330    }
331
332    /// Returns the core API version that the federation supports
333    ///
334    /// This reads from the cached version stored during client initialization.
335    /// If no cache is available (e.g., during initial setup), returns a default
336    /// version (0, 0).
337    pub async fn core_api_version(&self) -> ApiVersion {
338        // Try to get from cache. If not available, return a conservative
339        // default. The cache should always be populated after successful client init.
340        self.db
341            .begin_transaction_nc()
342            .await
343            .get_value(&CachedApiVersionSetKey)
344            .await
345            .map(|cached: CachedApiVersionSet| cached.0.core)
346            .unwrap_or(ApiVersion { major: 0, minor: 0 })
347    }
348
349    pub fn decoders(&self) -> &ModuleDecoderRegistry {
350        &self.decoders
351    }
352
353    /// Returns a reference to the module, panics if not found
354    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
355        self.try_get_module(instance)
356            .expect("Module instance not found")
357    }
358
359    fn try_get_module(
360        &self,
361        instance: ModuleInstanceId,
362    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
363        Some(self.modules.get(instance)?.as_ref())
364    }
365
366    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
367        self.modules.get(instance).is_some()
368    }
369
370    /// Returns the input amount and output amount of a transaction
371    ///
372    /// # Panics
373    /// If any of the input or output versions in the transaction builder are
374    /// unknown by the respective module.
375    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
376        // FIXME: prevent overflows, currently not suitable for untrusted input
377        let mut in_amounts = Amounts::ZERO;
378        let mut out_amounts = Amounts::ZERO;
379        let mut fee_amounts = Amounts::ZERO;
380
381        for input in builder.inputs() {
382            let module = self.get_module(input.input.module_instance_id());
383
384            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
385                "We only build transactions with input versions that are supported by the module",
386            );
387
388            in_amounts.checked_add_mut(&input.amounts);
389            fee_amounts.checked_add_mut(&item_fees);
390        }
391
392        for output in builder.outputs() {
393            let module = self.get_module(output.output.module_instance_id());
394
395            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
396                "We only build transactions with output versions that are supported by the module",
397            );
398
399            out_amounts.checked_add_mut(&output.amounts);
400            fee_amounts.checked_add_mut(&item_fees);
401        }
402
403        out_amounts.checked_add_mut(&fee_amounts);
404        (in_amounts, out_amounts)
405    }
406
407    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
408        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
409    }
410
411    /// Get metadata value from the federation config itself
412    pub fn get_config_meta(&self, key: &str) -> Option<String> {
413        self.federation_config_meta.get(key).cloned()
414    }
415
416    pub(crate) fn root_secret(&self) -> DerivableSecret {
417        self.root_secret.clone()
418    }
419
420    pub async fn add_state_machines(
421        &self,
422        dbtx: &mut DatabaseTransaction<'_>,
423        states: Vec<DynState>,
424    ) -> AddStateMachinesResult {
425        self.executor.add_state_machines_dbtx(dbtx, states).await
426    }
427
428    // TODO: implement as part of [`OperationLog`]
429    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
430        let active_states = self.executor.get_active_states().await;
431        let mut active_operations = HashSet::with_capacity(active_states.len());
432        let mut dbtx = self.db().begin_transaction_nc().await;
433        for (state, _) in active_states {
434            let operation_id = state.operation_id();
435            if dbtx
436                .get_value(&OperationLogKey { operation_id })
437                .await
438                .is_some()
439            {
440                active_operations.insert(operation_id);
441            }
442        }
443        active_operations
444    }
445
446    pub fn operation_log(&self) -> &OperationLog {
447        &self.operation_log
448    }
449
450    /// Get the meta manager to read meta fields.
451    pub fn meta_service(&self) -> &Arc<MetaService> {
452        &self.meta_service
453    }
454
455    /// Get the meta manager to read meta fields.
456    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
457        let meta_service = self.meta_service();
458        let ts = meta_service
459            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
460            .await
461            .and_then(|v| v.value)?;
462        Some(UNIX_EPOCH + Duration::from_secs(ts))
463    }
464
465    /// Adds funding to a transaction or removes over-funding via change.
466    async fn finalize_transaction(
467        &self,
468        dbtx: &mut DatabaseTransaction<'_>,
469        operation_id: OperationId,
470        mut partial_transaction: TransactionBuilder,
471    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
472        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
473
474        let mut added_inputs_bundles = vec![];
475        let mut added_outputs_bundles = vec![];
476
477        // The way currently things are implemented is OK for modules which can
478        // collect a fee relative to one being used, but will break down in any
479        // fancy scenarios. Future TODOs:
480        //
481        // * create_final_inputs_and_outputs needs to get broken down, so we can use
482        //   primary modules using priorities (possibly separate prios for inputs and
483        //   outputs to be able to drain modules, etc.); we need the split "check if
484        //   possible" and "take" steps,
485        // * extra inputs and outputs adding fees needs to be taken into account,
486        //   possibly with some looping
487        for unit in in_amounts.units().union(&out_amounts.units()) {
488            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
489            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
490            if input_amount == output_amount {
491                continue;
492            }
493
494            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
495                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
496            };
497
498            let (added_input_bundle, added_output_bundle) = module
499                .create_final_inputs_and_outputs(
500                    module_id,
501                    dbtx,
502                    operation_id,
503                    *unit,
504                    input_amount,
505                    output_amount,
506                )
507                .await?;
508
509            added_inputs_bundles.push(added_input_bundle);
510            added_outputs_bundles.push(added_output_bundle);
511        }
512
513        // This is the range of  outputs that will be added to the transaction
514        // in order to balance it. Notice that it may stay empty in case the transaction
515        // is already balanced.
516        let change_range = Range {
517            start: partial_transaction.outputs().count() as u64,
518            end: (partial_transaction.outputs().count() as u64
519                + added_outputs_bundles
520                    .iter()
521                    .map(|output| output.outputs().len() as u64)
522                    .sum::<u64>()),
523        };
524
525        for added_inputs in added_inputs_bundles {
526            partial_transaction = partial_transaction.with_inputs(added_inputs);
527        }
528
529        for added_outputs in added_outputs_bundles {
530            partial_transaction = partial_transaction.with_outputs(added_outputs);
531        }
532
533        let (input_amounts, output_amounts) =
534            self.transaction_builder_get_balance(&partial_transaction);
535
536        for (unit, output_amount) in output_amounts {
537            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
538
539            assert!(input_amount >= output_amount, "Transaction is underfunded");
540        }
541
542        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
543
544        Ok((tx, states, change_range))
545    }
546
547    /// Add funding and/or change to the transaction builder as needed, finalize
548    /// the transaction and submit it to the federation.
549    ///
550    /// ## Errors
551    /// The function will return an error if the operation with given ID already
552    /// exists.
553    ///
554    /// ## Panics
555    /// The function will panic if the database transaction collides with
556    /// other and fails with others too often, this should not happen except for
557    /// excessively concurrent scenarios.
558    pub async fn finalize_and_submit_transaction<F, M>(
559        &self,
560        operation_id: OperationId,
561        operation_type: &str,
562        operation_meta_gen: F,
563        tx_builder: TransactionBuilder,
564    ) -> anyhow::Result<OutPointRange>
565    where
566        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
567        M: serde::Serialize + MaybeSend,
568    {
569        let operation_type = operation_type.to_owned();
570
571        let autocommit_res = self
572            .db
573            .autocommit(
574                |dbtx, _| {
575                    let operation_type = operation_type.clone();
576                    let tx_builder = tx_builder.clone();
577                    let operation_meta_gen = operation_meta_gen.clone();
578                    Box::pin(async move {
579                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
580                            bail!("There already exists an operation with id {operation_id:?}")
581                        }
582
583                        let out_point_range = self
584                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
585                            .await?;
586
587                        self.operation_log()
588                            .add_operation_log_entry_dbtx(
589                                dbtx,
590                                operation_id,
591                                &operation_type,
592                                operation_meta_gen(out_point_range),
593                            )
594                            .await;
595
596                        Ok(out_point_range)
597                    })
598                },
599                Some(100), // TODO: handle what happens after 100 retries
600            )
601            .await;
602
603        match autocommit_res {
604            Ok(txid) => Ok(txid),
605            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
606            Err(AutocommitError::CommitFailed {
607                attempts,
608                last_error,
609            }) => panic!(
610                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
611            ),
612        }
613    }
614
615    async fn finalize_and_submit_transaction_inner(
616        &self,
617        dbtx: &mut DatabaseTransaction<'_>,
618        operation_id: OperationId,
619        tx_builder: TransactionBuilder,
620    ) -> anyhow::Result<OutPointRange> {
621        let (transaction, mut states, change_range) = self
622            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
623            .await?;
624
625        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
626            let inputs = transaction
627                .inputs
628                .iter()
629                .map(DynInput::module_instance_id)
630                .collect::<Vec<_>>();
631            let outputs = transaction
632                .outputs
633                .iter()
634                .map(DynOutput::module_instance_id)
635                .collect::<Vec<_>>();
636            warn!(
637                target: LOG_CLIENT_NET_API,
638                size=%transaction.consensus_encode_to_vec().len(),
639                ?inputs,
640                ?outputs,
641                "Transaction too large",
642            );
643            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
644            bail!(
645                "The generated transaction would be rejected by the federation for being too large."
646            );
647        }
648
649        let txid = transaction.tx_hash();
650
651        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
652
653        let tx_submission_sm = DynState::from_typed(
654            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
655            TxSubmissionStatesSM {
656                operation_id,
657                state: TxSubmissionStates::Created(transaction),
658            },
659        );
660        states.push(tx_submission_sm);
661
662        self.executor.add_state_machines_dbtx(dbtx, states).await?;
663
664        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
665            .await;
666
667        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
668    }
669
670    async fn transaction_update_stream(
671        &self,
672        operation_id: OperationId,
673    ) -> BoxStream<'static, TxSubmissionStatesSM> {
674        self.executor
675            .notifier()
676            .module_notifier::<TxSubmissionStatesSM>(
677                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
678                self.final_client.clone(),
679            )
680            .subscribe(operation_id)
681            .await
682    }
683
684    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
685        let mut dbtx = self.db().begin_transaction_nc().await;
686
687        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
688    }
689
690    pub async fn operation_exists_dbtx(
691        dbtx: &mut DatabaseTransaction<'_>,
692        operation_id: OperationId,
693    ) -> bool {
694        let active_state_exists = dbtx
695            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
696            .await
697            .next()
698            .await
699            .is_some();
700
701        let inactive_state_exists = dbtx
702            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
703            .await
704            .next()
705            .await
706            .is_some();
707
708        active_state_exists || inactive_state_exists
709    }
710
711    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
712        self.db
713            .begin_transaction_nc()
714            .await
715            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
716            .await
717            .next()
718            .await
719            .is_some()
720    }
721
722    /// Waits for an output from the primary module to reach its final
723    /// state.
724    pub async fn await_primary_bitcoin_module_output(
725        &self,
726        operation_id: OperationId,
727        out_point: OutPoint,
728    ) -> anyhow::Result<()> {
729        self.primary_module_for_unit(AmountUnit::BITCOIN)
730            .ok_or_else(|| anyhow!("No primary module available"))?
731            .1
732            .await_primary_module_output(operation_id, out_point)
733            .await
734    }
735
736    /// Returns a reference to a typed module client instance by kind
737    pub fn get_first_module<M: ClientModule>(
738        &'_ self,
739    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
740        let module_kind = M::kind();
741        let id = self
742            .get_first_instance(&module_kind)
743            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
744        let module: &M = self
745            .try_get_module(id)
746            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
747            .as_any()
748            .downcast_ref::<M>()
749            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
750        let (db, _) = self.db().with_prefix_module_id(id);
751        Ok(ClientModuleInstance {
752            id,
753            db,
754            api: self.api().with_module(id),
755            module,
756        })
757    }
758
759    pub fn get_module_client_dyn(
760        &self,
761        instance_id: ModuleInstanceId,
762    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
763        self.try_get_module(instance_id)
764            .ok_or(anyhow!("Unknown module instance {}", instance_id))
765    }
766
767    pub fn db(&self) -> &Database {
768        &self.db
769    }
770
771    pub fn endpoints(&self) -> &ConnectorRegistry {
772        &self.connectors
773    }
774
775    /// Returns a stream of transaction updates for the given operation id that
776    /// can later be used to watch for a specific transaction being accepted.
777    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
778        TransactionUpdates {
779            update_stream: self.transaction_update_stream(operation_id).await,
780        }
781    }
782
783    /// Returns the instance id of the first module of the given kind.
784    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
785        self.modules
786            .iter_modules()
787            .find(|(_, kind, _module)| *kind == module_kind)
788            .map(|(instance_id, _, _)| instance_id)
789    }
790
791    /// Returns the data from which the client's root secret is derived (e.g.
792    /// BIP39 seed phrase struct).
793    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
794        get_decoded_client_secret::<T>(self.db()).await
795    }
796
797    /// Waits for outputs from the primary module to reach its final
798    /// state.
799    pub async fn await_primary_bitcoin_module_outputs(
800        &self,
801        operation_id: OperationId,
802        outputs: Vec<OutPoint>,
803    ) -> anyhow::Result<()> {
804        for out_point in outputs {
805            self.await_primary_bitcoin_module_output(operation_id, out_point)
806                .await?;
807        }
808
809        Ok(())
810    }
811
812    /// Returns the config of the client in JSON format.
813    ///
814    /// Compared to the consensus module format where module configs are binary
815    /// encoded this format cannot be cryptographically verified but is easier
816    /// to consume and to some degree human-readable.
817    pub async fn get_config_json(&self) -> JsonClientConfig {
818        self.config().await.to_json()
819    }
820
821    // Ideally this would not be in the API, but there's a lot of places where this
822    // makes it easier.
823    #[doc(hidden)]
824    /// Like [`Self::get_balance`] but returns an error if primary module is not
825    /// available
826    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
827        self.get_balance_for_unit(AmountUnit::BITCOIN).await
828    }
829
830    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
831        let (id, module) = self
832            .primary_module_for_unit(unit)
833            .ok_or_else(|| anyhow!("Primary module not available"))?;
834        Ok(module
835            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
836            .await)
837    }
838
839    /// Returns a stream that yields the current client balance every time it
840    /// changes.
841    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
842        let primary_module_things =
843            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
844                let balance_changes = primary_module.subscribe_balance_changes().await;
845                let initial_balance = self
846                    .get_balance_for_unit(unit)
847                    .await
848                    .expect("Primary is present");
849
850                Some((
851                    primary_module_id,
852                    primary_module.clone(),
853                    balance_changes,
854                    initial_balance,
855                ))
856            } else {
857                None
858            };
859        let db = self.db().clone();
860
861        Box::pin(async_stream::stream! {
862            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
863                // If there is no primary module, there will not be one until client is
864                // restarted
865                pending().await
866            };
867
868
869            yield initial_balance;
870            let mut prev_balance = initial_balance;
871            while let Some(()) = balance_changes.next().await {
872                let mut dbtx = db.begin_transaction_nc().await;
873                let balance = primary_module
874                     .get_balance(primary_module_id, &mut dbtx, unit)
875                    .await;
876
877                // Deduplicate in case modules cannot always tell if the balance actually changed
878                if balance != prev_balance {
879                    prev_balance = balance;
880                    yield balance;
881                }
882            }
883        })
884    }
885
886    /// Make a single API version request to a peer after a delay.
887    ///
888    /// The delay is here to unify the type of a future both for initial request
889    /// and possible retries.
890    async fn make_api_version_request(
891        delay: Duration,
892        peer_id: PeerId,
893        api: &DynGlobalApi,
894    ) -> (
895        PeerId,
896        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
897    ) {
898        runtime::sleep(delay).await;
899        (
900            peer_id,
901            api.request_single_peer::<SupportedApiVersionsSummary>(
902                VERSION_ENDPOINT.to_owned(),
903                ApiRequestErased::default(),
904                peer_id,
905            )
906            .await,
907        )
908    }
909
910    /// Create a backoff strategy for API version requests.
911    ///
912    /// Keep trying, initially somewhat aggressively, but after a while retry
913    /// very slowly, because chances for response are getting lower and
914    /// lower.
915    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
916        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
917    }
918
919    /// Query the federation for API version support and then calculate
920    /// the best API version to use (supported by most guardians).
921    pub async fn fetch_common_api_versions_from_all_peers(
922        num_peers: NumPeers,
923        api: DynGlobalApi,
924        db: Database,
925        num_responses_sender: watch::Sender<usize>,
926    ) {
927        let mut backoff = Self::create_api_version_backoff();
928
929        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
930        // and make a single async db write operation, it should be OK.
931        let mut requests = FuturesUnordered::new();
932
933        for peer_id in num_peers.peer_ids() {
934            requests.push(Self::make_api_version_request(
935                Duration::ZERO,
936                peer_id,
937                &api,
938            ));
939        }
940
941        let mut num_responses = 0;
942
943        while let Some((peer_id, response)) = requests.next().await {
944            let retry = match response {
945                Err(err) => {
946                    let has_previous_response = db
947                        .begin_transaction_nc()
948                        .await
949                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
950                        .await
951                        .is_some();
952                    debug!(
953                        target: LOG_CLIENT,
954                        %peer_id,
955                        err = %err.fmt_compact(),
956                        %has_previous_response,
957                        "Failed to refresh API versions of a peer"
958                    );
959
960                    !has_previous_response
961                }
962                Ok(o) => {
963                    // Save the response to the database right away, just to
964                    // not lose it
965                    let mut dbtx = db.begin_transaction().await;
966                    dbtx.insert_entry(
967                        &PeerLastApiVersionsSummaryKey(peer_id),
968                        &PeerLastApiVersionsSummary(o),
969                    )
970                    .await;
971                    dbtx.commit_tx().await;
972                    false
973                }
974            };
975
976            if retry {
977                requests.push(Self::make_api_version_request(
978                    backoff.next().expect("Keeps retrying"),
979                    peer_id,
980                    &api,
981                ));
982            } else {
983                num_responses += 1;
984                num_responses_sender.send_replace(num_responses);
985            }
986        }
987    }
988
989    /// Fetch API versions from peers, retrying until we get threshold number of
990    /// successful responses. Returns the successful responses collected
991    /// from at least `num_peers.threshold()` peers.
992    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
993        num_peers: NumPeers,
994        api: DynGlobalApi,
995    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
996        let mut backoff = Self::create_api_version_backoff();
997
998        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
999        // and collect responses, it should be OK.
1000        let mut requests = FuturesUnordered::new();
1001
1002        for peer_id in num_peers.peer_ids() {
1003            requests.push(Self::make_api_version_request(
1004                Duration::ZERO,
1005                peer_id,
1006                &api,
1007            ));
1008        }
1009
1010        let mut successful_responses = BTreeMap::new();
1011
1012        while successful_responses.len() < num_peers.threshold()
1013            && let Some((peer_id, response)) = requests.next().await
1014        {
1015            let retry = match response {
1016                Err(err) => {
1017                    debug!(
1018                        target: LOG_CLIENT,
1019                        %peer_id,
1020                        err = %err.fmt_compact(),
1021                        "Failed to fetch API versions from peer"
1022                    );
1023                    true
1024                }
1025                Ok(response) => {
1026                    successful_responses.insert(peer_id, response);
1027                    false
1028                }
1029            };
1030
1031            if retry {
1032                requests.push(Self::make_api_version_request(
1033                    backoff.next().expect("Keeps retrying"),
1034                    peer_id,
1035                    &api,
1036                ));
1037            }
1038        }
1039
1040        successful_responses
1041    }
1042
1043    /// Fetch API versions from peers and discover common API versions to use.
1044    pub async fn fetch_common_api_versions(
1045        config: &ClientConfig,
1046        api: &DynGlobalApi,
1047    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1048        debug!(
1049            target: LOG_CLIENT,
1050            "Fetching common api versions"
1051        );
1052
1053        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1054
1055        let peer_api_version_sets =
1056            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1057
1058        Ok(peer_api_version_sets)
1059    }
1060
1061    /// Write API version set to database cache.
1062    /// Used when we have a pre-calculated API version set that should be stored
1063    /// for later use.
1064    pub async fn write_api_version_cache(
1065        dbtx: &mut DatabaseTransaction<'_>,
1066        api_version_set: ApiVersionSet,
1067    ) {
1068        debug!(
1069            target: LOG_CLIENT,
1070            value = ?api_version_set,
1071            "Writing API version set to cache"
1072        );
1073
1074        dbtx.insert_entry(
1075            &CachedApiVersionSetKey,
1076            &CachedApiVersionSet(api_version_set),
1077        )
1078        .await;
1079    }
1080
1081    /// Store prefetched peer API version responses and calculate/store common
1082    /// API version set. This processes the individual peer responses by
1083    /// storing them in the database and calculating the common API version
1084    /// set for caching.
1085    pub async fn store_prefetched_api_versions(
1086        db: &Database,
1087        config: &ClientConfig,
1088        client_module_init: &ClientModuleInitRegistry,
1089        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1090    ) {
1091        debug!(
1092            target: LOG_CLIENT,
1093            "Storing {} prefetched peer API version responses and calculating common version set",
1094            peer_api_versions.len()
1095        );
1096
1097        let mut dbtx = db.begin_transaction().await;
1098        // Calculate common API version set from individual responses
1099        let client_supported_versions =
1100            Self::supported_api_versions_summary_static(config, client_module_init);
1101        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1102            &client_supported_versions,
1103            peer_api_versions,
1104        ) {
1105            Ok(common_api_versions) => {
1106                // Write the calculated common API version set to database cache
1107                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1108                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1109            }
1110            Err(err) => {
1111                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1112            }
1113        }
1114
1115        // Store individual peer responses to database
1116        for (peer_id, peer_api_versions) in peer_api_versions {
1117            dbtx.insert_entry(
1118                &PeerLastApiVersionsSummaryKey(*peer_id),
1119                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1120            )
1121            .await;
1122        }
1123        dbtx.commit_tx().await;
1124        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1125    }
1126
1127    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1128    pub fn supported_api_versions_summary_static(
1129        config: &ClientConfig,
1130        client_module_init: &ClientModuleInitRegistry,
1131    ) -> SupportedApiVersionsSummary {
1132        SupportedApiVersionsSummary {
1133            core: SupportedCoreApiVersions {
1134                core_consensus: config.global.consensus_version,
1135                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1136                    .expect("must not have conflicting versions"),
1137            },
1138            modules: config
1139                .modules
1140                .iter()
1141                .filter_map(|(&module_instance_id, module_config)| {
1142                    client_module_init
1143                        .get(module_config.kind())
1144                        .map(|module_init| {
1145                            (
1146                                module_instance_id,
1147                                SupportedModuleApiVersions {
1148                                    core_consensus: config.global.consensus_version,
1149                                    module_consensus: module_config.version,
1150                                    api: module_init.supported_api_versions(),
1151                                },
1152                            )
1153                        })
1154                })
1155                .collect(),
1156        }
1157    }
1158
1159    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1160        Self::load_and_refresh_common_api_version_static(
1161            &self.config().await,
1162            &self.module_inits,
1163            self.connectors.clone(),
1164            &self.api,
1165            &self.db,
1166            &self.task_group,
1167        )
1168        .await
1169    }
1170
1171    /// Load the common api versions to use from cache and start a background
1172    /// process to refresh them.
1173    ///
1174    /// This is a compromise, so we not have to wait for version discovery to
1175    /// complete every time a [`Client`] is being built.
1176    async fn load_and_refresh_common_api_version_static(
1177        config: &ClientConfig,
1178        module_init: &ClientModuleInitRegistry,
1179        connectors: ConnectorRegistry,
1180        api: &DynGlobalApi,
1181        db: &Database,
1182        task_group: &TaskGroup,
1183    ) -> anyhow::Result<ApiVersionSet> {
1184        if let Some(v) = db
1185            .begin_transaction_nc()
1186            .await
1187            .get_value(&CachedApiVersionSetKey)
1188            .await
1189        {
1190            debug!(
1191                target: LOG_CLIENT,
1192                "Found existing cached common api versions"
1193            );
1194            let config = config.clone();
1195            let client_module_init = module_init.clone();
1196            let api = api.clone();
1197            let db = db.clone();
1198            let task_group = task_group.clone();
1199            // Separate task group, because we actually don't want to be waiting for this to
1200            // finish, and it's just best effort.
1201            task_group
1202                .clone()
1203                .spawn_cancellable("refresh_common_api_version_static", async move {
1204                    connectors.wait_for_initialized_connections().await;
1205
1206                    if let Err(error) = Self::refresh_common_api_version_static(
1207                        &config,
1208                        &client_module_init,
1209                        &api,
1210                        &db,
1211                        task_group,
1212                        false,
1213                    )
1214                    .await
1215                    {
1216                        warn!(
1217                            target: LOG_CLIENT,
1218                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1219                        );
1220                    }
1221                });
1222
1223            return Ok(v.0);
1224        }
1225
1226        info!(
1227            target: LOG_CLIENT,
1228            "Fetching initial API versions "
1229        );
1230        Self::refresh_common_api_version_static(
1231            config,
1232            module_init,
1233            api,
1234            db,
1235            task_group.clone(),
1236            true,
1237        )
1238        .await
1239    }
1240
1241    async fn refresh_common_api_version_static(
1242        config: &ClientConfig,
1243        client_module_init: &ClientModuleInitRegistry,
1244        api: &DynGlobalApi,
1245        db: &Database,
1246        task_group: TaskGroup,
1247        block_until_ok: bool,
1248    ) -> anyhow::Result<ApiVersionSet> {
1249        debug!(
1250            target: LOG_CLIENT,
1251            "Refreshing common api versions"
1252        );
1253
1254        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1255        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1256
1257        task_group.spawn_cancellable("refresh peers api versions", {
1258            Client::fetch_common_api_versions_from_all_peers(
1259                num_peers,
1260                api.clone(),
1261                db.clone(),
1262                num_responses_sender,
1263            )
1264        });
1265
1266        let common_api_versions = loop {
1267            // Wait to collect enough answers before calculating a set of common api
1268            // versions to use. Note that all peers individual responses from
1269            // previous attempts are still being used, and requests, or even
1270            // retries for response of peers are not actually cancelled, as they
1271            // are happening on a separate task. This is all just to bound the
1272            // time user can be waiting for the join operation to finish, at the
1273            // risk of picking wrong version in very rare circumstances.
1274            let _: Result<_, Elapsed> = runtime::timeout(
1275                Duration::from_secs(30),
1276                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1277            )
1278            .await;
1279
1280            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1281
1282            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1283                &Self::supported_api_versions_summary_static(config, client_module_init),
1284                &peer_api_version_sets,
1285            ) {
1286                Ok(o) => break o,
1287                Err(err) if block_until_ok => {
1288                    warn!(
1289                        target: LOG_CLIENT,
1290                        err = %err.fmt_compact_anyhow(),
1291                        "Failed to discover API version to use. Retrying..."
1292                    );
1293                    continue;
1294                }
1295                Err(e) => return Err(e),
1296            }
1297        };
1298
1299        debug!(
1300            target: LOG_CLIENT,
1301            value = ?common_api_versions,
1302            "Updating the cached common api versions"
1303        );
1304        let mut dbtx = db.begin_transaction().await;
1305        let _ = dbtx
1306            .insert_entry(
1307                &CachedApiVersionSetKey,
1308                &CachedApiVersionSet(common_api_versions.clone()),
1309            )
1310            .await;
1311
1312        dbtx.commit_tx().await;
1313
1314        Ok(common_api_versions)
1315    }
1316
1317    /// Get the client [`Metadata`]
1318    pub async fn get_metadata(&self) -> Metadata {
1319        self.db
1320            .begin_transaction_nc()
1321            .await
1322            .get_value(&ClientMetadataKey)
1323            .await
1324            .unwrap_or_else(|| {
1325                warn!(
1326                    target: LOG_CLIENT,
1327                    "Missing existing metadata. This key should have been set on Client init"
1328                );
1329                Metadata::empty()
1330            })
1331    }
1332
1333    /// Set the client [`Metadata`]
1334    pub async fn set_metadata(&self, metadata: &Metadata) {
1335        self.db
1336            .autocommit::<_, _, anyhow::Error>(
1337                |dbtx, _| {
1338                    Box::pin(async {
1339                        Self::set_metadata_dbtx(dbtx, metadata).await;
1340                        Ok(())
1341                    })
1342                },
1343                None,
1344            )
1345            .await
1346            .expect("Failed to autocommit metadata");
1347    }
1348
1349    pub fn has_pending_recoveries(&self) -> bool {
1350        !self
1351            .client_recovery_progress_receiver
1352            .borrow()
1353            .iter()
1354            .all(|(_id, progress)| progress.is_done())
1355    }
1356
1357    /// Wait for all module recoveries to finish
1358    ///
1359    /// This will block until the recovery task is done with recoveries.
1360    /// Returns success if all recovery tasks are complete (success case),
1361    /// or an error if some modules could not complete the recovery at the time.
1362    ///
1363    /// A bit of a heavy approach.
1364    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1365        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1366        recovery_receiver
1367            .wait_for(|in_progress| {
1368                in_progress
1369                    .iter()
1370                    .all(|(_id, progress)| progress.is_done())
1371            })
1372            .await
1373            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1374
1375        Ok(())
1376    }
1377
1378    /// Subscribe to recover progress for all the modules.
1379    ///
1380    /// This stream can contain duplicate progress for a module.
1381    /// Don't use this stream for detecting completion of recovery.
1382    pub fn subscribe_to_recovery_progress(
1383        &self,
1384    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1385        WatchStream::new(self.client_recovery_progress_receiver.clone())
1386            .flat_map(futures::stream::iter)
1387    }
1388
1389    pub async fn wait_for_module_kind_recovery(
1390        &self,
1391        module_kind: ModuleKind,
1392    ) -> anyhow::Result<()> {
1393        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1394        let config = self.config().await;
1395        recovery_receiver
1396            .wait_for(|in_progress| {
1397                !in_progress
1398                    .iter()
1399                    .filter(|(module_instance_id, _progress)| {
1400                        config.modules[module_instance_id].kind == module_kind
1401                    })
1402                    .any(|(_id, progress)| !progress.is_done())
1403            })
1404            .await
1405            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1406
1407        Ok(())
1408    }
1409
1410    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1411        loop {
1412            if self.executor.get_active_states().await.is_empty() {
1413                break;
1414            }
1415            sleep(Duration::from_millis(100)).await;
1416        }
1417        Ok(())
1418    }
1419
1420    /// Set the client [`Metadata`]
1421    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1422        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1423    }
1424
1425    fn spawn_module_recoveries_task(
1426        &self,
1427        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1428        module_recoveries: BTreeMap<
1429            ModuleInstanceId,
1430            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1431        >,
1432        module_recovery_progress_receivers: BTreeMap<
1433            ModuleInstanceId,
1434            watch::Receiver<RecoveryProgress>,
1435        >,
1436    ) {
1437        let db = self.db.clone();
1438        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1439        self.task_group
1440            .spawn("module recoveries", |_task_handle| async {
1441                Self::run_module_recoveries_task(
1442                    db,
1443                    log_ordering_wakeup_tx,
1444                    recovery_sender,
1445                    module_recoveries,
1446                    module_recovery_progress_receivers,
1447                )
1448                .await;
1449            });
1450    }
1451
1452    async fn run_module_recoveries_task(
1453        db: Database,
1454        log_ordering_wakeup_tx: watch::Sender<()>,
1455        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1456        module_recoveries: BTreeMap<
1457            ModuleInstanceId,
1458            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1459        >,
1460        module_recovery_progress_receivers: BTreeMap<
1461            ModuleInstanceId,
1462            watch::Receiver<RecoveryProgress>,
1463        >,
1464    ) {
1465        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1466        let mut completed_stream = Vec::new();
1467        let progress_stream = futures::stream::FuturesUnordered::new();
1468
1469        for (module_instance_id, f) in module_recoveries {
1470            completed_stream.push(futures::stream::once(Box::pin(async move {
1471                match f.await {
1472                    Ok(()) => (module_instance_id, None),
1473                    Err(err) => {
1474                        warn!(
1475                            target: LOG_CLIENT,
1476                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1477                        );
1478                        // a module recovery that failed reports and error and
1479                        // just never finishes, so we don't need a separate state
1480                        // for it
1481                        futures::future::pending::<()>().await;
1482                        unreachable!()
1483                    }
1484                }
1485            })));
1486        }
1487
1488        for (module_instance_id, rx) in module_recovery_progress_receivers {
1489            progress_stream.push(
1490                tokio_stream::wrappers::WatchStream::new(rx)
1491                    .fuse()
1492                    .map(move |progress| (module_instance_id, Some(progress))),
1493            );
1494        }
1495
1496        let mut futures = futures::stream::select(
1497            futures::stream::select_all(progress_stream),
1498            futures::stream::select_all(completed_stream),
1499        );
1500
1501        while let Some((module_instance_id, progress)) = futures.next().await {
1502            let mut dbtx = db.begin_transaction().await;
1503
1504            let prev_progress = *recovery_sender
1505                .borrow()
1506                .get(&module_instance_id)
1507                .expect("existing progress must be present");
1508
1509            let progress = if prev_progress.is_done() {
1510                // since updates might be out of order, once done, stick with it
1511                prev_progress
1512            } else if let Some(progress) = progress {
1513                progress
1514            } else {
1515                prev_progress.to_complete()
1516            };
1517
1518            if !prev_progress.is_done() && progress.is_done() {
1519                info!(
1520                    target: LOG_CLIENT,
1521                    module_instance_id,
1522                    progress = format!("{}/{}", progress.complete, progress.total),
1523                    "Recovery complete"
1524                );
1525                dbtx.log_event(
1526                    log_ordering_wakeup_tx.clone(),
1527                    None,
1528                    ModuleRecoveryCompleted {
1529                        module_id: module_instance_id,
1530                    },
1531                )
1532                .await;
1533            } else {
1534                info!(
1535                    target: LOG_CLIENT,
1536                    module_instance_id,
1537                    progress = format!("{}/{}", progress.complete, progress.total),
1538                    "Recovery progress"
1539                );
1540            }
1541
1542            dbtx.insert_entry(
1543                &ClientModuleRecovery { module_instance_id },
1544                &ClientModuleRecoveryState { progress },
1545            )
1546            .await;
1547            dbtx.commit_tx().await;
1548
1549            recovery_sender.send_modify(|v| {
1550                v.insert(module_instance_id, progress);
1551            });
1552        }
1553        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1554    }
1555
1556    async fn load_peers_last_api_versions(
1557        db: &Database,
1558        num_peers: NumPeers,
1559    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1560        let mut peer_api_version_sets = BTreeMap::new();
1561
1562        let mut dbtx = db.begin_transaction_nc().await;
1563        for peer_id in num_peers.peer_ids() {
1564            if let Some(v) = dbtx
1565                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1566                .await
1567            {
1568                peer_api_version_sets.insert(peer_id, v.0);
1569            }
1570        }
1571        drop(dbtx);
1572        peer_api_version_sets
1573    }
1574
1575    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1576    /// only the announcements and doesn't use the config as fallback.
1577    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1578        self.db()
1579            .begin_transaction_nc()
1580            .await
1581            .find_by_prefix(&ApiAnnouncementPrefix)
1582            .await
1583            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1584            .collect()
1585            .await
1586    }
1587
1588    /// Returns a list of guardian API URLs
1589    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1590        get_api_urls(&self.db, &self.config().await).await
1591    }
1592
1593    /// Create an invite code with the api endpoint of the given peer which can
1594    /// be used to download this client config
1595    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1596        self.get_peer_urls()
1597            .await
1598            .into_iter()
1599            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1600            .map(|peer_url| {
1601                InviteCode::new(
1602                    peer_url.clone(),
1603                    peer,
1604                    self.federation_id(),
1605                    self.api_secret.clone(),
1606                )
1607            })
1608    }
1609
1610    /// Blocks till the client has synced the guardian public key set
1611    /// (introduced in version 0.4) and returns it. Once it has been fetched
1612    /// once this function is guaranteed to return immediately.
1613    pub async fn get_guardian_public_keys_blocking(
1614        &self,
1615    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1616        self.db
1617            .autocommit(
1618                |dbtx, _| {
1619                    Box::pin(async move {
1620                        let config = self.config().await;
1621
1622                        let guardian_pub_keys = self
1623                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1624                            .await;
1625
1626                        Result::<_, ()>::Ok(guardian_pub_keys)
1627                    })
1628                },
1629                None,
1630            )
1631            .await
1632            .expect("Will retry forever")
1633    }
1634
1635    async fn get_or_backfill_broadcast_public_keys(
1636        &self,
1637        dbtx: &mut DatabaseTransaction<'_>,
1638        config: ClientConfig,
1639    ) -> BTreeMap<PeerId, PublicKey> {
1640        match config.global.broadcast_public_keys {
1641            Some(guardian_pub_keys) => guardian_pub_keys,
1642            _ => {
1643                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1644
1645                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1646                *(self.config.write().await) = new_config;
1647                guardian_pub_keys
1648            }
1649        }
1650    }
1651
1652    async fn fetch_session_count(&self) -> FederationResult<u64> {
1653        self.api.session_count().await
1654    }
1655
1656    async fn fetch_and_update_config(
1657        &self,
1658        config: ClientConfig,
1659    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1660        let fetched_config = retry(
1661            "Fetching guardian public keys",
1662            backoff_util::background_backoff(),
1663            || async {
1664                Ok(self
1665                    .api
1666                    .request_current_consensus::<ClientConfig>(
1667                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1668                        ApiRequestErased::default(),
1669                    )
1670                    .await?)
1671            },
1672        )
1673        .await
1674        .expect("Will never return on error");
1675
1676        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1677            warn!(
1678                target: LOG_CLIENT,
1679                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1680            );
1681            pending::<()>().await;
1682            unreachable!("Pending will never return");
1683        };
1684
1685        let new_config = ClientConfig {
1686            global: GlobalClientConfig {
1687                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1688                ..config.global
1689            },
1690            modules: config.modules,
1691        };
1692        (guardian_pub_keys, new_config)
1693    }
1694
1695    pub fn handle_global_rpc(
1696        &self,
1697        method: String,
1698        params: serde_json::Value,
1699    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1700        Box::pin(try_stream! {
1701            match method.as_str() {
1702                "get_balance" => {
1703                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1704                    yield serde_json::to_value(balance)?;
1705                }
1706                "subscribe_balance_changes" => {
1707                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1708                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1709                    while let Some(balance) = stream.next().await {
1710                        yield serde_json::to_value(balance)?;
1711                    }
1712                }
1713                "get_config" => {
1714                    let config = self.config().await;
1715                    yield serde_json::to_value(config)?;
1716                }
1717                "get_federation_id" => {
1718                    let federation_id = self.federation_id();
1719                    yield serde_json::to_value(federation_id)?;
1720                }
1721                "get_invite_code" => {
1722                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1723                    let invite_code = self.invite_code(req.peer).await;
1724                    yield serde_json::to_value(invite_code)?;
1725                }
1726                "get_operation" => {
1727                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1728                    let operation = self.operation_log().get_operation(req.operation_id).await;
1729                    yield serde_json::to_value(operation)?;
1730                }
1731                "list_operations" => {
1732                    let req: ListOperationsParams = serde_json::from_value(params)?;
1733                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1734                        usize::MAX
1735                    } else {
1736                        req.limit.unwrap_or(usize::MAX)
1737                    };
1738                    let operations = self.operation_log()
1739                        .paginate_operations_rev(limit, req.last_seen)
1740                        .await;
1741                    yield serde_json::to_value(operations)?;
1742                }
1743                "session_count" => {
1744                    let count = self.fetch_session_count().await?;
1745                    yield serde_json::to_value(count)?;
1746                }
1747                "has_pending_recoveries" => {
1748                    let has_pending = self.has_pending_recoveries();
1749                    yield serde_json::to_value(has_pending)?;
1750                }
1751                "wait_for_all_recoveries" => {
1752                    self.wait_for_all_recoveries().await?;
1753                    yield serde_json::Value::Null;
1754                }
1755                "subscribe_to_recovery_progress" => {
1756                    let mut stream = self.subscribe_to_recovery_progress();
1757                    while let Some((module_id, progress)) = stream.next().await {
1758                        yield serde_json::json!({
1759                            "module_id": module_id,
1760                            "progress": progress
1761                        });
1762                    }
1763                }
1764                "backup_to_federation" => {
1765                    let metadata = if params.is_null() {
1766                        Metadata::from_json_serialized(serde_json::json!({}))
1767                    } else {
1768                        Metadata::from_json_serialized(params)
1769                    };
1770                    self.backup_to_federation(metadata).await?;
1771                    yield serde_json::Value::Null;
1772                }
1773                _ => {
1774                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1775                    unreachable!()
1776                },
1777            }
1778        })
1779    }
1780
1781    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1782    where
1783        E: Event + Send,
1784    {
1785        let mut dbtx = self.db.begin_transaction().await;
1786        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1787        dbtx.commit_tx().await;
1788    }
1789
1790    pub async fn log_event_dbtx<E, Cap>(
1791        &self,
1792        dbtx: &mut DatabaseTransaction<'_, Cap>,
1793        module_id: Option<ModuleInstanceId>,
1794        event: E,
1795    ) where
1796        E: Event + Send,
1797        Cap: Send,
1798    {
1799        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1800            .await;
1801    }
1802
1803    pub async fn log_event_raw_dbtx<Cap>(
1804        &self,
1805        dbtx: &mut DatabaseTransaction<'_, Cap>,
1806        kind: EventKind,
1807        module: Option<(ModuleKind, ModuleInstanceId)>,
1808        payload: Vec<u8>,
1809        persist: EventPersistence,
1810    ) where
1811        Cap: Send,
1812    {
1813        let module_id = module.as_ref().map(|m| m.1);
1814        let module_kind = module.map(|m| m.0);
1815        dbtx.log_event_raw(
1816            self.log_ordering_wakeup_tx.clone(),
1817            kind,
1818            module_kind,
1819            module_id,
1820            payload,
1821            persist,
1822        )
1823        .await;
1824    }
1825
1826    /// Built in event log (trimmable) tracker
1827    ///
1828    /// For the convenience of downstream applications, [`Client`] can store
1829    /// internally event log position for the main application using/driving it.
1830    ///
1831    /// Note that this position is a singleton, so this tracker should not be
1832    /// used for multiple purposes or applications, etc. at the same time.
1833    ///
1834    /// If the application has a need to follow log using multiple trackers, it
1835    /// should implement own [`DynEventLogTrimableTracker`] and store its
1836    /// persient data by itself.
1837    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1838        struct BuiltInApplicationEventLogTracker;
1839
1840        #[apply(async_trait_maybe_send!)]
1841        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1842            // Store position in the event log
1843            async fn store(
1844                &mut self,
1845                dbtx: &mut DatabaseTransaction<NonCommittable>,
1846                pos: EventLogTrimableId,
1847            ) -> anyhow::Result<()> {
1848                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1849                    .await;
1850                Ok(())
1851            }
1852
1853            /// Load the last previous stored position (or None if never stored)
1854            async fn load(
1855                &mut self,
1856                dbtx: &mut DatabaseTransaction<NonCommittable>,
1857            ) -> anyhow::Result<Option<EventLogTrimableId>> {
1858                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
1859            }
1860        }
1861        Box::new(BuiltInApplicationEventLogTracker)
1862    }
1863
1864    /// Like [`Self::handle_events`] but for historical data.
1865    ///
1866    ///
1867    /// This function can be used to process subset of events
1868    /// that is infrequent and important enough to be persisted
1869    /// forever. Most applications should prefer to use [`Self::handle_events`]
1870    /// which emits *all* events.
1871    pub async fn handle_historical_events<F, R>(
1872        &self,
1873        tracker: fedimint_eventlog::DynEventLogTracker,
1874        handler_fn: F,
1875    ) -> anyhow::Result<()>
1876    where
1877        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1878        R: Future<Output = anyhow::Result<()>>,
1879    {
1880        fedimint_eventlog::handle_events(
1881            self.db.clone(),
1882            tracker,
1883            self.log_event_added_rx.clone(),
1884            handler_fn,
1885        )
1886        .await
1887    }
1888
1889    /// Handle events emitted by the client
1890    ///
1891    /// This is a preferred method for reactive & asynchronous
1892    /// processing of events emitted by the client.
1893    ///
1894    /// It needs a `tracker` that will persist the position in the log
1895    /// as it is being handled. You can use the
1896    /// [`Client::built_in_application_event_log_tracker`] if this call is
1897    /// used for the single main application handling this instance of the
1898    /// [`Client`]. Otherwise you should implement your own tracker.
1899    ///
1900    /// This handler will call `handle_fn` with ever event emitted by
1901    /// [`Client`], including transient ones. The caller should atomically
1902    /// handle each event it is interested in and ignore other ones.
1903    ///
1904    /// This method returns only when client is shutting down or on internal
1905    /// error, so typically should be called in a background task dedicated
1906    /// to handling events.
1907    pub async fn handle_events<F, R>(
1908        &self,
1909        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
1910        handler_fn: F,
1911    ) -> anyhow::Result<()>
1912    where
1913        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1914        R: Future<Output = anyhow::Result<()>>,
1915    {
1916        fedimint_eventlog::handle_trimable_events(
1917            self.db.clone(),
1918            tracker,
1919            self.log_event_added_rx.clone(),
1920            handler_fn,
1921        )
1922        .await
1923    }
1924
1925    pub async fn get_event_log(
1926        &self,
1927        pos: Option<EventLogId>,
1928        limit: u64,
1929    ) -> Vec<PersistedLogEntry> {
1930        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1931            .await
1932    }
1933
1934    pub async fn get_event_log_trimable(
1935        &self,
1936        pos: Option<EventLogTrimableId>,
1937        limit: u64,
1938    ) -> Vec<PersistedLogEntry> {
1939        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1940            .await
1941    }
1942
1943    pub async fn get_event_log_dbtx<Cap>(
1944        &self,
1945        dbtx: &mut DatabaseTransaction<'_, Cap>,
1946        pos: Option<EventLogId>,
1947        limit: u64,
1948    ) -> Vec<PersistedLogEntry>
1949    where
1950        Cap: Send,
1951    {
1952        dbtx.get_event_log(pos, limit).await
1953    }
1954
1955    pub async fn get_event_log_trimable_dbtx<Cap>(
1956        &self,
1957        dbtx: &mut DatabaseTransaction<'_, Cap>,
1958        pos: Option<EventLogTrimableId>,
1959        limit: u64,
1960    ) -> Vec<PersistedLogEntry>
1961    where
1962        Cap: Send,
1963    {
1964        dbtx.get_event_log_trimable(pos, limit).await
1965    }
1966
1967    /// Register to receiver all new transient (unpersisted) events
1968    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1969        self.log_event_added_transient_tx.subscribe()
1970    }
1971
1972    /// Get a receiver that signals when new events are added to the event log
1973    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
1974        self.log_event_added_rx.clone()
1975    }
1976
1977    pub fn iroh_enable_dht(&self) -> bool {
1978        self.iroh_enable_dht
1979    }
1980
1981    pub(crate) async fn run_core_migrations(
1982        db_no_decoders: &Database,
1983    ) -> Result<(), anyhow::Error> {
1984        let mut dbtx = db_no_decoders.begin_transaction().await;
1985        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1986            .await?;
1987        if is_running_in_test_env() {
1988            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1989        }
1990        dbtx.commit_tx_result().await?;
1991        Ok(())
1992    }
1993
1994    /// Iterator over primary modules for a given `unit`
1995    fn primary_modules_for_unit(
1996        &self,
1997        unit: AmountUnit,
1998    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1999        self.primary_modules
2000            .iter()
2001            .flat_map(move |(_prio, candidates)| {
2002                candidates
2003                    .specific
2004                    .get(&unit)
2005                    .into_iter()
2006                    .flatten()
2007                    .copied()
2008                    // within same priority, wildcard matches come last
2009                    .chain(candidates.wildcard.iter().copied())
2010            })
2011            .map(|id| (id, self.modules.get_expect(id)))
2012    }
2013
2014    /// Primary module to use for `unit`
2015    ///
2016    /// Currently, just pick the first (highest priority) match
2017    pub fn primary_module_for_unit(
2018        &self,
2019        unit: AmountUnit,
2020    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2021        self.primary_modules_for_unit(unit).next()
2022    }
2023
2024    /// [`Self::primary_module_for_unit`] for Bitcoin
2025    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2026        self.primary_module_for_unit(AmountUnit::BITCOIN)
2027            .expect("No primary module for Bitcoin")
2028    }
2029}
2030
2031#[apply(async_trait_maybe_send!)]
2032impl ClientContextIface for Client {
2033    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2034        Client::get_module(self, instance)
2035    }
2036
2037    fn api_clone(&self) -> DynGlobalApi {
2038        Client::api_clone(self)
2039    }
2040    fn decoders(&self) -> &ModuleDecoderRegistry {
2041        Client::decoders(self)
2042    }
2043
2044    async fn finalize_and_submit_transaction(
2045        &self,
2046        operation_id: OperationId,
2047        operation_type: &str,
2048        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2049        tx_builder: TransactionBuilder,
2050    ) -> anyhow::Result<OutPointRange> {
2051        Client::finalize_and_submit_transaction(
2052            self,
2053            operation_id,
2054            operation_type,
2055            // |out_point_range| operation_meta_gen(out_point_range),
2056            &operation_meta_gen,
2057            tx_builder,
2058        )
2059        .await
2060    }
2061
2062    async fn finalize_and_submit_transaction_inner(
2063        &self,
2064        dbtx: &mut DatabaseTransaction<'_>,
2065        operation_id: OperationId,
2066        tx_builder: TransactionBuilder,
2067    ) -> anyhow::Result<OutPointRange> {
2068        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2069    }
2070
2071    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2072        Client::transaction_updates(self, operation_id).await
2073    }
2074
2075    async fn await_primary_module_outputs(
2076        &self,
2077        operation_id: OperationId,
2078        // TODO: make `impl Iterator<Item = ...>`
2079        outputs: Vec<OutPoint>,
2080    ) -> anyhow::Result<()> {
2081        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2082    }
2083
2084    fn operation_log(&self) -> &dyn IOperationLog {
2085        Client::operation_log(self)
2086    }
2087
2088    async fn has_active_states(&self, operation_id: OperationId) -> bool {
2089        Client::has_active_states(self, operation_id).await
2090    }
2091
2092    async fn operation_exists(&self, operation_id: OperationId) -> bool {
2093        Client::operation_exists(self, operation_id).await
2094    }
2095
2096    async fn config(&self) -> ClientConfig {
2097        Client::config(self).await
2098    }
2099
2100    fn db(&self) -> &Database {
2101        Client::db(self)
2102    }
2103
2104    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2105        Client::executor(self)
2106    }
2107
2108    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2109        Client::invite_code(self, peer).await
2110    }
2111
2112    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2113        Client::get_internal_payment_markers(self)
2114    }
2115
2116    async fn log_event_json(
2117        &self,
2118        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2119        module_kind: Option<ModuleKind>,
2120        module_id: ModuleInstanceId,
2121        kind: EventKind,
2122        payload: serde_json::Value,
2123        persist: EventPersistence,
2124    ) {
2125        dbtx.ensure_global()
2126            .expect("Must be called with global dbtx");
2127        self.log_event_raw_dbtx(
2128            dbtx,
2129            kind,
2130            module_kind.map(|kind| (kind, module_id)),
2131            serde_json::to_vec(&payload).expect("Serialization can't fail"),
2132            persist,
2133        )
2134        .await;
2135    }
2136
2137    async fn read_operation_active_states<'dbtx>(
2138        &self,
2139        operation_id: OperationId,
2140        module_id: ModuleInstanceId,
2141        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2142    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2143    {
2144        Box::pin(
2145            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2146                operation_id,
2147                module_instance: module_id,
2148            })
2149            .await
2150            .map(move |(k, v)| (k.0, v)),
2151        )
2152    }
2153    async fn read_operation_inactive_states<'dbtx>(
2154        &self,
2155        operation_id: OperationId,
2156        module_id: ModuleInstanceId,
2157        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2158    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2159    {
2160        Box::pin(
2161            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2162                operation_id,
2163                module_instance: module_id,
2164            })
2165            .await
2166            .map(move |(k, v)| (k.0, v)),
2167        )
2168    }
2169}
2170
2171// TODO: impl `Debug` for `Client` and derive here
2172impl fmt::Debug for Client {
2173    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2174        write!(f, "Client")
2175    }
2176}
2177
2178pub fn client_decoders<'a>(
2179    registry: &ModuleInitRegistry<DynClientModuleInit>,
2180    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2181) -> ModuleDecoderRegistry {
2182    let mut modules = BTreeMap::new();
2183    for (id, kind) in module_kinds {
2184        let Some(init) = registry.get(kind) else {
2185            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2186            continue;
2187        };
2188
2189        modules.insert(
2190            id,
2191            (
2192                kind.clone(),
2193                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2194            ),
2195        );
2196    }
2197    ModuleDecoderRegistry::from(modules)
2198}