fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_client_module::module::recovery::RecoveryProgress;
19use fedimint_client_module::module::{
20    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
21    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
22};
23use fedimint_client_module::oplog::IOperationLog;
24use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
25use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
26use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
27use fedimint_client_module::transaction::{
28    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
29    TxSubmissionStatesSM,
30};
31use fedimint_client_module::{
32    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
33    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
34};
35use fedimint_connectors::ConnectorRegistry;
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
51    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::runtime::sleep;
55use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
56use fedimint_core::transaction::Transaction;
57use fedimint_core::util::backoff_util::custom_backoff;
58use fedimint_core::util::{
59    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
60};
61use fedimint_core::{
62    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
63    maybe_add_send_sync, runtime,
64};
65use fedimint_derive_secret::DerivableSecret;
66use fedimint_eventlog::{
67    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
68    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
69};
70use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
71use futures::stream::FuturesUnordered;
72use futures::{Stream, StreamExt as _};
73use global_ctx::ModuleGlobalClientContext;
74use serde::{Deserialize, Serialize};
75use tokio::sync::{broadcast, watch};
76use tokio_stream::wrappers::WatchStream;
77use tracing::{debug, info, warn};
78
79use crate::ClientBuilder;
80use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
81use crate::backup::Metadata;
82use crate::client::event_log::DefaultApplicationEventLogKey;
83use crate::db::{
84    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
85    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
86    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
87    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
88    get_decoded_client_secret, verify_client_db_integrity_dbtx,
89};
90use crate::meta::MetaService;
91use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
92use crate::oplog::OperationLog;
93use crate::sm::executor::{
94    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
95    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
96};
97
98pub(crate) mod builder;
99pub(crate) mod event_log;
100pub(crate) mod global_ctx;
101pub(crate) mod handle;
102
103/// List of core api versions supported by the implementation.
104/// Notably `major` version is the one being supported, and corresponding
105/// `minor` version is the one required (for given `major` version).
106const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
107    &[ApiVersion { major: 0, minor: 0 }];
108
109/// Primary module candidates at specific priority level
110#[derive(Default)]
111pub(crate) struct PrimaryModuleCandidates {
112    /// Modules that listed specific units they handle
113    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
114    /// Modules handling any unit
115    wildcard: Vec<ModuleInstanceId>,
116}
117
118/// Main client type
119///
120/// A handle and API to interacting with a single federation. End user
121/// applications that want to support interacting with multiple federations at
122/// the same time, will need to instantiate and manage multiple instances of
123/// this struct.
124///
125/// Under the hood it is starting and managing service tasks, state machines,
126/// database and other resources required.
127///
128/// This type is shared externally and internally, and
129/// [`crate::ClientHandle`] is responsible for external lifecycle management
130/// and resource freeing of the [`Client`].
131pub struct Client {
132    final_client: FinalClientIface,
133    config: tokio::sync::RwLock<ClientConfig>,
134    api_secret: Option<String>,
135    decoders: ModuleDecoderRegistry,
136    connectors: ConnectorRegistry,
137    db: Database,
138    federation_id: FederationId,
139    federation_config_meta: BTreeMap<String, String>,
140    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
141    pub(crate) modules: ClientModuleRegistry,
142    module_inits: ClientModuleInitRegistry,
143    executor: Executor,
144    pub(crate) api: DynGlobalApi,
145    root_secret: DerivableSecret,
146    operation_log: OperationLog,
147    secp_ctx: Secp256k1<secp256k1::All>,
148    meta_service: Arc<MetaService>,
149
150    task_group: TaskGroup,
151
152    /// Updates about client recovery progress
153    client_recovery_progress_receiver:
154        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
155
156    /// Internal client sender to wake up log ordering task every time a
157    /// (unuordered) log event is added.
158    log_ordering_wakeup_tx: watch::Sender<()>,
159    /// Receiver for events fired every time (ordered) log event is added.
160    log_event_added_rx: watch::Receiver<()>,
161    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
162    request_hook: ApiRequestHook,
163    iroh_enable_dht: bool,
164    iroh_enable_next: bool,
165}
166
167#[derive(Debug, Serialize, Deserialize)]
168struct ListOperationsParams {
169    limit: Option<usize>,
170    last_seen: Option<ChronologicalOperationLogKey>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct GetOperationIdRequest {
175    operation_id: OperationId,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct GetBalanceChangesRequest {
180    #[serde(default = "AmountUnit::bitcoin")]
181    unit: AmountUnit,
182}
183
184impl Client {
185    /// Initialize a client builder that can be configured to create a new
186    /// client.
187    pub async fn builder() -> anyhow::Result<ClientBuilder> {
188        Ok(ClientBuilder::new())
189    }
190
191    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
192        self.api.as_ref()
193    }
194
195    pub fn api_clone(&self) -> DynGlobalApi {
196        self.api.clone()
197    }
198
199    /// Returns a stream that emits the current connection status of all peers
200    /// whenever any peer's status changes. Emits initial state immediately.
201    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
202        self.api.connection_status_stream()
203    }
204
205    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
206    pub fn task_group(&self) -> &TaskGroup {
207        &self.task_group
208    }
209
210    /// Useful for our CLI tooling, not meant for external use
211    #[doc(hidden)]
212    pub fn executor(&self) -> &Executor {
213        &self.executor
214    }
215
216    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
217        let mut dbtx = db.begin_transaction_nc().await;
218        dbtx.get_value(&ClientConfigKey).await
219    }
220
221    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
222        let mut dbtx = db.begin_transaction_nc().await;
223        dbtx.get_value(&PendingClientConfigKey).await
224    }
225
226    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
227        let mut dbtx = db.begin_transaction_nc().await;
228        dbtx.get_value(&ApiSecretKey).await
229    }
230
231    pub async fn store_encodable_client_secret<T: Encodable>(
232        db: &Database,
233        secret: T,
234    ) -> anyhow::Result<()> {
235        let mut dbtx = db.begin_transaction().await;
236
237        // Don't overwrite an existing secret
238        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
239            bail!("Encoded client secret already exists, cannot overwrite")
240        }
241
242        let encoded_secret = T::consensus_encode_to_vec(&secret);
243        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
244            .await;
245        dbtx.commit_tx().await;
246        Ok(())
247    }
248
249    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
250        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
251            bail!("Encoded client secret not present in DB")
252        };
253
254        Ok(secret)
255    }
256    pub async fn load_decodable_client_secret_opt<T: Decodable>(
257        db: &Database,
258    ) -> anyhow::Result<Option<T>> {
259        let mut dbtx = db.begin_transaction_nc().await;
260
261        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
262
263        Ok(match client_secret {
264            Some(client_secret) => Some(
265                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
266                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
267            ),
268            None => None,
269        })
270    }
271
272    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
273        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
274            Ok(secret) => secret,
275            _ => {
276                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
277                Self::store_encodable_client_secret(db, secret)
278                    .await
279                    .expect("Storing client secret must work");
280                secret
281            }
282        };
283        Ok(client_secret)
284    }
285
286    pub async fn is_initialized(db: &Database) -> bool {
287        let mut dbtx = db.begin_transaction_nc().await;
288        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
289            .await
290            .expect("Unrecoverable error occurred while reading and entry from the database")
291            .is_some()
292    }
293
294    pub fn start_executor(self: &Arc<Self>) {
295        debug!(
296            target: LOG_CLIENT,
297            "Starting fedimint client executor",
298        );
299        self.executor.start_executor(self.context_gen());
300    }
301
302    pub fn federation_id(&self) -> FederationId {
303        self.federation_id
304    }
305
306    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
307        let client_inner = Arc::downgrade(self);
308        Arc::new(move |module_instance, operation| {
309            ModuleGlobalClientContext {
310                client: client_inner
311                    .clone()
312                    .upgrade()
313                    .expect("ModuleGlobalContextGen called after client was dropped"),
314                module_instance_id: module_instance,
315                operation,
316            }
317            .into()
318        })
319    }
320
321    pub async fn config(&self) -> ClientConfig {
322        self.config.read().await.clone()
323    }
324
325    // TODO: change to `-> Option<&str>`
326    pub fn api_secret(&self) -> &Option<String> {
327        &self.api_secret
328    }
329
330    /// Returns the core API version that the federation supports
331    ///
332    /// This reads from the cached version stored during client initialization.
333    /// If no cache is available (e.g., during initial setup), returns a default
334    /// version (0, 0).
335    pub async fn core_api_version(&self) -> ApiVersion {
336        // Try to get from cache. If not available, return a conservative
337        // default. The cache should always be populated after successful client init.
338        self.db
339            .begin_transaction_nc()
340            .await
341            .get_value(&CachedApiVersionSetKey)
342            .await
343            .map(|cached: CachedApiVersionSet| cached.0.core)
344            .unwrap_or(ApiVersion { major: 0, minor: 0 })
345    }
346
347    pub fn decoders(&self) -> &ModuleDecoderRegistry {
348        &self.decoders
349    }
350
351    /// Returns a reference to the module, panics if not found
352    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
353        self.try_get_module(instance)
354            .expect("Module instance not found")
355    }
356
357    fn try_get_module(
358        &self,
359        instance: ModuleInstanceId,
360    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
361        Some(self.modules.get(instance)?.as_ref())
362    }
363
364    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
365        self.modules.get(instance).is_some()
366    }
367
368    /// Returns the input amount and output amount of a transaction
369    ///
370    /// # Panics
371    /// If any of the input or output versions in the transaction builder are
372    /// unknown by the respective module.
373    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
374        // FIXME: prevent overflows, currently not suitable for untrusted input
375        let mut in_amounts = Amounts::ZERO;
376        let mut out_amounts = Amounts::ZERO;
377        let mut fee_amounts = Amounts::ZERO;
378
379        for input in builder.inputs() {
380            let module = self.get_module(input.input.module_instance_id());
381
382            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
383                "We only build transactions with input versions that are supported by the module",
384            );
385
386            in_amounts.checked_add_mut(&input.amounts);
387            fee_amounts.checked_add_mut(&item_fees);
388        }
389
390        for output in builder.outputs() {
391            let module = self.get_module(output.output.module_instance_id());
392
393            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
394                "We only build transactions with output versions that are supported by the module",
395            );
396
397            out_amounts.checked_add_mut(&output.amounts);
398            fee_amounts.checked_add_mut(&item_fees);
399        }
400
401        out_amounts.checked_add_mut(&fee_amounts);
402        (in_amounts, out_amounts)
403    }
404
405    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
406        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
407    }
408
409    /// Get metadata value from the federation config itself
410    pub fn get_config_meta(&self, key: &str) -> Option<String> {
411        self.federation_config_meta.get(key).cloned()
412    }
413
414    pub(crate) fn root_secret(&self) -> DerivableSecret {
415        self.root_secret.clone()
416    }
417
418    pub async fn add_state_machines(
419        &self,
420        dbtx: &mut DatabaseTransaction<'_>,
421        states: Vec<DynState>,
422    ) -> AddStateMachinesResult {
423        self.executor.add_state_machines_dbtx(dbtx, states).await
424    }
425
426    // TODO: implement as part of [`OperationLog`]
427    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
428        let active_states = self.executor.get_active_states().await;
429        let mut active_operations = HashSet::with_capacity(active_states.len());
430        let mut dbtx = self.db().begin_transaction_nc().await;
431        for (state, _) in active_states {
432            let operation_id = state.operation_id();
433            if dbtx
434                .get_value(&OperationLogKey { operation_id })
435                .await
436                .is_some()
437            {
438                active_operations.insert(operation_id);
439            }
440        }
441        active_operations
442    }
443
444    pub fn operation_log(&self) -> &OperationLog {
445        &self.operation_log
446    }
447
448    /// Get the meta manager to read meta fields.
449    pub fn meta_service(&self) -> &Arc<MetaService> {
450        &self.meta_service
451    }
452
453    /// Get the meta manager to read meta fields.
454    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
455        let meta_service = self.meta_service();
456        let ts = meta_service
457            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
458            .await
459            .and_then(|v| v.value)?;
460        Some(UNIX_EPOCH + Duration::from_secs(ts))
461    }
462
463    /// Adds funding to a transaction or removes over-funding via change.
464    async fn finalize_transaction(
465        &self,
466        dbtx: &mut DatabaseTransaction<'_>,
467        operation_id: OperationId,
468        mut partial_transaction: TransactionBuilder,
469    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
470        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
471
472        let mut added_inputs_bundles = vec![];
473        let mut added_outputs_bundles = vec![];
474
475        // The way currently things are implemented is OK for modules which can
476        // collect a fee relative to one being used, but will break down in any
477        // fancy scenarios. Future TODOs:
478        //
479        // * create_final_inputs_and_outputs needs to get broken down, so we can use
480        //   primary modules using priorities (possibly separate prios for inputs and
481        //   outputs to be able to drain modules, etc.); we need the split "check if
482        //   possible" and "take" steps,
483        // * extra inputs and outputs adding fees needs to be taken into account,
484        //   possibly with some looping
485        for unit in in_amounts.units().union(&out_amounts.units()) {
486            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
487            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
488            if input_amount == output_amount {
489                continue;
490            }
491
492            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
493                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
494            };
495
496            let (added_input_bundle, added_output_bundle) = module
497                .create_final_inputs_and_outputs(
498                    module_id,
499                    dbtx,
500                    operation_id,
501                    *unit,
502                    input_amount,
503                    output_amount,
504                )
505                .await?;
506
507            added_inputs_bundles.push(added_input_bundle);
508            added_outputs_bundles.push(added_output_bundle);
509        }
510
511        // This is the range of  outputs that will be added to the transaction
512        // in order to balance it. Notice that it may stay empty in case the transaction
513        // is already balanced.
514        let change_range = Range {
515            start: partial_transaction.outputs().count() as u64,
516            end: (partial_transaction.outputs().count() as u64
517                + added_outputs_bundles
518                    .iter()
519                    .map(|output| output.outputs().len() as u64)
520                    .sum::<u64>()),
521        };
522
523        for added_inputs in added_inputs_bundles {
524            partial_transaction = partial_transaction.with_inputs(added_inputs);
525        }
526
527        for added_outputs in added_outputs_bundles {
528            partial_transaction = partial_transaction.with_outputs(added_outputs);
529        }
530
531        let (input_amounts, output_amounts) =
532            self.transaction_builder_get_balance(&partial_transaction);
533
534        for (unit, output_amount) in output_amounts {
535            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
536
537            assert!(input_amount >= output_amount, "Transaction is underfunded");
538        }
539
540        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
541
542        Ok((tx, states, change_range))
543    }
544
545    /// Add funding and/or change to the transaction builder as needed, finalize
546    /// the transaction and submit it to the federation.
547    ///
548    /// ## Errors
549    /// The function will return an error if the operation with given ID already
550    /// exists.
551    ///
552    /// ## Panics
553    /// The function will panic if the database transaction collides with
554    /// other and fails with others too often, this should not happen except for
555    /// excessively concurrent scenarios.
556    pub async fn finalize_and_submit_transaction<F, M>(
557        &self,
558        operation_id: OperationId,
559        operation_type: &str,
560        operation_meta_gen: F,
561        tx_builder: TransactionBuilder,
562    ) -> anyhow::Result<OutPointRange>
563    where
564        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
565        M: serde::Serialize + MaybeSend,
566    {
567        let operation_type = operation_type.to_owned();
568
569        let autocommit_res = self
570            .db
571            .autocommit(
572                |dbtx, _| {
573                    let operation_type = operation_type.clone();
574                    let tx_builder = tx_builder.clone();
575                    let operation_meta_gen = operation_meta_gen.clone();
576                    Box::pin(async move {
577                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
578                            bail!("There already exists an operation with id {operation_id:?}")
579                        }
580
581                        let out_point_range = self
582                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
583                            .await?;
584
585                        self.operation_log()
586                            .add_operation_log_entry_dbtx(
587                                dbtx,
588                                operation_id,
589                                &operation_type,
590                                operation_meta_gen(out_point_range),
591                            )
592                            .await;
593
594                        Ok(out_point_range)
595                    })
596                },
597                Some(100), // TODO: handle what happens after 100 retries
598            )
599            .await;
600
601        match autocommit_res {
602            Ok(txid) => Ok(txid),
603            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
604            Err(AutocommitError::CommitFailed {
605                attempts,
606                last_error,
607            }) => panic!(
608                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
609            ),
610        }
611    }
612
613    async fn finalize_and_submit_transaction_inner(
614        &self,
615        dbtx: &mut DatabaseTransaction<'_>,
616        operation_id: OperationId,
617        tx_builder: TransactionBuilder,
618    ) -> anyhow::Result<OutPointRange> {
619        let (transaction, mut states, change_range) = self
620            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
621            .await?;
622
623        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
624            let inputs = transaction
625                .inputs
626                .iter()
627                .map(DynInput::module_instance_id)
628                .collect::<Vec<_>>();
629            let outputs = transaction
630                .outputs
631                .iter()
632                .map(DynOutput::module_instance_id)
633                .collect::<Vec<_>>();
634            warn!(
635                target: LOG_CLIENT_NET_API,
636                size=%transaction.consensus_encode_to_vec().len(),
637                ?inputs,
638                ?outputs,
639                "Transaction too large",
640            );
641            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
642            bail!(
643                "The generated transaction would be rejected by the federation for being too large."
644            );
645        }
646
647        let txid = transaction.tx_hash();
648
649        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
650
651        let tx_submission_sm = DynState::from_typed(
652            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
653            TxSubmissionStatesSM {
654                operation_id,
655                state: TxSubmissionStates::Created(transaction),
656            },
657        );
658        states.push(tx_submission_sm);
659
660        self.executor.add_state_machines_dbtx(dbtx, states).await?;
661
662        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
663            .await;
664
665        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
666    }
667
668    async fn transaction_update_stream(
669        &self,
670        operation_id: OperationId,
671    ) -> BoxStream<'static, TxSubmissionStatesSM> {
672        self.executor
673            .notifier()
674            .module_notifier::<TxSubmissionStatesSM>(
675                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
676                self.final_client.clone(),
677            )
678            .subscribe(operation_id)
679            .await
680    }
681
682    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
683        let mut dbtx = self.db().begin_transaction_nc().await;
684
685        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
686    }
687
688    pub async fn operation_exists_dbtx(
689        dbtx: &mut DatabaseTransaction<'_>,
690        operation_id: OperationId,
691    ) -> bool {
692        let active_state_exists = dbtx
693            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
694            .await
695            .next()
696            .await
697            .is_some();
698
699        let inactive_state_exists = dbtx
700            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
701            .await
702            .next()
703            .await
704            .is_some();
705
706        active_state_exists || inactive_state_exists
707    }
708
709    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
710        self.db
711            .begin_transaction_nc()
712            .await
713            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
714            .await
715            .next()
716            .await
717            .is_some()
718    }
719
720    /// Waits for an output from the primary module to reach its final
721    /// state.
722    pub async fn await_primary_bitcoin_module_output(
723        &self,
724        operation_id: OperationId,
725        out_point: OutPoint,
726    ) -> anyhow::Result<()> {
727        self.primary_module_for_unit(AmountUnit::BITCOIN)
728            .ok_or_else(|| anyhow!("No primary module available"))?
729            .1
730            .await_primary_module_output(operation_id, out_point)
731            .await
732    }
733
734    /// Returns a reference to a typed module client instance by kind
735    pub fn get_first_module<M: ClientModule>(
736        &'_ self,
737    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
738        let module_kind = M::kind();
739        let id = self
740            .get_first_instance(&module_kind)
741            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
742        let module: &M = self
743            .try_get_module(id)
744            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
745            .as_any()
746            .downcast_ref::<M>()
747            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
748        let (db, _) = self.db().with_prefix_module_id(id);
749        Ok(ClientModuleInstance {
750            id,
751            db,
752            api: self.api().with_module(id),
753            module,
754        })
755    }
756
757    pub fn get_module_client_dyn(
758        &self,
759        instance_id: ModuleInstanceId,
760    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
761        self.try_get_module(instance_id)
762            .ok_or(anyhow!("Unknown module instance {}", instance_id))
763    }
764
765    pub fn db(&self) -> &Database {
766        &self.db
767    }
768
769    pub fn endpoints(&self) -> &ConnectorRegistry {
770        &self.connectors
771    }
772
773    /// Returns a stream of transaction updates for the given operation id that
774    /// can later be used to watch for a specific transaction being accepted.
775    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
776        TransactionUpdates {
777            update_stream: self.transaction_update_stream(operation_id).await,
778        }
779    }
780
781    /// Returns the instance id of the first module of the given kind.
782    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
783        self.modules
784            .iter_modules()
785            .find(|(_, kind, _module)| *kind == module_kind)
786            .map(|(instance_id, _, _)| instance_id)
787    }
788
789    /// Returns the data from which the client's root secret is derived (e.g.
790    /// BIP39 seed phrase struct).
791    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
792        get_decoded_client_secret::<T>(self.db()).await
793    }
794
795    /// Waits for outputs from the primary module to reach its final
796    /// state.
797    pub async fn await_primary_bitcoin_module_outputs(
798        &self,
799        operation_id: OperationId,
800        outputs: Vec<OutPoint>,
801    ) -> anyhow::Result<()> {
802        for out_point in outputs {
803            self.await_primary_bitcoin_module_output(operation_id, out_point)
804                .await?;
805        }
806
807        Ok(())
808    }
809
810    /// Returns the config of the client in JSON format.
811    ///
812    /// Compared to the consensus module format where module configs are binary
813    /// encoded this format cannot be cryptographically verified but is easier
814    /// to consume and to some degree human-readable.
815    pub async fn get_config_json(&self) -> JsonClientConfig {
816        self.config().await.to_json()
817    }
818
819    // Ideally this would not be in the API, but there's a lot of places where this
820    // makes it easier.
821    #[doc(hidden)]
822    /// Like [`Self::get_balance`] but returns an error if primary module is not
823    /// available
824    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
825        self.get_balance_for_unit(AmountUnit::BITCOIN).await
826    }
827
828    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
829        let (id, module) = self
830            .primary_module_for_unit(unit)
831            .ok_or_else(|| anyhow!("Primary module not available"))?;
832        Ok(module
833            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
834            .await)
835    }
836
837    /// Returns a stream that yields the current client balance every time it
838    /// changes.
839    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
840        let primary_module_things =
841            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
842                let balance_changes = primary_module.subscribe_balance_changes().await;
843                let initial_balance = self
844                    .get_balance_for_unit(unit)
845                    .await
846                    .expect("Primary is present");
847
848                Some((
849                    primary_module_id,
850                    primary_module.clone(),
851                    balance_changes,
852                    initial_balance,
853                ))
854            } else {
855                None
856            };
857        let db = self.db().clone();
858
859        Box::pin(async_stream::stream! {
860            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
861                // If there is no primary module, there will not be one until client is
862                // restarted
863                pending().await
864            };
865
866
867            yield initial_balance;
868            let mut prev_balance = initial_balance;
869            while let Some(()) = balance_changes.next().await {
870                let mut dbtx = db.begin_transaction_nc().await;
871                let balance = primary_module
872                     .get_balance(primary_module_id, &mut dbtx, unit)
873                    .await;
874
875                // Deduplicate in case modules cannot always tell if the balance actually changed
876                if balance != prev_balance {
877                    prev_balance = balance;
878                    yield balance;
879                }
880            }
881        })
882    }
883
884    /// Make a single API version request to a peer after a delay.
885    ///
886    /// The delay is here to unify the type of a future both for initial request
887    /// and possible retries.
888    async fn make_api_version_request(
889        delay: Duration,
890        peer_id: PeerId,
891        api: &DynGlobalApi,
892    ) -> (
893        PeerId,
894        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
895    ) {
896        runtime::sleep(delay).await;
897        (
898            peer_id,
899            api.request_single_peer::<SupportedApiVersionsSummary>(
900                VERSION_ENDPOINT.to_owned(),
901                ApiRequestErased::default(),
902                peer_id,
903            )
904            .await,
905        )
906    }
907
908    /// Create a backoff strategy for API version requests.
909    ///
910    /// Keep trying, initially somewhat aggressively, but after a while retry
911    /// very slowly, because chances for response are getting lower and
912    /// lower.
913    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
914        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
915    }
916
917    /// Query the federation for API version support and then calculate
918    /// the best API version to use (supported by most guardians).
919    pub async fn fetch_common_api_versions_from_all_peers(
920        num_peers: NumPeers,
921        api: DynGlobalApi,
922        db: Database,
923        num_responses_sender: watch::Sender<usize>,
924    ) {
925        let mut backoff = Self::create_api_version_backoff();
926
927        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
928        // and make a single async db write operation, it should be OK.
929        let mut requests = FuturesUnordered::new();
930
931        for peer_id in num_peers.peer_ids() {
932            requests.push(Self::make_api_version_request(
933                Duration::ZERO,
934                peer_id,
935                &api,
936            ));
937        }
938
939        let mut num_responses = 0;
940
941        while let Some((peer_id, response)) = requests.next().await {
942            let retry = match response {
943                Err(err) => {
944                    let has_previous_response = db
945                        .begin_transaction_nc()
946                        .await
947                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
948                        .await
949                        .is_some();
950                    debug!(
951                        target: LOG_CLIENT,
952                        %peer_id,
953                        err = %err.fmt_compact(),
954                        %has_previous_response,
955                        "Failed to refresh API versions of a peer"
956                    );
957
958                    !has_previous_response
959                }
960                Ok(o) => {
961                    // Save the response to the database right away, just to
962                    // not lose it
963                    let mut dbtx = db.begin_transaction().await;
964                    dbtx.insert_entry(
965                        &PeerLastApiVersionsSummaryKey(peer_id),
966                        &PeerLastApiVersionsSummary(o),
967                    )
968                    .await;
969                    dbtx.commit_tx().await;
970                    false
971                }
972            };
973
974            if retry {
975                requests.push(Self::make_api_version_request(
976                    backoff.next().expect("Keeps retrying"),
977                    peer_id,
978                    &api,
979                ));
980            } else {
981                num_responses += 1;
982                num_responses_sender.send_replace(num_responses);
983            }
984        }
985    }
986
987    /// Fetch API versions from peers, retrying until we get threshold number of
988    /// successful responses. Returns the successful responses collected
989    /// from at least `num_peers.threshold()` peers.
990    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
991        num_peers: NumPeers,
992        api: DynGlobalApi,
993    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
994        let mut backoff = Self::create_api_version_backoff();
995
996        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
997        // and collect responses, it should be OK.
998        let mut requests = FuturesUnordered::new();
999
1000        for peer_id in num_peers.peer_ids() {
1001            requests.push(Self::make_api_version_request(
1002                Duration::ZERO,
1003                peer_id,
1004                &api,
1005            ));
1006        }
1007
1008        let mut successful_responses = BTreeMap::new();
1009
1010        while successful_responses.len() < num_peers.threshold()
1011            && let Some((peer_id, response)) = requests.next().await
1012        {
1013            let retry = match response {
1014                Err(err) => {
1015                    debug!(
1016                        target: LOG_CLIENT,
1017                        %peer_id,
1018                        err = %err.fmt_compact(),
1019                        "Failed to fetch API versions from peer"
1020                    );
1021                    true
1022                }
1023                Ok(response) => {
1024                    successful_responses.insert(peer_id, response);
1025                    false
1026                }
1027            };
1028
1029            if retry {
1030                requests.push(Self::make_api_version_request(
1031                    backoff.next().expect("Keeps retrying"),
1032                    peer_id,
1033                    &api,
1034                ));
1035            }
1036        }
1037
1038        successful_responses
1039    }
1040
1041    /// Fetch API versions from peers and discover common API versions to use.
1042    pub async fn fetch_common_api_versions(
1043        config: &ClientConfig,
1044        api: &DynGlobalApi,
1045    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1046        debug!(
1047            target: LOG_CLIENT,
1048            "Fetching common api versions"
1049        );
1050
1051        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1052
1053        let peer_api_version_sets =
1054            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1055
1056        Ok(peer_api_version_sets)
1057    }
1058
1059    /// Write API version set to database cache.
1060    /// Used when we have a pre-calculated API version set that should be stored
1061    /// for later use.
1062    pub async fn write_api_version_cache(
1063        dbtx: &mut DatabaseTransaction<'_>,
1064        api_version_set: ApiVersionSet,
1065    ) {
1066        debug!(
1067            target: LOG_CLIENT,
1068            value = ?api_version_set,
1069            "Writing API version set to cache"
1070        );
1071
1072        dbtx.insert_entry(
1073            &CachedApiVersionSetKey,
1074            &CachedApiVersionSet(api_version_set),
1075        )
1076        .await;
1077    }
1078
1079    /// Store prefetched peer API version responses and calculate/store common
1080    /// API version set. This processes the individual peer responses by
1081    /// storing them in the database and calculating the common API version
1082    /// set for caching.
1083    pub async fn store_prefetched_api_versions(
1084        db: &Database,
1085        config: &ClientConfig,
1086        client_module_init: &ClientModuleInitRegistry,
1087        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1088    ) {
1089        debug!(
1090            target: LOG_CLIENT,
1091            "Storing {} prefetched peer API version responses and calculating common version set",
1092            peer_api_versions.len()
1093        );
1094
1095        let mut dbtx = db.begin_transaction().await;
1096        // Calculate common API version set from individual responses
1097        let client_supported_versions =
1098            Self::supported_api_versions_summary_static(config, client_module_init);
1099        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1100            &client_supported_versions,
1101            peer_api_versions,
1102        ) {
1103            Ok(common_api_versions) => {
1104                // Write the calculated common API version set to database cache
1105                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1106                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1107            }
1108            Err(err) => {
1109                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1110            }
1111        }
1112
1113        // Store individual peer responses to database
1114        for (peer_id, peer_api_versions) in peer_api_versions {
1115            dbtx.insert_entry(
1116                &PeerLastApiVersionsSummaryKey(*peer_id),
1117                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1118            )
1119            .await;
1120        }
1121        dbtx.commit_tx().await;
1122        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1123    }
1124
1125    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1126    pub fn supported_api_versions_summary_static(
1127        config: &ClientConfig,
1128        client_module_init: &ClientModuleInitRegistry,
1129    ) -> SupportedApiVersionsSummary {
1130        SupportedApiVersionsSummary {
1131            core: SupportedCoreApiVersions {
1132                core_consensus: config.global.consensus_version,
1133                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1134                    .expect("must not have conflicting versions"),
1135            },
1136            modules: config
1137                .modules
1138                .iter()
1139                .filter_map(|(&module_instance_id, module_config)| {
1140                    client_module_init
1141                        .get(module_config.kind())
1142                        .map(|module_init| {
1143                            (
1144                                module_instance_id,
1145                                SupportedModuleApiVersions {
1146                                    core_consensus: config.global.consensus_version,
1147                                    module_consensus: module_config.version,
1148                                    api: module_init.supported_api_versions(),
1149                                },
1150                            )
1151                        })
1152                })
1153                .collect(),
1154        }
1155    }
1156
1157    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1158        Self::load_and_refresh_common_api_version_static(
1159            &self.config().await,
1160            &self.module_inits,
1161            self.connectors.clone(),
1162            &self.api,
1163            &self.db,
1164            &self.task_group,
1165        )
1166        .await
1167    }
1168
1169    /// Load the common api versions to use from cache and start a background
1170    /// process to refresh them.
1171    ///
1172    /// This is a compromise, so we not have to wait for version discovery to
1173    /// complete every time a [`Client`] is being built.
1174    async fn load_and_refresh_common_api_version_static(
1175        config: &ClientConfig,
1176        module_init: &ClientModuleInitRegistry,
1177        connectors: ConnectorRegistry,
1178        api: &DynGlobalApi,
1179        db: &Database,
1180        task_group: &TaskGroup,
1181    ) -> anyhow::Result<ApiVersionSet> {
1182        if let Some(v) = db
1183            .begin_transaction_nc()
1184            .await
1185            .get_value(&CachedApiVersionSetKey)
1186            .await
1187        {
1188            debug!(
1189                target: LOG_CLIENT,
1190                "Found existing cached common api versions"
1191            );
1192            let config = config.clone();
1193            let client_module_init = module_init.clone();
1194            let api = api.clone();
1195            let db = db.clone();
1196            let task_group = task_group.clone();
1197            // Separate task group, because we actually don't want to be waiting for this to
1198            // finish, and it's just best effort.
1199            task_group
1200                .clone()
1201                .spawn_cancellable("refresh_common_api_version_static", async move {
1202                    connectors.wait_for_initialized_connections().await;
1203
1204                    if let Err(error) = Self::refresh_common_api_version_static(
1205                        &config,
1206                        &client_module_init,
1207                        &api,
1208                        &db,
1209                        task_group,
1210                        false,
1211                    )
1212                    .await
1213                    {
1214                        warn!(
1215                            target: LOG_CLIENT,
1216                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1217                        );
1218                    }
1219                });
1220
1221            return Ok(v.0);
1222        }
1223
1224        info!(
1225            target: LOG_CLIENT,
1226            "Fetching initial API versions "
1227        );
1228        Self::refresh_common_api_version_static(
1229            config,
1230            module_init,
1231            api,
1232            db,
1233            task_group.clone(),
1234            true,
1235        )
1236        .await
1237    }
1238
1239    async fn refresh_common_api_version_static(
1240        config: &ClientConfig,
1241        client_module_init: &ClientModuleInitRegistry,
1242        api: &DynGlobalApi,
1243        db: &Database,
1244        task_group: TaskGroup,
1245        block_until_ok: bool,
1246    ) -> anyhow::Result<ApiVersionSet> {
1247        debug!(
1248            target: LOG_CLIENT,
1249            "Refreshing common api versions"
1250        );
1251
1252        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1253        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1254
1255        task_group.spawn_cancellable("refresh peers api versions", {
1256            Client::fetch_common_api_versions_from_all_peers(
1257                num_peers,
1258                api.clone(),
1259                db.clone(),
1260                num_responses_sender,
1261            )
1262        });
1263
1264        let common_api_versions = loop {
1265            // Wait to collect enough answers before calculating a set of common api
1266            // versions to use. Note that all peers individual responses from
1267            // previous attempts are still being used, and requests, or even
1268            // retries for response of peers are not actually cancelled, as they
1269            // are happening on a separate task. This is all just to bound the
1270            // time user can be waiting for the join operation to finish, at the
1271            // risk of picking wrong version in very rare circumstances.
1272            let _: Result<_, Elapsed> = runtime::timeout(
1273                Duration::from_secs(30),
1274                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1275            )
1276            .await;
1277
1278            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1279
1280            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1281                &Self::supported_api_versions_summary_static(config, client_module_init),
1282                &peer_api_version_sets,
1283            ) {
1284                Ok(o) => break o,
1285                Err(err) if block_until_ok => {
1286                    warn!(
1287                        target: LOG_CLIENT,
1288                        err = %err.fmt_compact_anyhow(),
1289                        "Failed to discover API version to use. Retrying..."
1290                    );
1291                    continue;
1292                }
1293                Err(e) => return Err(e),
1294            }
1295        };
1296
1297        debug!(
1298            target: LOG_CLIENT,
1299            value = ?common_api_versions,
1300            "Updating the cached common api versions"
1301        );
1302        let mut dbtx = db.begin_transaction().await;
1303        let _ = dbtx
1304            .insert_entry(
1305                &CachedApiVersionSetKey,
1306                &CachedApiVersionSet(common_api_versions.clone()),
1307            )
1308            .await;
1309
1310        dbtx.commit_tx().await;
1311
1312        Ok(common_api_versions)
1313    }
1314
1315    /// Get the client [`Metadata`]
1316    pub async fn get_metadata(&self) -> Metadata {
1317        self.db
1318            .begin_transaction_nc()
1319            .await
1320            .get_value(&ClientMetadataKey)
1321            .await
1322            .unwrap_or_else(|| {
1323                warn!(
1324                    target: LOG_CLIENT,
1325                    "Missing existing metadata. This key should have been set on Client init"
1326                );
1327                Metadata::empty()
1328            })
1329    }
1330
1331    /// Set the client [`Metadata`]
1332    pub async fn set_metadata(&self, metadata: &Metadata) {
1333        self.db
1334            .autocommit::<_, _, anyhow::Error>(
1335                |dbtx, _| {
1336                    Box::pin(async {
1337                        Self::set_metadata_dbtx(dbtx, metadata).await;
1338                        Ok(())
1339                    })
1340                },
1341                None,
1342            )
1343            .await
1344            .expect("Failed to autocommit metadata");
1345    }
1346
1347    pub fn has_pending_recoveries(&self) -> bool {
1348        !self
1349            .client_recovery_progress_receiver
1350            .borrow()
1351            .iter()
1352            .all(|(_id, progress)| progress.is_done())
1353    }
1354
1355    /// Wait for all module recoveries to finish
1356    ///
1357    /// This will block until the recovery task is done with recoveries.
1358    /// Returns success if all recovery tasks are complete (success case),
1359    /// or an error if some modules could not complete the recovery at the time.
1360    ///
1361    /// A bit of a heavy approach.
1362    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1363        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1364        recovery_receiver
1365            .wait_for(|in_progress| {
1366                in_progress
1367                    .iter()
1368                    .all(|(_id, progress)| progress.is_done())
1369            })
1370            .await
1371            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1372
1373        Ok(())
1374    }
1375
1376    /// Subscribe to recover progress for all the modules.
1377    ///
1378    /// This stream can contain duplicate progress for a module.
1379    /// Don't use this stream for detecting completion of recovery.
1380    pub fn subscribe_to_recovery_progress(
1381        &self,
1382    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1383        WatchStream::new(self.client_recovery_progress_receiver.clone())
1384            .flat_map(futures::stream::iter)
1385    }
1386
1387    pub async fn wait_for_module_kind_recovery(
1388        &self,
1389        module_kind: ModuleKind,
1390    ) -> anyhow::Result<()> {
1391        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1392        let config = self.config().await;
1393        recovery_receiver
1394            .wait_for(|in_progress| {
1395                !in_progress
1396                    .iter()
1397                    .filter(|(module_instance_id, _progress)| {
1398                        config.modules[module_instance_id].kind == module_kind
1399                    })
1400                    .any(|(_id, progress)| !progress.is_done())
1401            })
1402            .await
1403            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1404
1405        Ok(())
1406    }
1407
1408    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1409        loop {
1410            if self.executor.get_active_states().await.is_empty() {
1411                break;
1412            }
1413            sleep(Duration::from_millis(100)).await;
1414        }
1415        Ok(())
1416    }
1417
1418    /// Set the client [`Metadata`]
1419    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1420        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1421    }
1422
1423    fn spawn_module_recoveries_task(
1424        &self,
1425        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1426        module_recoveries: BTreeMap<
1427            ModuleInstanceId,
1428            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1429        >,
1430        module_recovery_progress_receivers: BTreeMap<
1431            ModuleInstanceId,
1432            watch::Receiver<RecoveryProgress>,
1433        >,
1434    ) {
1435        let db = self.db.clone();
1436        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1437        self.task_group
1438            .spawn("module recoveries", |_task_handle| async {
1439                Self::run_module_recoveries_task(
1440                    db,
1441                    log_ordering_wakeup_tx,
1442                    recovery_sender,
1443                    module_recoveries,
1444                    module_recovery_progress_receivers,
1445                )
1446                .await;
1447            });
1448    }
1449
1450    async fn run_module_recoveries_task(
1451        db: Database,
1452        log_ordering_wakeup_tx: watch::Sender<()>,
1453        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1454        module_recoveries: BTreeMap<
1455            ModuleInstanceId,
1456            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1457        >,
1458        module_recovery_progress_receivers: BTreeMap<
1459            ModuleInstanceId,
1460            watch::Receiver<RecoveryProgress>,
1461        >,
1462    ) {
1463        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1464        let mut completed_stream = Vec::new();
1465        let progress_stream = futures::stream::FuturesUnordered::new();
1466
1467        for (module_instance_id, f) in module_recoveries {
1468            completed_stream.push(futures::stream::once(Box::pin(async move {
1469                match f.await {
1470                    Ok(()) => (module_instance_id, None),
1471                    Err(err) => {
1472                        warn!(
1473                            target: LOG_CLIENT,
1474                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1475                        );
1476                        // a module recovery that failed reports and error and
1477                        // just never finishes, so we don't need a separate state
1478                        // for it
1479                        futures::future::pending::<()>().await;
1480                        unreachable!()
1481                    }
1482                }
1483            })));
1484        }
1485
1486        for (module_instance_id, rx) in module_recovery_progress_receivers {
1487            progress_stream.push(
1488                tokio_stream::wrappers::WatchStream::new(rx)
1489                    .fuse()
1490                    .map(move |progress| (module_instance_id, Some(progress))),
1491            );
1492        }
1493
1494        let mut futures = futures::stream::select(
1495            futures::stream::select_all(progress_stream),
1496            futures::stream::select_all(completed_stream),
1497        );
1498
1499        while let Some((module_instance_id, progress)) = futures.next().await {
1500            let mut dbtx = db.begin_transaction().await;
1501
1502            let prev_progress = *recovery_sender
1503                .borrow()
1504                .get(&module_instance_id)
1505                .expect("existing progress must be present");
1506
1507            let progress = if prev_progress.is_done() {
1508                // since updates might be out of order, once done, stick with it
1509                prev_progress
1510            } else if let Some(progress) = progress {
1511                progress
1512            } else {
1513                prev_progress.to_complete()
1514            };
1515
1516            if !prev_progress.is_done() && progress.is_done() {
1517                info!(
1518                    target: LOG_CLIENT,
1519                    module_instance_id,
1520                    progress = format!("{}/{}", progress.complete, progress.total),
1521                    "Recovery complete"
1522                );
1523                dbtx.log_event(
1524                    log_ordering_wakeup_tx.clone(),
1525                    None,
1526                    ModuleRecoveryCompleted {
1527                        module_id: module_instance_id,
1528                    },
1529                )
1530                .await;
1531            } else {
1532                info!(
1533                    target: LOG_CLIENT,
1534                    module_instance_id,
1535                    progress = format!("{}/{}", progress.complete, progress.total),
1536                    "Recovery progress"
1537                );
1538            }
1539
1540            dbtx.insert_entry(
1541                &ClientModuleRecovery { module_instance_id },
1542                &ClientModuleRecoveryState { progress },
1543            )
1544            .await;
1545            dbtx.commit_tx().await;
1546
1547            recovery_sender.send_modify(|v| {
1548                v.insert(module_instance_id, progress);
1549            });
1550        }
1551        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1552    }
1553
1554    async fn load_peers_last_api_versions(
1555        db: &Database,
1556        num_peers: NumPeers,
1557    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1558        let mut peer_api_version_sets = BTreeMap::new();
1559
1560        let mut dbtx = db.begin_transaction_nc().await;
1561        for peer_id in num_peers.peer_ids() {
1562            if let Some(v) = dbtx
1563                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1564                .await
1565            {
1566                peer_api_version_sets.insert(peer_id, v.0);
1567            }
1568        }
1569        drop(dbtx);
1570        peer_api_version_sets
1571    }
1572
1573    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1574    /// only the announcements and doesn't use the config as fallback.
1575    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1576        self.db()
1577            .begin_transaction_nc()
1578            .await
1579            .find_by_prefix(&ApiAnnouncementPrefix)
1580            .await
1581            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1582            .collect()
1583            .await
1584    }
1585
1586    /// Returns a list of guardian API URLs
1587    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1588        get_api_urls(&self.db, &self.config().await).await
1589    }
1590
1591    /// Create an invite code with the api endpoint of the given peer which can
1592    /// be used to download this client config
1593    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1594        self.get_peer_urls()
1595            .await
1596            .into_iter()
1597            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1598            .map(|peer_url| {
1599                InviteCode::new(
1600                    peer_url.clone(),
1601                    peer,
1602                    self.federation_id(),
1603                    self.api_secret.clone(),
1604                )
1605            })
1606    }
1607
1608    /// Blocks till the client has synced the guardian public key set
1609    /// (introduced in version 0.4) and returns it. Once it has been fetched
1610    /// once this function is guaranteed to return immediately.
1611    pub async fn get_guardian_public_keys_blocking(
1612        &self,
1613    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1614        self.db
1615            .autocommit(
1616                |dbtx, _| {
1617                    Box::pin(async move {
1618                        let config = self.config().await;
1619
1620                        let guardian_pub_keys = self
1621                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1622                            .await;
1623
1624                        Result::<_, ()>::Ok(guardian_pub_keys)
1625                    })
1626                },
1627                None,
1628            )
1629            .await
1630            .expect("Will retry forever")
1631    }
1632
1633    async fn get_or_backfill_broadcast_public_keys(
1634        &self,
1635        dbtx: &mut DatabaseTransaction<'_>,
1636        config: ClientConfig,
1637    ) -> BTreeMap<PeerId, PublicKey> {
1638        match config.global.broadcast_public_keys {
1639            Some(guardian_pub_keys) => guardian_pub_keys,
1640            _ => {
1641                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1642
1643                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1644                *(self.config.write().await) = new_config;
1645                guardian_pub_keys
1646            }
1647        }
1648    }
1649
1650    async fn fetch_session_count(&self) -> FederationResult<u64> {
1651        self.api.session_count().await
1652    }
1653
1654    async fn fetch_and_update_config(
1655        &self,
1656        config: ClientConfig,
1657    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1658        let fetched_config = retry(
1659            "Fetching guardian public keys",
1660            backoff_util::background_backoff(),
1661            || async {
1662                Ok(self
1663                    .api
1664                    .request_current_consensus::<ClientConfig>(
1665                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1666                        ApiRequestErased::default(),
1667                    )
1668                    .await?)
1669            },
1670        )
1671        .await
1672        .expect("Will never return on error");
1673
1674        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1675            warn!(
1676                target: LOG_CLIENT,
1677                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1678            );
1679            pending::<()>().await;
1680            unreachable!("Pending will never return");
1681        };
1682
1683        let new_config = ClientConfig {
1684            global: GlobalClientConfig {
1685                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1686                ..config.global
1687            },
1688            modules: config.modules,
1689        };
1690        (guardian_pub_keys, new_config)
1691    }
1692
1693    pub fn handle_global_rpc(
1694        &self,
1695        method: String,
1696        params: serde_json::Value,
1697    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1698        Box::pin(try_stream! {
1699            match method.as_str() {
1700                "get_balance" => {
1701                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1702                    yield serde_json::to_value(balance)?;
1703                }
1704                "subscribe_balance_changes" => {
1705                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1706                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1707                    while let Some(balance) = stream.next().await {
1708                        yield serde_json::to_value(balance)?;
1709                    }
1710                }
1711                "get_config" => {
1712                    let config = self.config().await;
1713                    yield serde_json::to_value(config)?;
1714                }
1715                "get_federation_id" => {
1716                    let federation_id = self.federation_id();
1717                    yield serde_json::to_value(federation_id)?;
1718                }
1719                "get_invite_code" => {
1720                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1721                    let invite_code = self.invite_code(req.peer).await;
1722                    yield serde_json::to_value(invite_code)?;
1723                }
1724                "get_operation" => {
1725                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1726                    let operation = self.operation_log().get_operation(req.operation_id).await;
1727                    yield serde_json::to_value(operation)?;
1728                }
1729                "list_operations" => {
1730                    let req: ListOperationsParams = serde_json::from_value(params)?;
1731                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1732                        usize::MAX
1733                    } else {
1734                        req.limit.unwrap_or(usize::MAX)
1735                    };
1736                    let operations = self.operation_log()
1737                        .paginate_operations_rev(limit, req.last_seen)
1738                        .await;
1739                    yield serde_json::to_value(operations)?;
1740                }
1741                "session_count" => {
1742                    let count = self.fetch_session_count().await?;
1743                    yield serde_json::to_value(count)?;
1744                }
1745                "has_pending_recoveries" => {
1746                    let has_pending = self.has_pending_recoveries();
1747                    yield serde_json::to_value(has_pending)?;
1748                }
1749                "wait_for_all_recoveries" => {
1750                    self.wait_for_all_recoveries().await?;
1751                    yield serde_json::Value::Null;
1752                }
1753                "subscribe_to_recovery_progress" => {
1754                    let mut stream = self.subscribe_to_recovery_progress();
1755                    while let Some((module_id, progress)) = stream.next().await {
1756                        yield serde_json::json!({
1757                            "module_id": module_id,
1758                            "progress": progress
1759                        });
1760                    }
1761                }
1762                "backup_to_federation" => {
1763                    let metadata = if params.is_null() {
1764                        Metadata::from_json_serialized(serde_json::json!({}))
1765                    } else {
1766                        Metadata::from_json_serialized(params)
1767                    };
1768                    self.backup_to_federation(metadata).await?;
1769                    yield serde_json::Value::Null;
1770                }
1771                _ => {
1772                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1773                    unreachable!()
1774                },
1775            }
1776        })
1777    }
1778
1779    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1780    where
1781        E: Event + Send,
1782    {
1783        let mut dbtx = self.db.begin_transaction().await;
1784        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1785        dbtx.commit_tx().await;
1786    }
1787
1788    pub async fn log_event_dbtx<E, Cap>(
1789        &self,
1790        dbtx: &mut DatabaseTransaction<'_, Cap>,
1791        module_id: Option<ModuleInstanceId>,
1792        event: E,
1793    ) where
1794        E: Event + Send,
1795        Cap: Send,
1796    {
1797        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1798            .await;
1799    }
1800
1801    pub async fn log_event_raw_dbtx<Cap>(
1802        &self,
1803        dbtx: &mut DatabaseTransaction<'_, Cap>,
1804        kind: EventKind,
1805        module: Option<(ModuleKind, ModuleInstanceId)>,
1806        payload: Vec<u8>,
1807        persist: EventPersistence,
1808    ) where
1809        Cap: Send,
1810    {
1811        let module_id = module.as_ref().map(|m| m.1);
1812        let module_kind = module.map(|m| m.0);
1813        dbtx.log_event_raw(
1814            self.log_ordering_wakeup_tx.clone(),
1815            kind,
1816            module_kind,
1817            module_id,
1818            payload,
1819            persist,
1820        )
1821        .await;
1822    }
1823
1824    /// Built in event log (trimmable) tracker
1825    ///
1826    /// For the convenience of downstream applications, [`Client`] can store
1827    /// internally event log position for the main application using/driving it.
1828    ///
1829    /// Note that this position is a singleton, so this tracker should not be
1830    /// used for multiple purposes or applications, etc. at the same time.
1831    ///
1832    /// If the application has a need to follow log using multiple trackers, it
1833    /// should implement own [`DynEventLogTrimableTracker`] and store its
1834    /// persient data by itself.
1835    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
1836        struct BuiltInApplicationEventLogTracker;
1837
1838        #[apply(async_trait_maybe_send!)]
1839        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
1840            // Store position in the event log
1841            async fn store(
1842                &mut self,
1843                dbtx: &mut DatabaseTransaction<NonCommittable>,
1844                pos: EventLogTrimableId,
1845            ) -> anyhow::Result<()> {
1846                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
1847                    .await;
1848                Ok(())
1849            }
1850
1851            /// Load the last previous stored position (or None if never stored)
1852            async fn load(
1853                &mut self,
1854                dbtx: &mut DatabaseTransaction<NonCommittable>,
1855            ) -> anyhow::Result<Option<EventLogTrimableId>> {
1856                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
1857            }
1858        }
1859        Box::new(BuiltInApplicationEventLogTracker)
1860    }
1861
1862    /// Like [`Self::handle_events`] but for historical data.
1863    ///
1864    ///
1865    /// This function can be used to process subset of events
1866    /// that is infrequent and important enough to be persisted
1867    /// forever. Most applications should prefer to use [`Self::handle_events`]
1868    /// which emits *all* events.
1869    pub async fn handle_historical_events<F, R>(
1870        &self,
1871        tracker: fedimint_eventlog::DynEventLogTracker,
1872        handler_fn: F,
1873    ) -> anyhow::Result<()>
1874    where
1875        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1876        R: Future<Output = anyhow::Result<()>>,
1877    {
1878        fedimint_eventlog::handle_events(
1879            self.db.clone(),
1880            tracker,
1881            self.log_event_added_rx.clone(),
1882            handler_fn,
1883        )
1884        .await
1885    }
1886
1887    /// Handle events emitted by the client
1888    ///
1889    /// This is a preferred method for reactive & asynchronous
1890    /// processing of events emitted by the client.
1891    ///
1892    /// It needs a `tracker` that will persist the position in the log
1893    /// as it is being handled. You can use the
1894    /// [`Client::built_in_application_event_log_tracker`] if this call is
1895    /// used for the single main application handling this instance of the
1896    /// [`Client`]. Otherwise you should implement your own tracker.
1897    ///
1898    /// This handler will call `handle_fn` with ever event emitted by
1899    /// [`Client`], including transient ones. The caller should atomically
1900    /// handle each event it is interested in and ignore other ones.
1901    ///
1902    /// This method returns only when client is shutting down or on internal
1903    /// error, so typically should be called in a background task dedicated
1904    /// to handling events.
1905    pub async fn handle_events<F, R>(
1906        &self,
1907        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
1908        handler_fn: F,
1909    ) -> anyhow::Result<()>
1910    where
1911        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1912        R: Future<Output = anyhow::Result<()>>,
1913    {
1914        fedimint_eventlog::handle_trimable_events(
1915            self.db.clone(),
1916            tracker,
1917            self.log_event_added_rx.clone(),
1918            handler_fn,
1919        )
1920        .await
1921    }
1922
1923    pub async fn get_event_log(
1924        &self,
1925        pos: Option<EventLogId>,
1926        limit: u64,
1927    ) -> Vec<PersistedLogEntry> {
1928        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1929            .await
1930    }
1931
1932    pub async fn get_event_log_trimable(
1933        &self,
1934        pos: Option<EventLogTrimableId>,
1935        limit: u64,
1936    ) -> Vec<PersistedLogEntry> {
1937        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1938            .await
1939    }
1940
1941    pub async fn get_event_log_dbtx<Cap>(
1942        &self,
1943        dbtx: &mut DatabaseTransaction<'_, Cap>,
1944        pos: Option<EventLogId>,
1945        limit: u64,
1946    ) -> Vec<PersistedLogEntry>
1947    where
1948        Cap: Send,
1949    {
1950        dbtx.get_event_log(pos, limit).await
1951    }
1952
1953    pub async fn get_event_log_trimable_dbtx<Cap>(
1954        &self,
1955        dbtx: &mut DatabaseTransaction<'_, Cap>,
1956        pos: Option<EventLogTrimableId>,
1957        limit: u64,
1958    ) -> Vec<PersistedLogEntry>
1959    where
1960        Cap: Send,
1961    {
1962        dbtx.get_event_log_trimable(pos, limit).await
1963    }
1964
1965    /// Register to receiver all new transient (unpersisted) events
1966    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1967        self.log_event_added_transient_tx.subscribe()
1968    }
1969
1970    /// Get a receiver that signals when new events are added to the event log
1971    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
1972        self.log_event_added_rx.clone()
1973    }
1974
1975    pub fn iroh_enable_dht(&self) -> bool {
1976        self.iroh_enable_dht
1977    }
1978
1979    pub(crate) async fn run_core_migrations(
1980        db_no_decoders: &Database,
1981    ) -> Result<(), anyhow::Error> {
1982        let mut dbtx = db_no_decoders.begin_transaction().await;
1983        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1984            .await?;
1985        if is_running_in_test_env() {
1986            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1987        }
1988        dbtx.commit_tx_result().await?;
1989        Ok(())
1990    }
1991
1992    /// Iterator over primary modules for a given `unit`
1993    fn primary_modules_for_unit(
1994        &self,
1995        unit: AmountUnit,
1996    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
1997        self.primary_modules
1998            .iter()
1999            .flat_map(move |(_prio, candidates)| {
2000                candidates
2001                    .specific
2002                    .get(&unit)
2003                    .into_iter()
2004                    .flatten()
2005                    .copied()
2006                    // within same priority, wildcard matches come last
2007                    .chain(candidates.wildcard.iter().copied())
2008            })
2009            .map(|id| (id, self.modules.get_expect(id)))
2010    }
2011
2012    /// Primary module to use for `unit`
2013    ///
2014    /// Currently, just pick the first (highest priority) match
2015    pub fn primary_module_for_unit(
2016        &self,
2017        unit: AmountUnit,
2018    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2019        self.primary_modules_for_unit(unit).next()
2020    }
2021
2022    /// [`Self::primary_module_for_unit`] for Bitcoin
2023    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2024        self.primary_module_for_unit(AmountUnit::BITCOIN)
2025            .expect("No primary module for Bitcoin")
2026    }
2027}
2028
2029#[apply(async_trait_maybe_send!)]
2030impl ClientContextIface for Client {
2031    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2032        Client::get_module(self, instance)
2033    }
2034
2035    fn api_clone(&self) -> DynGlobalApi {
2036        Client::api_clone(self)
2037    }
2038    fn decoders(&self) -> &ModuleDecoderRegistry {
2039        Client::decoders(self)
2040    }
2041
2042    async fn finalize_and_submit_transaction(
2043        &self,
2044        operation_id: OperationId,
2045        operation_type: &str,
2046        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2047        tx_builder: TransactionBuilder,
2048    ) -> anyhow::Result<OutPointRange> {
2049        Client::finalize_and_submit_transaction(
2050            self,
2051            operation_id,
2052            operation_type,
2053            // |out_point_range| operation_meta_gen(out_point_range),
2054            &operation_meta_gen,
2055            tx_builder,
2056        )
2057        .await
2058    }
2059
2060    async fn finalize_and_submit_transaction_inner(
2061        &self,
2062        dbtx: &mut DatabaseTransaction<'_>,
2063        operation_id: OperationId,
2064        tx_builder: TransactionBuilder,
2065    ) -> anyhow::Result<OutPointRange> {
2066        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2067    }
2068
2069    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2070        Client::transaction_updates(self, operation_id).await
2071    }
2072
2073    async fn await_primary_module_outputs(
2074        &self,
2075        operation_id: OperationId,
2076        // TODO: make `impl Iterator<Item = ...>`
2077        outputs: Vec<OutPoint>,
2078    ) -> anyhow::Result<()> {
2079        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2080    }
2081
2082    fn operation_log(&self) -> &dyn IOperationLog {
2083        Client::operation_log(self)
2084    }
2085
2086    async fn has_active_states(&self, operation_id: OperationId) -> bool {
2087        Client::has_active_states(self, operation_id).await
2088    }
2089
2090    async fn operation_exists(&self, operation_id: OperationId) -> bool {
2091        Client::operation_exists(self, operation_id).await
2092    }
2093
2094    async fn config(&self) -> ClientConfig {
2095        Client::config(self).await
2096    }
2097
2098    fn db(&self) -> &Database {
2099        Client::db(self)
2100    }
2101
2102    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2103        Client::executor(self)
2104    }
2105
2106    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2107        Client::invite_code(self, peer).await
2108    }
2109
2110    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2111        Client::get_internal_payment_markers(self)
2112    }
2113
2114    async fn log_event_json(
2115        &self,
2116        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2117        module_kind: Option<ModuleKind>,
2118        module_id: ModuleInstanceId,
2119        kind: EventKind,
2120        payload: serde_json::Value,
2121        persist: EventPersistence,
2122    ) {
2123        dbtx.ensure_global()
2124            .expect("Must be called with global dbtx");
2125        self.log_event_raw_dbtx(
2126            dbtx,
2127            kind,
2128            module_kind.map(|kind| (kind, module_id)),
2129            serde_json::to_vec(&payload).expect("Serialization can't fail"),
2130            persist,
2131        )
2132        .await;
2133    }
2134
2135    async fn read_operation_active_states<'dbtx>(
2136        &self,
2137        operation_id: OperationId,
2138        module_id: ModuleInstanceId,
2139        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2140    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2141    {
2142        Box::pin(
2143            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2144                operation_id,
2145                module_instance: module_id,
2146            })
2147            .await
2148            .map(move |(k, v)| (k.0, v)),
2149        )
2150    }
2151    async fn read_operation_inactive_states<'dbtx>(
2152        &self,
2153        operation_id: OperationId,
2154        module_id: ModuleInstanceId,
2155        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2156    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2157    {
2158        Box::pin(
2159            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2160                operation_id,
2161                module_instance: module_id,
2162            })
2163            .await
2164            .map(move |(k, v)| (k.0, v)),
2165        )
2166    }
2167}
2168
2169// TODO: impl `Debug` for `Client` and derive here
2170impl fmt::Debug for Client {
2171    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2172        write!(f, "Client")
2173    }
2174}
2175
2176pub fn client_decoders<'a>(
2177    registry: &ModuleInitRegistry<DynClientModuleInit>,
2178    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2179) -> ModuleDecoderRegistry {
2180    let mut modules = BTreeMap::new();
2181    for (id, kind) in module_kinds {
2182        let Some(init) = registry.get(kind) else {
2183            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2184            continue;
2185        };
2186
2187        modules.insert(
2188            id,
2189            (
2190                kind.clone(),
2191                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2192            ),
2193        );
2194    }
2195    ModuleDecoderRegistry::from(modules)
2196}