fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::ConnectorType;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
86    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
87    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
104/// List of core api versions supported by the implementation.
105/// Notably `major` version is the one being supported, and corresponding
106/// `minor` version is the one required (for given `major` version).
107const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
108    &[ApiVersion { major: 0, minor: 0 }];
109
110/// Primary module candidates at specific priority level
111#[derive(Default)]
112pub(crate) struct PrimaryModuleCandidates {
113    /// Modules that listed specific units they handle
114    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
115    /// Modules handling any unit
116    wildcard: Vec<ModuleInstanceId>,
117}
118
119/// Main client type
120///
121/// A handle and API to interacting with a single federation. End user
122/// applications that want to support interacting with multiple federations at
123/// the same time, will need to instantiate and manage multiple instances of
124/// this struct.
125///
126/// Under the hood it is starting and managing service tasks, state machines,
127/// database and other resources required.
128///
129/// This type is shared externally and internally, and
130/// [`crate::ClientHandle`] is responsible for external lifecycle management
131/// and resource freeing of the [`Client`].
132pub struct Client {
133    final_client: FinalClientIface,
134    config: tokio::sync::RwLock<ClientConfig>,
135    api_secret: Option<String>,
136    decoders: ModuleDecoderRegistry,
137    connectors: ConnectorRegistry,
138    db: Database,
139    federation_id: FederationId,
140    federation_config_meta: BTreeMap<String, String>,
141    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
142    pub(crate) modules: ClientModuleRegistry,
143    module_inits: ClientModuleInitRegistry,
144    executor: Executor,
145    pub(crate) api: DynGlobalApi,
146    root_secret: DerivableSecret,
147    operation_log: OperationLog,
148    secp_ctx: Secp256k1<secp256k1::All>,
149    meta_service: Arc<MetaService>,
150    connector: ConnectorType,
151
152    task_group: TaskGroup,
153
154    /// Updates about client recovery progress
155    client_recovery_progress_receiver:
156        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
157
158    /// Internal client sender to wake up log ordering task every time a
159    /// (unuordered) log event is added.
160    log_ordering_wakeup_tx: watch::Sender<()>,
161    /// Receiver for events fired every time (ordered) log event is added.
162    log_event_added_rx: watch::Receiver<()>,
163    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
164    request_hook: ApiRequestHook,
165    iroh_enable_dht: bool,
166    iroh_enable_next: bool,
167}
168
169#[derive(Debug, Serialize, Deserialize)]
170struct ListOperationsParams {
171    limit: Option<usize>,
172    last_seen: Option<ChronologicalOperationLogKey>,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GetOperationIdRequest {
177    operation_id: OperationId,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct GetBalanceChangesRequest {
182    #[serde(default = "AmountUnit::bitcoin")]
183    unit: AmountUnit,
184}
185
186impl Client {
187    /// Initialize a client builder that can be configured to create a new
188    /// client.
189    pub async fn builder() -> anyhow::Result<ClientBuilder> {
190        Ok(ClientBuilder::new())
191    }
192
193    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
194        self.api.as_ref()
195    }
196
197    pub fn api_clone(&self) -> DynGlobalApi {
198        self.api.clone()
199    }
200
201    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
202    pub fn task_group(&self) -> &TaskGroup {
203        &self.task_group
204    }
205
206    /// Useful for our CLI tooling, not meant for external use
207    #[doc(hidden)]
208    pub fn executor(&self) -> &Executor {
209        &self.executor
210    }
211
212    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
213        let mut dbtx = db.begin_transaction_nc().await;
214        dbtx.get_value(&ClientConfigKey).await
215    }
216
217    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
218        let mut dbtx = db.begin_transaction_nc().await;
219        dbtx.get_value(&PendingClientConfigKey).await
220    }
221
222    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
223        let mut dbtx = db.begin_transaction_nc().await;
224        dbtx.get_value(&ApiSecretKey).await
225    }
226
227    pub async fn store_encodable_client_secret<T: Encodable>(
228        db: &Database,
229        secret: T,
230    ) -> anyhow::Result<()> {
231        let mut dbtx = db.begin_transaction().await;
232
233        // Don't overwrite an existing secret
234        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
235            bail!("Encoded client secret already exists, cannot overwrite")
236        }
237
238        let encoded_secret = T::consensus_encode_to_vec(&secret);
239        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
240            .await;
241        dbtx.commit_tx().await;
242        Ok(())
243    }
244
245    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
246        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
247            bail!("Encoded client secret not present in DB")
248        };
249
250        Ok(secret)
251    }
252    pub async fn load_decodable_client_secret_opt<T: Decodable>(
253        db: &Database,
254    ) -> anyhow::Result<Option<T>> {
255        let mut dbtx = db.begin_transaction_nc().await;
256
257        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
258
259        Ok(match client_secret {
260            Some(client_secret) => Some(
261                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
262                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
263            ),
264            None => None,
265        })
266    }
267
268    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
269        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
270            Ok(secret) => secret,
271            _ => {
272                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
273                Self::store_encodable_client_secret(db, secret)
274                    .await
275                    .expect("Storing client secret must work");
276                secret
277            }
278        };
279        Ok(client_secret)
280    }
281
282    pub async fn is_initialized(db: &Database) -> bool {
283        let mut dbtx = db.begin_transaction_nc().await;
284        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
285            .await
286            .expect("Unrecoverable error occurred while reading and entry from the database")
287            .is_some()
288    }
289
290    pub fn start_executor(self: &Arc<Self>) {
291        debug!(
292            target: LOG_CLIENT,
293            "Starting fedimint client executor",
294        );
295        self.executor.start_executor(self.context_gen());
296    }
297
298    pub fn federation_id(&self) -> FederationId {
299        self.federation_id
300    }
301
302    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
303        let client_inner = Arc::downgrade(self);
304        Arc::new(move |module_instance, operation| {
305            ModuleGlobalClientContext {
306                client: client_inner
307                    .clone()
308                    .upgrade()
309                    .expect("ModuleGlobalContextGen called after client was dropped"),
310                module_instance_id: module_instance,
311                operation,
312            }
313            .into()
314        })
315    }
316
317    pub async fn config(&self) -> ClientConfig {
318        self.config.read().await.clone()
319    }
320
321    // TODO: change to `-> Option<&str>`
322    pub fn api_secret(&self) -> &Option<String> {
323        &self.api_secret
324    }
325
326    /// Returns the core API version that the federation supports
327    ///
328    /// This reads from the cached version stored during client initialization.
329    /// If no cache is available (e.g., during initial setup), returns a default
330    /// version (0, 0).
331    pub async fn core_api_version(&self) -> ApiVersion {
332        // Try to get from cache. If not available, return a conservative
333        // default. The cache should always be populated after successful client init.
334        self.db
335            .begin_transaction_nc()
336            .await
337            .get_value(&CachedApiVersionSetKey)
338            .await
339            .map(|cached: CachedApiVersionSet| cached.0.core)
340            .unwrap_or(ApiVersion { major: 0, minor: 0 })
341    }
342
343    pub fn decoders(&self) -> &ModuleDecoderRegistry {
344        &self.decoders
345    }
346
347    /// Returns a reference to the module, panics if not found
348    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
349        self.try_get_module(instance)
350            .expect("Module instance not found")
351    }
352
353    fn try_get_module(
354        &self,
355        instance: ModuleInstanceId,
356    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
357        Some(self.modules.get(instance)?.as_ref())
358    }
359
360    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
361        self.modules.get(instance).is_some()
362    }
363
364    /// Returns the input amount and output amount of a transaction
365    ///
366    /// # Panics
367    /// If any of the input or output versions in the transaction builder are
368    /// unknown by the respective module.
369    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
370        // FIXME: prevent overflows, currently not suitable for untrusted input
371        let mut in_amounts = Amounts::ZERO;
372        let mut out_amounts = Amounts::ZERO;
373        let mut fee_amounts = Amounts::ZERO;
374
375        for input in builder.inputs() {
376            let module = self.get_module(input.input.module_instance_id());
377
378            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
379                "We only build transactions with input versions that are supported by the module",
380            );
381
382            in_amounts.checked_add_mut(&input.amounts);
383            fee_amounts.checked_add_mut(&item_fees);
384        }
385
386        for output in builder.outputs() {
387            let module = self.get_module(output.output.module_instance_id());
388
389            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
390                "We only build transactions with output versions that are supported by the module",
391            );
392
393            out_amounts.checked_add_mut(&output.amounts);
394            fee_amounts.checked_add_mut(&item_fees);
395        }
396
397        out_amounts.checked_add_mut(&fee_amounts);
398        (in_amounts, out_amounts)
399    }
400
401    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
402        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
403    }
404
405    /// Get metadata value from the federation config itself
406    pub fn get_config_meta(&self, key: &str) -> Option<String> {
407        self.federation_config_meta.get(key).cloned()
408    }
409
410    pub(crate) fn root_secret(&self) -> DerivableSecret {
411        self.root_secret.clone()
412    }
413
414    pub async fn add_state_machines(
415        &self,
416        dbtx: &mut DatabaseTransaction<'_>,
417        states: Vec<DynState>,
418    ) -> AddStateMachinesResult {
419        self.executor.add_state_machines_dbtx(dbtx, states).await
420    }
421
422    // TODO: implement as part of [`OperationLog`]
423    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
424        let active_states = self.executor.get_active_states().await;
425        let mut active_operations = HashSet::with_capacity(active_states.len());
426        let mut dbtx = self.db().begin_transaction_nc().await;
427        for (state, _) in active_states {
428            let operation_id = state.operation_id();
429            if dbtx
430                .get_value(&OperationLogKey { operation_id })
431                .await
432                .is_some()
433            {
434                active_operations.insert(operation_id);
435            }
436        }
437        active_operations
438    }
439
440    pub fn operation_log(&self) -> &OperationLog {
441        &self.operation_log
442    }
443
444    /// Get the meta manager to read meta fields.
445    pub fn meta_service(&self) -> &Arc<MetaService> {
446        &self.meta_service
447    }
448
449    /// Get the meta manager to read meta fields.
450    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
451        let meta_service = self.meta_service();
452        let ts = meta_service
453            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
454            .await
455            .and_then(|v| v.value)?;
456        Some(UNIX_EPOCH + Duration::from_secs(ts))
457    }
458
459    /// Adds funding to a transaction or removes over-funding via change.
460    async fn finalize_transaction(
461        &self,
462        dbtx: &mut DatabaseTransaction<'_>,
463        operation_id: OperationId,
464        mut partial_transaction: TransactionBuilder,
465    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
466        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
467
468        let mut added_inputs_bundles = vec![];
469        let mut added_outputs_bundles = vec![];
470
471        // The way currently things are implemented is OK for modules which can
472        // collect a fee relative to one being used, but will break down in any
473        // fancy scenarios. Future TODOs:
474        //
475        // * create_final_inputs_and_outputs needs to get broken down, so we can use
476        //   primary modules using priorities (possibly separate prios for inputs and
477        //   outputs to be able to drain modules, etc.); we need the split "check if
478        //   possible" and "take" steps,
479        // * extra inputs and outputs adding fees needs to be taken into account,
480        //   possibly with some looping
481        for unit in in_amounts.units().union(&out_amounts.units()) {
482            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
483            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
484            if input_amount == output_amount {
485                continue;
486            }
487
488            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
489                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
490            };
491
492            let (added_input_bundle, added_output_bundle) = module
493                .create_final_inputs_and_outputs(
494                    module_id,
495                    dbtx,
496                    operation_id,
497                    *unit,
498                    input_amount,
499                    output_amount,
500                )
501                .await?;
502
503            added_inputs_bundles.push(added_input_bundle);
504            added_outputs_bundles.push(added_output_bundle);
505        }
506
507        // This is the range of  outputs that will be added to the transaction
508        // in order to balance it. Notice that it may stay empty in case the transaction
509        // is already balanced.
510        let change_range = Range {
511            start: partial_transaction.outputs().count() as u64,
512            end: (partial_transaction.outputs().count() as u64
513                + added_outputs_bundles
514                    .iter()
515                    .map(|output| output.outputs().len() as u64)
516                    .sum::<u64>()),
517        };
518
519        for added_inputs in added_inputs_bundles {
520            partial_transaction = partial_transaction.with_inputs(added_inputs);
521        }
522
523        for added_outputs in added_outputs_bundles {
524            partial_transaction = partial_transaction.with_outputs(added_outputs);
525        }
526
527        let (input_amounts, output_amounts) =
528            self.transaction_builder_get_balance(&partial_transaction);
529
530        for (unit, output_amount) in output_amounts {
531            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
532
533            assert!(input_amount >= output_amount, "Transaction is underfunded");
534        }
535
536        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
537
538        Ok((tx, states, change_range))
539    }
540
541    /// Add funding and/or change to the transaction builder as needed, finalize
542    /// the transaction and submit it to the federation.
543    ///
544    /// ## Errors
545    /// The function will return an error if the operation with given ID already
546    /// exists.
547    ///
548    /// ## Panics
549    /// The function will panic if the database transaction collides with
550    /// other and fails with others too often, this should not happen except for
551    /// excessively concurrent scenarios.
552    pub async fn finalize_and_submit_transaction<F, M>(
553        &self,
554        operation_id: OperationId,
555        operation_type: &str,
556        operation_meta_gen: F,
557        tx_builder: TransactionBuilder,
558    ) -> anyhow::Result<OutPointRange>
559    where
560        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
561        M: serde::Serialize + MaybeSend,
562    {
563        let operation_type = operation_type.to_owned();
564
565        let autocommit_res = self
566            .db
567            .autocommit(
568                |dbtx, _| {
569                    let operation_type = operation_type.clone();
570                    let tx_builder = tx_builder.clone();
571                    let operation_meta_gen = operation_meta_gen.clone();
572                    Box::pin(async move {
573                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
574                            bail!("There already exists an operation with id {operation_id:?}")
575                        }
576
577                        let out_point_range = self
578                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
579                            .await?;
580
581                        self.operation_log()
582                            .add_operation_log_entry_dbtx(
583                                dbtx,
584                                operation_id,
585                                &operation_type,
586                                operation_meta_gen(out_point_range),
587                            )
588                            .await;
589
590                        Ok(out_point_range)
591                    })
592                },
593                Some(100), // TODO: handle what happens after 100 retries
594            )
595            .await;
596
597        match autocommit_res {
598            Ok(txid) => Ok(txid),
599            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
600            Err(AutocommitError::CommitFailed {
601                attempts,
602                last_error,
603            }) => panic!(
604                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
605            ),
606        }
607    }
608
609    async fn finalize_and_submit_transaction_inner(
610        &self,
611        dbtx: &mut DatabaseTransaction<'_>,
612        operation_id: OperationId,
613        tx_builder: TransactionBuilder,
614    ) -> anyhow::Result<OutPointRange> {
615        let (transaction, mut states, change_range) = self
616            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
617            .await?;
618
619        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
620            let inputs = transaction
621                .inputs
622                .iter()
623                .map(DynInput::module_instance_id)
624                .collect::<Vec<_>>();
625            let outputs = transaction
626                .outputs
627                .iter()
628                .map(DynOutput::module_instance_id)
629                .collect::<Vec<_>>();
630            warn!(
631                target: LOG_CLIENT_NET_API,
632                size=%transaction.consensus_encode_to_vec().len(),
633                ?inputs,
634                ?outputs,
635                "Transaction too large",
636            );
637            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
638            bail!(
639                "The generated transaction would be rejected by the federation for being too large."
640            );
641        }
642
643        let txid = transaction.tx_hash();
644
645        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
646
647        let tx_submission_sm = DynState::from_typed(
648            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
649            TxSubmissionStatesSM {
650                operation_id,
651                state: TxSubmissionStates::Created(transaction),
652            },
653        );
654        states.push(tx_submission_sm);
655
656        self.executor.add_state_machines_dbtx(dbtx, states).await?;
657
658        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
659            .await;
660
661        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
662    }
663
664    async fn transaction_update_stream(
665        &self,
666        operation_id: OperationId,
667    ) -> BoxStream<'static, TxSubmissionStatesSM> {
668        self.executor
669            .notifier()
670            .module_notifier::<TxSubmissionStatesSM>(
671                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
672                self.final_client.clone(),
673            )
674            .subscribe(operation_id)
675            .await
676    }
677
678    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
679        let mut dbtx = self.db().begin_transaction_nc().await;
680
681        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
682    }
683
684    pub async fn operation_exists_dbtx(
685        dbtx: &mut DatabaseTransaction<'_>,
686        operation_id: OperationId,
687    ) -> bool {
688        let active_state_exists = dbtx
689            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
690            .await
691            .next()
692            .await
693            .is_some();
694
695        let inactive_state_exists = dbtx
696            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
697            .await
698            .next()
699            .await
700            .is_some();
701
702        active_state_exists || inactive_state_exists
703    }
704
705    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
706        self.db
707            .begin_transaction_nc()
708            .await
709            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
710            .await
711            .next()
712            .await
713            .is_some()
714    }
715
716    /// Waits for an output from the primary module to reach its final
717    /// state.
718    pub async fn await_primary_bitcoin_module_output(
719        &self,
720        operation_id: OperationId,
721        out_point: OutPoint,
722    ) -> anyhow::Result<()> {
723        self.primary_module_for_unit(AmountUnit::BITCOIN)
724            .ok_or_else(|| anyhow!("No primary module available"))?
725            .1
726            .await_primary_module_output(operation_id, out_point)
727            .await
728    }
729
730    /// Returns a reference to a typed module client instance by kind
731    pub fn get_first_module<M: ClientModule>(
732        &'_ self,
733    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
734        let module_kind = M::kind();
735        let id = self
736            .get_first_instance(&module_kind)
737            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
738        let module: &M = self
739            .try_get_module(id)
740            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
741            .as_any()
742            .downcast_ref::<M>()
743            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
744        let (db, _) = self.db().with_prefix_module_id(id);
745        Ok(ClientModuleInstance {
746            id,
747            db,
748            api: self.api().with_module(id),
749            module,
750        })
751    }
752
753    pub fn get_module_client_dyn(
754        &self,
755        instance_id: ModuleInstanceId,
756    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
757        self.try_get_module(instance_id)
758            .ok_or(anyhow!("Unknown module instance {}", instance_id))
759    }
760
761    pub fn db(&self) -> &Database {
762        &self.db
763    }
764
765    pub fn endpoints(&self) -> &ConnectorRegistry {
766        &self.connectors
767    }
768
769    /// Returns a stream of transaction updates for the given operation id that
770    /// can later be used to watch for a specific transaction being accepted.
771    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
772        TransactionUpdates {
773            update_stream: self.transaction_update_stream(operation_id).await,
774        }
775    }
776
777    /// Returns the instance id of the first module of the given kind.
778    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
779        self.modules
780            .iter_modules()
781            .find(|(_, kind, _module)| *kind == module_kind)
782            .map(|(instance_id, _, _)| instance_id)
783    }
784
785    /// Returns the data from which the client's root secret is derived (e.g.
786    /// BIP39 seed phrase struct).
787    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
788        get_decoded_client_secret::<T>(self.db()).await
789    }
790
791    /// Waits for outputs from the primary module to reach its final
792    /// state.
793    pub async fn await_primary_bitcoin_module_outputs(
794        &self,
795        operation_id: OperationId,
796        outputs: Vec<OutPoint>,
797    ) -> anyhow::Result<()> {
798        for out_point in outputs {
799            self.await_primary_bitcoin_module_output(operation_id, out_point)
800                .await?;
801        }
802
803        Ok(())
804    }
805
806    /// Returns the config of the client in JSON format.
807    ///
808    /// Compared to the consensus module format where module configs are binary
809    /// encoded this format cannot be cryptographically verified but is easier
810    /// to consume and to some degree human-readable.
811    pub async fn get_config_json(&self) -> JsonClientConfig {
812        self.config().await.to_json()
813    }
814
815    // Ideally this would not be in the API, but there's a lot of places where this
816    // makes it easier.
817    #[doc(hidden)]
818    /// Like [`Self::get_balance`] but returns an error if primary module is not
819    /// available
820    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
821        self.get_balance_for_unit(AmountUnit::BITCOIN).await
822    }
823
824    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
825        let (id, module) = self
826            .primary_module_for_unit(unit)
827            .ok_or_else(|| anyhow!("Primary module not available"))?;
828        Ok(module
829            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
830            .await)
831    }
832
    /// Returns a stream that yields the current client balance every time it
    /// changes.
    ///
    /// The stream first yields the balance read while this method runs, then
    /// yields again whenever the primary module for `unit` signals a change
    /// and the freshly read balance differs from the last yielded one. If
    /// there is no primary module for `unit`, the stream never yields.
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        // Collect everything the stream needs up front, so the stream body below
        // works on owned values rather than on `self`.
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                // Subscribe before reading the initial balance.
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // If there is no primary module, there will not be one until client is
                // restarted
                pending().await
            };


            yield initial_balance;
            let mut prev_balance = initial_balance;
            // Re-read the balance on every change notification until the
            // notification stream ends.
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                     .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Deduplicate in case modules cannot always tell if the balance actually changed
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
879
880    /// Make a single API version request to a peer after a delay.
881    ///
882    /// The delay is here to unify the type of a future both for initial request
883    /// and possible retries.
884    async fn make_api_version_request(
885        delay: Duration,
886        peer_id: PeerId,
887        api: &DynGlobalApi,
888    ) -> (
889        PeerId,
890        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
891    ) {
892        runtime::sleep(delay).await;
893        (
894            peer_id,
895            api.request_single_peer::<SupportedApiVersionsSummary>(
896                VERSION_ENDPOINT.to_owned(),
897                ApiRequestErased::default(),
898                peer_id,
899            )
900            .await,
901        )
902    }
903
    /// Create a backoff strategy for API version requests.
    ///
    /// Keep trying, initially somewhat aggressively, but after a while retry
    /// very slowly, because chances for response are getting lower and
    /// lower.
    ///
    /// Returns an iterator of delays to wait before each successive retry.
    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
        // NOTE(review): presumably 200ms is the shortest and 600s the longest delay,
        // and `None` means the number of retries is unbounded (callers `expect`
        // that `next()` always yields) — confirm against `custom_backoff`.
        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
    }
912
913    /// Query the federation for API version support and then calculate
914    /// the best API version to use (supported by most guardians).
915    pub async fn fetch_common_api_versions_from_all_peers(
916        num_peers: NumPeers,
917        api: DynGlobalApi,
918        db: Database,
919        num_responses_sender: watch::Sender<usize>,
920    ) {
921        let mut backoff = Self::create_api_version_backoff();
922
923        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
924        // and make a single async db write operation, it should be OK.
925        let mut requests = FuturesUnordered::new();
926
927        for peer_id in num_peers.peer_ids() {
928            requests.push(Self::make_api_version_request(
929                Duration::ZERO,
930                peer_id,
931                &api,
932            ));
933        }
934
935        let mut num_responses = 0;
936
937        while let Some((peer_id, response)) = requests.next().await {
938            let retry = match response {
939                Err(err) => {
940                    let has_previous_response = db
941                        .begin_transaction_nc()
942                        .await
943                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
944                        .await
945                        .is_some();
946                    debug!(
947                        target: LOG_CLIENT,
948                        %peer_id,
949                        err = %err.fmt_compact(),
950                        %has_previous_response,
951                        "Failed to refresh API versions of a peer"
952                    );
953
954                    !has_previous_response
955                }
956                Ok(o) => {
957                    // Save the response to the database right away, just to
958                    // not lose it
959                    let mut dbtx = db.begin_transaction().await;
960                    dbtx.insert_entry(
961                        &PeerLastApiVersionsSummaryKey(peer_id),
962                        &PeerLastApiVersionsSummary(o),
963                    )
964                    .await;
965                    dbtx.commit_tx().await;
966                    false
967                }
968            };
969
970            if retry {
971                requests.push(Self::make_api_version_request(
972                    backoff.next().expect("Keeps retrying"),
973                    peer_id,
974                    &api,
975                ));
976            } else {
977                num_responses += 1;
978                num_responses_sender.send_replace(num_responses);
979            }
980        }
981    }
982
983    /// Fetch API versions from peers, retrying until we get threshold number of
984    /// successful responses. Returns the successful responses collected
985    /// from at least `num_peers.threshold()` peers.
986    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
987        num_peers: NumPeers,
988        api: DynGlobalApi,
989    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
990        let mut backoff = Self::create_api_version_backoff();
991
992        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
993        // and collect responses, it should be OK.
994        let mut requests = FuturesUnordered::new();
995
996        for peer_id in num_peers.peer_ids() {
997            requests.push(Self::make_api_version_request(
998                Duration::ZERO,
999                peer_id,
1000                &api,
1001            ));
1002        }
1003
1004        let mut successful_responses = BTreeMap::new();
1005
1006        while successful_responses.len() < num_peers.threshold()
1007            && let Some((peer_id, response)) = requests.next().await
1008        {
1009            let retry = match response {
1010                Err(err) => {
1011                    debug!(
1012                        target: LOG_CLIENT,
1013                        %peer_id,
1014                        err = %err.fmt_compact(),
1015                        "Failed to fetch API versions from peer"
1016                    );
1017                    true
1018                }
1019                Ok(response) => {
1020                    successful_responses.insert(peer_id, response);
1021                    false
1022                }
1023            };
1024
1025            if retry {
1026                requests.push(Self::make_api_version_request(
1027                    backoff.next().expect("Keeps retrying"),
1028                    peer_id,
1029                    &api,
1030                ));
1031            }
1032        }
1033
1034        successful_responses
1035    }
1036
1037    /// Fetch API versions from peers and discover common API versions to use.
1038    pub async fn fetch_common_api_versions(
1039        config: &ClientConfig,
1040        api: &DynGlobalApi,
1041    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1042        debug!(
1043            target: LOG_CLIENT,
1044            "Fetching common api versions"
1045        );
1046
1047        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1048
1049        let peer_api_version_sets =
1050            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1051
1052        Ok(peer_api_version_sets)
1053    }
1054
1055    /// Write API version set to database cache.
1056    /// Used when we have a pre-calculated API version set that should be stored
1057    /// for later use.
1058    pub async fn write_api_version_cache(
1059        dbtx: &mut DatabaseTransaction<'_>,
1060        api_version_set: ApiVersionSet,
1061    ) {
1062        debug!(
1063            target: LOG_CLIENT,
1064            value = ?api_version_set,
1065            "Writing API version set to cache"
1066        );
1067
1068        dbtx.insert_entry(
1069            &CachedApiVersionSetKey,
1070            &CachedApiVersionSet(api_version_set),
1071        )
1072        .await;
1073    }
1074
1075    /// Store prefetched peer API version responses and calculate/store common
1076    /// API version set. This processes the individual peer responses by
1077    /// storing them in the database and calculating the common API version
1078    /// set for caching.
1079    pub async fn store_prefetched_api_versions(
1080        db: &Database,
1081        config: &ClientConfig,
1082        client_module_init: &ClientModuleInitRegistry,
1083        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1084    ) {
1085        debug!(
1086            target: LOG_CLIENT,
1087            "Storing {} prefetched peer API version responses and calculating common version set",
1088            peer_api_versions.len()
1089        );
1090
1091        let mut dbtx = db.begin_transaction().await;
1092        // Calculate common API version set from individual responses
1093        let client_supported_versions =
1094            Self::supported_api_versions_summary_static(config, client_module_init);
1095        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1096            &client_supported_versions,
1097            peer_api_versions,
1098        ) {
1099            Ok(common_api_versions) => {
1100                // Write the calculated common API version set to database cache
1101                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1102                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1103            }
1104            Err(err) => {
1105                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1106            }
1107        }
1108
1109        // Store individual peer responses to database
1110        for (peer_id, peer_api_versions) in peer_api_versions {
1111            dbtx.insert_entry(
1112                &PeerLastApiVersionsSummaryKey(*peer_id),
1113                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1114            )
1115            .await;
1116        }
1117        dbtx.commit_tx().await;
1118        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1119    }
1120
1121    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1122    pub fn supported_api_versions_summary_static(
1123        config: &ClientConfig,
1124        client_module_init: &ClientModuleInitRegistry,
1125    ) -> SupportedApiVersionsSummary {
1126        SupportedApiVersionsSummary {
1127            core: SupportedCoreApiVersions {
1128                core_consensus: config.global.consensus_version,
1129                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1130                    .expect("must not have conflicting versions"),
1131            },
1132            modules: config
1133                .modules
1134                .iter()
1135                .filter_map(|(&module_instance_id, module_config)| {
1136                    client_module_init
1137                        .get(module_config.kind())
1138                        .map(|module_init| {
1139                            (
1140                                module_instance_id,
1141                                SupportedModuleApiVersions {
1142                                    core_consensus: config.global.consensus_version,
1143                                    module_consensus: module_config.version,
1144                                    api: module_init.supported_api_versions(),
1145                                },
1146                            )
1147                        })
1148                })
1149                .collect(),
1150        }
1151    }
1152
1153    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1154        Self::load_and_refresh_common_api_version_static(
1155            &self.config().await,
1156            &self.module_inits,
1157            &self.api,
1158            &self.db,
1159            &self.task_group,
1160        )
1161        .await
1162    }
1163
    /// Load the common api versions to use from cache and start a background
    /// process to refresh them.
    ///
    /// This is a compromise, so we not have to wait for version discovery to
    /// complete every time a [`Client`] is being built.
    ///
    /// If no cached value exists, the versions are fetched in the foreground
    /// instead and this call blocks until discovery succeeds.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        // Fast path: a cached set exists, so return it and refresh in the background.
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            // Owned copies to move into the spawned task.
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            // Separate task group, because we actually don't want to be waiting for this to
            // finish, and it's just best effort.
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    // Delay making any network connections, just in case this is a short-running
                    // invocation of the client (e.g. `fedimint-cli info`), so the attempt probably
                    // won't succeed anyway.
                    sleep(Duration::from_secs(1)).await;
                    // `false`: do not block until discovery succeeds; a failure is only logged.
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        // Slow path: nothing cached yet, so discovery has to complete before returning.
        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1234
1235    async fn refresh_common_api_version_static(
1236        config: &ClientConfig,
1237        client_module_init: &ClientModuleInitRegistry,
1238        api: &DynGlobalApi,
1239        db: &Database,
1240        task_group: TaskGroup,
1241        block_until_ok: bool,
1242    ) -> anyhow::Result<ApiVersionSet> {
1243        debug!(
1244            target: LOG_CLIENT,
1245            "Refreshing common api versions"
1246        );
1247
1248        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1249        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1250
1251        task_group.spawn_cancellable("refresh peers api versions", {
1252            Client::fetch_common_api_versions_from_all_peers(
1253                num_peers,
1254                api.clone(),
1255                db.clone(),
1256                num_responses_sender,
1257            )
1258        });
1259
1260        let common_api_versions = loop {
1261            // Wait to collect enough answers before calculating a set of common api
1262            // versions to use. Note that all peers individual responses from
1263            // previous attempts are still being used, and requests, or even
1264            // retries for response of peers are not actually cancelled, as they
1265            // are happening on a separate task. This is all just to bound the
1266            // time user can be waiting for the join operation to finish, at the
1267            // risk of picking wrong version in very rare circumstances.
1268            let _: Result<_, Elapsed> = runtime::timeout(
1269                Duration::from_secs(30),
1270                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1271            )
1272            .await;
1273
1274            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1275
1276            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1277                &Self::supported_api_versions_summary_static(config, client_module_init),
1278                &peer_api_version_sets,
1279            ) {
1280                Ok(o) => break o,
1281                Err(err) if block_until_ok => {
1282                    warn!(
1283                        target: LOG_CLIENT,
1284                        err = %err.fmt_compact_anyhow(),
1285                        "Failed to discover API version to use. Retrying..."
1286                    );
1287                    continue;
1288                }
1289                Err(e) => return Err(e),
1290            }
1291        };
1292
1293        debug!(
1294            target: LOG_CLIENT,
1295            value = ?common_api_versions,
1296            "Updating the cached common api versions"
1297        );
1298        let mut dbtx = db.begin_transaction().await;
1299        let _ = dbtx
1300            .insert_entry(
1301                &CachedApiVersionSetKey,
1302                &CachedApiVersionSet(common_api_versions.clone()),
1303            )
1304            .await;
1305
1306        dbtx.commit_tx().await;
1307
1308        Ok(common_api_versions)
1309    }
1310
1311    /// Get the client [`Metadata`]
1312    pub async fn get_metadata(&self) -> Metadata {
1313        self.db
1314            .begin_transaction_nc()
1315            .await
1316            .get_value(&ClientMetadataKey)
1317            .await
1318            .unwrap_or_else(|| {
1319                warn!(
1320                    target: LOG_CLIENT,
1321                    "Missing existing metadata. This key should have been set on Client init"
1322                );
1323                Metadata::empty()
1324            })
1325    }
1326
    /// Set the client [`Metadata`]
    ///
    /// Writes `metadata` through [`Self::set_metadata_dbtx`] inside an
    /// autocommit database transaction.
    ///
    /// # Panics
    ///
    /// Panics if the autocommit fails.
    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                // NOTE(review): presumably `None` means no limit on commit attempts —
                // confirm against `Database::autocommit`.
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }
1342
1343    pub fn has_pending_recoveries(&self) -> bool {
1344        !self
1345            .client_recovery_progress_receiver
1346            .borrow()
1347            .iter()
1348            .all(|(_id, progress)| progress.is_done())
1349    }
1350
1351    /// Wait for all module recoveries to finish
1352    ///
1353    /// This will block until the recovery task is done with recoveries.
1354    /// Returns success if all recovery tasks are complete (success case),
1355    /// or an error if some modules could not complete the recovery at the time.
1356    ///
1357    /// A bit of a heavy approach.
1358    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1359        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1360        recovery_receiver
1361            .wait_for(|in_progress| {
1362                in_progress
1363                    .iter()
1364                    .all(|(_id, progress)| progress.is_done())
1365            })
1366            .await
1367            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1368
1369        Ok(())
1370    }
1371
1372    /// Subscribe to recover progress for all the modules.
1373    ///
1374    /// This stream can contain duplicate progress for a module.
1375    /// Don't use this stream for detecting completion of recovery.
1376    pub fn subscribe_to_recovery_progress(
1377        &self,
1378    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1379        WatchStream::new(self.client_recovery_progress_receiver.clone())
1380            .flat_map(futures::stream::iter)
1381    }
1382
1383    pub async fn wait_for_module_kind_recovery(
1384        &self,
1385        module_kind: ModuleKind,
1386    ) -> anyhow::Result<()> {
1387        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1388        let config = self.config().await;
1389        recovery_receiver
1390            .wait_for(|in_progress| {
1391                !in_progress
1392                    .iter()
1393                    .filter(|(module_instance_id, _progress)| {
1394                        config.modules[module_instance_id].kind == module_kind
1395                    })
1396                    .any(|(_id, progress)| !progress.is_done())
1397            })
1398            .await
1399            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1400
1401        Ok(())
1402    }
1403
1404    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1405        loop {
1406            if self.executor.get_active_states().await.is_empty() {
1407                break;
1408            }
1409            sleep(Duration::from_millis(100)).await;
1410        }
1411        Ok(())
1412    }
1413
    /// Set the client [`Metadata`]
    ///
    /// Writes `metadata` under `ClientMetadataKey` using the given `dbtx`;
    /// this function does not commit the transaction itself.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        // NOTE(review): `insert_new_entry` presumably expects the key to be absent —
        // confirm what happens when metadata has already been stored.
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1418
    /// Spawns [`Self::run_module_recoveries_task`] on the client's task group
    /// under the name "module recoveries".
    ///
    /// `recovery_sender` receives the aggregated per-module progress,
    /// `module_recoveries` holds one recovery future per module instance, and
    /// `module_recovery_progress_receivers` holds one progress receiver per
    /// module instance.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Clone the handles the task needs so that it does not borrow `self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1445
    /// Drives all module recoveries and tracks their progress.
    ///
    /// Merges two kinds of events per module instance: progress updates from
    /// `module_recovery_progress_receivers` and the completion of the
    /// recovery futures in `module_recoveries`. For every event the resulting
    /// progress is written to `db` and then published through
    /// `recovery_sender`. When a module transitions to done, a
    /// [`ModuleRecoveryCompleted`] event is logged in the same database
    /// transaction.
    ///
    /// # Panics
    ///
    /// Panics if `recovery_sender` holds no previous progress entry for a
    /// module that produces an event.
    async fn run_module_recoveries_task(
        db: Database,
        log_ordering_wakeup_tx: watch::Sender<()>,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
        let mut completed_stream = Vec::new();
        let progress_stream = futures::stream::FuturesUnordered::new();

        // One single-item stream per recovery future: it yields `(id, None)` when the
        // recovery finishes successfully.
        for (module_instance_id, f) in module_recoveries {
            completed_stream.push(futures::stream::once(Box::pin(async move {
                match f.await {
                    Ok(()) => (module_instance_id, None),
                    Err(err) => {
                        warn!(
                            target: LOG_CLIENT,
                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
                        );
                        // a module recovery that failed reports an error and
                        // just never finishes, so we don't need a separate state
                        // for it
                        futures::future::pending::<()>().await;
                        unreachable!()
                    }
                }
            })));
        }

        // One stream per progress receiver: it yields `(id, Some(progress))` updates.
        for (module_instance_id, rx) in module_recovery_progress_receivers {
            progress_stream.push(
                tokio_stream::wrappers::WatchStream::new(rx)
                    .fuse()
                    .map(move |progress| (module_instance_id, Some(progress))),
            );
        }

        // Merge progress updates and completion notifications into one event stream.
        let mut futures = futures::stream::select(
            futures::stream::select_all(progress_stream),
            futures::stream::select_all(completed_stream),
        );

        while let Some((module_instance_id, progress)) = futures.next().await {
            let mut dbtx = db.begin_transaction().await;

            let prev_progress = *recovery_sender
                .borrow()
                .get(&module_instance_id)
                .expect("existing progress must be present");

            // `None` marks the completion of the recovery future, which is turned into
            // a completed version of the last known progress.
            let progress = if prev_progress.is_done() {
                // since updates might be out of order, once done, stick with it
                prev_progress
            } else if let Some(progress) = progress {
                progress
            } else {
                prev_progress.to_complete()
            };

            // Only the transition from not-done to done is logged as an event.
            if !prev_progress.is_done() && progress.is_done() {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery complete"
                );
                dbtx.log_event(
                    log_ordering_wakeup_tx.clone(),
                    None,
                    ModuleRecoveryCompleted {
                        module_id: module_instance_id,
                    },
                )
                .await;
            } else {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery progress"
                );
            }

            dbtx.insert_entry(
                &ClientModuleRecovery { module_instance_id },
                &ClientModuleRecoveryState { progress },
            )
            .await;
            dbtx.commit_tx().await;

            // Publish only after the progress has been committed to the database.
            recovery_sender.send_modify(|v| {
                v.insert(module_instance_id, progress);
            });
        }
        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
    }
1549
1550    async fn load_peers_last_api_versions(
1551        db: &Database,
1552        num_peers: NumPeers,
1553    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1554        let mut peer_api_version_sets = BTreeMap::new();
1555
1556        let mut dbtx = db.begin_transaction_nc().await;
1557        for peer_id in num_peers.peer_ids() {
1558            if let Some(v) = dbtx
1559                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1560                .await
1561            {
1562                peer_api_version_sets.insert(peer_id, v.0);
1563            }
1564        }
1565        drop(dbtx);
1566        peer_api_version_sets
1567    }
1568
1569    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1570    /// only the announcements and doesn't use the config as fallback.
1571    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1572        self.db()
1573            .begin_transaction_nc()
1574            .await
1575            .find_by_prefix(&ApiAnnouncementPrefix)
1576            .await
1577            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1578            .collect()
1579            .await
1580    }
1581
1582    /// Returns a list of guardian API URLs
1583    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1584        get_api_urls(&self.db, &self.config().await).await
1585    }
1586
1587    /// Create an invite code with the api endpoint of the given peer which can
1588    /// be used to download this client config
1589    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1590        self.get_peer_urls()
1591            .await
1592            .into_iter()
1593            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1594            .map(|peer_url| {
1595                InviteCode::new(
1596                    peer_url.clone(),
1597                    peer,
1598                    self.federation_id(),
1599                    self.api_secret.clone(),
1600                )
1601            })
1602    }
1603
1604    /// Blocks till the client has synced the guardian public key set
1605    /// (introduced in version 0.4) and returns it. Once it has been fetched
1606    /// once this function is guaranteed to return immediately.
1607    pub async fn get_guardian_public_keys_blocking(
1608        &self,
1609    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1610        self.db
1611            .autocommit(
1612                |dbtx, _| {
1613                    Box::pin(async move {
1614                        let config = self.config().await;
1615
1616                        let guardian_pub_keys = self
1617                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1618                            .await;
1619
1620                        Result::<_, ()>::Ok(guardian_pub_keys)
1621                    })
1622                },
1623                None,
1624            )
1625            .await
1626            .expect("Will retry forever")
1627    }
1628
1629    async fn get_or_backfill_broadcast_public_keys(
1630        &self,
1631        dbtx: &mut DatabaseTransaction<'_>,
1632        config: ClientConfig,
1633    ) -> BTreeMap<PeerId, PublicKey> {
1634        match config.global.broadcast_public_keys {
1635            Some(guardian_pub_keys) => guardian_pub_keys,
1636            _ => {
1637                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1638
1639                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1640                *(self.config.write().await) = new_config;
1641                guardian_pub_keys
1642            }
1643        }
1644    }
1645
1646    async fn fetch_session_count(&self) -> FederationResult<u64> {
1647        self.api.session_count().await
1648    }
1649
1650    async fn fetch_and_update_config(
1651        &self,
1652        config: ClientConfig,
1653    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1654        let fetched_config = retry(
1655            "Fetching guardian public keys",
1656            backoff_util::background_backoff(),
1657            || async {
1658                Ok(self
1659                    .api
1660                    .request_current_consensus::<ClientConfig>(
1661                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1662                        ApiRequestErased::default(),
1663                    )
1664                    .await?)
1665            },
1666        )
1667        .await
1668        .expect("Will never return on error");
1669
1670        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1671            warn!(
1672                target: LOG_CLIENT,
1673                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1674            );
1675            pending::<()>().await;
1676            unreachable!("Pending will never return");
1677        };
1678
1679        let new_config = ClientConfig {
1680            global: GlobalClientConfig {
1681                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1682                ..config.global
1683            },
1684            modules: config.modules,
1685        };
1686        (guardian_pub_keys, new_config)
1687    }
1688
1689    pub fn handle_global_rpc(
1690        &self,
1691        method: String,
1692        params: serde_json::Value,
1693    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1694        Box::pin(try_stream! {
1695            match method.as_str() {
1696                "get_balance" => {
1697                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1698                    yield serde_json::to_value(balance)?;
1699                }
1700                "subscribe_balance_changes" => {
1701                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1702                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1703                    while let Some(balance) = stream.next().await {
1704                        yield serde_json::to_value(balance)?;
1705                    }
1706                }
1707                "get_config" => {
1708                    let config = self.config().await;
1709                    yield serde_json::to_value(config)?;
1710                }
1711                "get_federation_id" => {
1712                    let federation_id = self.federation_id();
1713                    yield serde_json::to_value(federation_id)?;
1714                }
1715                "get_invite_code" => {
1716                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1717                    let invite_code = self.invite_code(req.peer).await;
1718                    yield serde_json::to_value(invite_code)?;
1719                }
1720                "get_operation" => {
1721                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1722                    let operation = self.operation_log().get_operation(req.operation_id).await;
1723                    yield serde_json::to_value(operation)?;
1724                }
1725                "list_operations" => {
1726                    let req: ListOperationsParams = serde_json::from_value(params)?;
1727                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1728                        usize::MAX
1729                    } else {
1730                        req.limit.unwrap_or(usize::MAX)
1731                    };
1732                    let operations = self.operation_log()
1733                        .paginate_operations_rev(limit, req.last_seen)
1734                        .await;
1735                    yield serde_json::to_value(operations)?;
1736                }
1737                "session_count" => {
1738                    let count = self.fetch_session_count().await?;
1739                    yield serde_json::to_value(count)?;
1740                }
1741                "has_pending_recoveries" => {
1742                    let has_pending = self.has_pending_recoveries();
1743                    yield serde_json::to_value(has_pending)?;
1744                }
1745                "wait_for_all_recoveries" => {
1746                    self.wait_for_all_recoveries().await?;
1747                    yield serde_json::Value::Null;
1748                }
1749                "subscribe_to_recovery_progress" => {
1750                    let mut stream = self.subscribe_to_recovery_progress();
1751                    while let Some((module_id, progress)) = stream.next().await {
1752                        yield serde_json::json!({
1753                            "module_id": module_id,
1754                            "progress": progress
1755                        });
1756                    }
1757                }
1758                "backup_to_federation" => {
1759                    let metadata = if params.is_null() {
1760                        Metadata::from_json_serialized(serde_json::json!({}))
1761                    } else {
1762                        Metadata::from_json_serialized(params)
1763                    };
1764                    self.backup_to_federation(metadata).await?;
1765                    yield serde_json::Value::Null;
1766                }
1767                _ => {
1768                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1769                    unreachable!()
1770                },
1771            }
1772        })
1773    }
1774
1775    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1776    where
1777        E: Event + Send,
1778    {
1779        let mut dbtx = self.db.begin_transaction().await;
1780        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1781        dbtx.commit_tx().await;
1782    }
1783
1784    pub async fn log_event_dbtx<E, Cap>(
1785        &self,
1786        dbtx: &mut DatabaseTransaction<'_, Cap>,
1787        module_id: Option<ModuleInstanceId>,
1788        event: E,
1789    ) where
1790        E: Event + Send,
1791        Cap: Send,
1792    {
1793        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1794            .await;
1795    }
1796
1797    pub async fn log_event_raw_dbtx<Cap>(
1798        &self,
1799        dbtx: &mut DatabaseTransaction<'_, Cap>,
1800        kind: EventKind,
1801        module: Option<(ModuleKind, ModuleInstanceId)>,
1802        payload: Vec<u8>,
1803        persist: EventPersistence,
1804    ) where
1805        Cap: Send,
1806    {
1807        let module_id = module.as_ref().map(|m| m.1);
1808        let module_kind = module.map(|m| m.0);
1809        dbtx.log_event_raw(
1810            self.log_ordering_wakeup_tx.clone(),
1811            kind,
1812            module_kind,
1813            module_id,
1814            payload,
1815            persist,
1816        )
1817        .await;
1818    }
1819
1820    /// Built in event log (trimmable) tracker
1821    ///
1822    /// For the convenience of downstream applications, [`Client`] can store
1823    /// internally event log position for the main application using/driving it.
1824    ///
1825    /// Note that this position is a singleton, so this tracker should not be
1826    /// used for multiple purposes or applications, etc. at the same time.
1827    ///
1828    /// If the application has a need to follow log using multiple trackers, it
1829    /// should implement own [`DynEventLogTrimableTracker`] and store its
1830    /// persient data by itself.
1831    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1832        struct BuiltInApplicationEventLogTracker;
1833
1834        #[apply(async_trait_maybe_send!)]
1835        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1836            // Store position in the event log
1837            async fn store(
1838                &mut self,
1839                dbtx: &mut DatabaseTransaction<NonCommittable>,
1840                pos: EventLogTrimableId,
1841            ) -> anyhow::Result<()> {
1842                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1843                    .await;
1844                Ok(())
1845            }
1846
1847            /// Load the last previous stored position (or None if never stored)
1848            async fn load(
1849                &mut self,
1850                dbtx: &mut DatabaseTransaction<NonCommittable>,
1851            ) -> anyhow::Result<Option<EventLogTrimableId>> {
1852                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
1853            }
1854        }
1855        Box::new(BuiltInApplicationEventLogTracker)
1856    }
1857
1858    /// Like [`Self::handle_events`] but for historical data.
1859    ///
1860    ///
1861    /// This function can be used to process subset of events
1862    /// that is infrequent and important enough to be persisted
1863    /// forever. Most applications should prefer to use [`Self::handle_events`]
1864    /// which emits *all* events.
1865    pub async fn handle_historical_events<F, R>(
1866        &self,
1867        tracker: fedimint_eventlog::DynEventLogTracker,
1868        handler_fn: F,
1869    ) -> anyhow::Result<()>
1870    where
1871        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1872        R: Future<Output = anyhow::Result<()>>,
1873    {
1874        fedimint_eventlog::handle_events(
1875            self.db.clone(),
1876            tracker,
1877            self.log_event_added_rx.clone(),
1878            handler_fn,
1879        )
1880        .await
1881    }
1882
1883    /// Handle events emitted by the client
1884    ///
1885    /// This is a preferred method for reactive & asynchronous
1886    /// processing of events emitted by the client.
1887    ///
1888    /// It needs a `tracker` that will persist the position in the log
1889    /// as it is being handled. You can use the
1890    /// [`Client::built_in_application_event_log_tracker`] if this call is
1891    /// used for the single main application handling this instance of the
1892    /// [`Client`]. Otherwise you should implement your own tracker.
1893    ///
1894    /// This handler will call `handle_fn` with ever event emitted by
1895    /// [`Client`], including transient ones. The caller should atomically
1896    /// handle each event it is interested in and ignore other ones.
1897    ///
1898    /// This method returns only when client is shutting down or on internal
1899    /// error, so typically should be called in a background task dedicated
1900    /// to handling events.
1901    pub async fn handle_events<F, R>(
1902        &self,
1903        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
1904        handler_fn: F,
1905    ) -> anyhow::Result<()>
1906    where
1907        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1908        R: Future<Output = anyhow::Result<()>>,
1909    {
1910        fedimint_eventlog::handle_trimable_events(
1911            self.db.clone(),
1912            tracker,
1913            self.log_event_added_rx.clone(),
1914            handler_fn,
1915        )
1916        .await
1917    }
1918
1919    pub async fn get_event_log(
1920        &self,
1921        pos: Option<EventLogId>,
1922        limit: u64,
1923    ) -> Vec<PersistedLogEntry> {
1924        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1925            .await
1926    }
1927
1928    pub async fn get_event_log_trimable(
1929        &self,
1930        pos: Option<EventLogTrimableId>,
1931        limit: u64,
1932    ) -> Vec<PersistedLogEntry> {
1933        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1934            .await
1935    }
1936
1937    pub async fn get_event_log_dbtx<Cap>(
1938        &self,
1939        dbtx: &mut DatabaseTransaction<'_, Cap>,
1940        pos: Option<EventLogId>,
1941        limit: u64,
1942    ) -> Vec<PersistedLogEntry>
1943    where
1944        Cap: Send,
1945    {
1946        dbtx.get_event_log(pos, limit).await
1947    }
1948
1949    pub async fn get_event_log_trimable_dbtx<Cap>(
1950        &self,
1951        dbtx: &mut DatabaseTransaction<'_, Cap>,
1952        pos: Option<EventLogTrimableId>,
1953        limit: u64,
1954    ) -> Vec<PersistedLogEntry>
1955    where
1956        Cap: Send,
1957    {
1958        dbtx.get_event_log_trimable(pos, limit).await
1959    }
1960
1961    /// Register to receiver all new transient (unpersisted) events
1962    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1963        self.log_event_added_transient_tx.subscribe()
1964    }
1965
1966    /// Get a receiver that signals when new events are added to the event log
1967    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
1968        self.log_event_added_rx.clone()
1969    }
1970
1971    pub fn iroh_enable_dht(&self) -> bool {
1972        self.iroh_enable_dht
1973    }
1974
1975    pub(crate) async fn run_core_migrations(
1976        db_no_decoders: &Database,
1977    ) -> Result<(), anyhow::Error> {
1978        let mut dbtx = db_no_decoders.begin_transaction().await;
1979        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1980            .await?;
1981        if is_running_in_test_env() {
1982            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1983        }
1984        dbtx.commit_tx_result().await?;
1985        Ok(())
1986    }
1987
1988    /// Iterator over primary modules for a given `unit`
1989    fn primary_modules_for_unit(
1990        &self,
1991        unit: AmountUnit,
1992    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1993        self.primary_modules
1994            .iter()
1995            .flat_map(move |(_prio, candidates)| {
1996                candidates
1997                    .specific
1998                    .get(&unit)
1999                    .into_iter()
2000                    .flatten()
2001                    .copied()
2002                    // within same priority, wildcard matches come last
2003                    .chain(candidates.wildcard.iter().copied())
2004            })
2005            .map(|id| (id, self.modules.get_expect(id)))
2006    }
2007
2008    /// Primary module to use for `unit`
2009    ///
2010    /// Currently, just pick the first (highest priority) match
2011    pub fn primary_module_for_unit(
2012        &self,
2013        unit: AmountUnit,
2014    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2015        self.primary_modules_for_unit(unit).next()
2016    }
2017
2018    /// [`Self::primary_module_for_unit`] for Bitcoin
2019    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2020        self.primary_module_for_unit(AmountUnit::BITCOIN)
2021            .expect("No primary module for Bitcoin")
2022    }
2023}
2024
// Implementation of the module-facing client context interface.
//
// Almost every method forwards to the inherent `Client` method of the same
// name through an explicit `Client::…(self, …)` path.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    /// Forwards to the inherent `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    /// Forwards to the inherent `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    /// Forwards to the inherent `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    /// Forwards to the inherent `Client::finalize_and_submit_transaction`,
    /// handing it the boxed `operation_meta_gen` by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // The boxed generator is passed by reference rather than being
            // wrapped in a new closure.
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    /// Forwards to the inherent `Client::finalize_and_submit_transaction_inner`.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    /// Forwards to the inherent `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    /// Forwards to `Client::await_primary_bitcoin_module_outputs`; note the
    /// different name of the inherent method.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    /// Forwards to the inherent `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    /// Forwards to the inherent `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    /// Forwards to the inherent `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    /// Forwards to the inherent `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    /// Forwards to the inherent `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    /// Forwards to the inherent `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    /// Forwards to the inherent `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    /// Forwards to the inherent `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    /// Logs an event whose payload is given as a JSON value.
    ///
    /// The payload is serialized to bytes and logged via
    /// `log_event_raw_dbtx`. The module is only attached to the event when
    /// `module_kind` is `Some`; otherwise `module_id` is not passed on.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx.ensure_global()` fails, i.e. if it is not called with
    /// a global dbtx.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    /// Streams the active states stored for `operation_id` and `module_id`.
    ///
    /// The entries are found by `ActiveModuleOperationStateKeyPrefix`; each
    /// database key is unwrapped to its inner `ActiveStateKey`. The stream
    /// borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    /// Streams the inactive states stored for `operation_id` and `module_id`.
    ///
    /// The entries are found by `InactiveModuleOperationStateKeyPrefix`; each
    /// database key is unwrapped to its inner `InactiveStateKey`. The stream
    /// borrows `dbtx` for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2164
2165// TODO: impl `Debug` for `Client` and derive here
2166impl fmt::Debug for Client {
2167    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2168        write!(f, "Client")
2169    }
2170}
2171
2172pub fn client_decoders<'a>(
2173    registry: &ModuleInitRegistry<DynClientModuleInit>,
2174    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2175) -> ModuleDecoderRegistry {
2176    let mut modules = BTreeMap::new();
2177    for (id, kind) in module_kinds {
2178        let Some(init) = registry.get(kind) else {
2179            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2180            continue;
2181        };
2182
2183        modules.insert(
2184            id,
2185            (
2186                kind.clone(),
2187                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2188            ),
2189        );
2190    }
2191    ModuleDecoderRegistry::from(modules)
2192}