fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86    ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87    ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
// Client-internal submodules: construction, event logging, module-global
// context and external handle management.
pub(crate) mod builder;
pub(crate) mod event_log;
pub(crate) mod global_ctx;
pub(crate) mod handle;
103
/// List of core api versions supported by the implementation.
/// Notably `major` version is the one being supported, and corresponding
/// `minor` version is the one required (for given `major` version).
///
/// NOTE(review): presumably consulted during API version negotiation with
/// federation peers — confirm against the version-discovery code path.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
109
/// Primary module candidates at specific priority level
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Modules that listed specific units they handle, keyed by that unit
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Modules handling any unit (fallbacks when no specific match exists)
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Main client type
///
/// A handle and API to interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time, will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it is starting and managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`crate::ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    final_client: FinalClientIface,
    config: tokio::sync::RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    db: Database,
    federation_id: FederationId,
    /// `meta` key/value pairs taken from the federation config
    federation_config_meta: BTreeMap<String, String>,
    /// Primary module candidates grouped by priority level
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,

    /// Task group tied to this client's lifetime; see [`Client::task_group`]
    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Internal client sender to wake up log ordering task every time a
    /// (unordered) log event is added.
    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time (ordered) log event is added.
    log_event_added_rx: watch::Receiver<()>,
    /// Broadcast sender used to fan out transient event log entries
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Hook applied to outgoing federation API requests
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
    /// User-provided Bitcoin RPC client for modules to use
    ///
    /// Stored here for potential future access; currently passed to modules
    /// during initialization.
    #[allow(dead_code)]
    user_bitcoind_rpc: Option<DynBitcoindRpc>,
    /// User-provided Bitcoin RPC factory for when ChainId is not available
    ///
    /// This is used as a fallback when the federation doesn't support ChainId.
    /// Modules can call this with a URL from their config to get an RPC client.
    pub(crate) user_bitcoind_rpc_no_chain_id:
        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
}
179
/// Parameters for a paginated operation-log listing request.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    // Presumably caps the number of returned entries; `None` = unbounded.
    // TODO(review): confirm against the handler consuming these params.
    limit: Option<usize>,
    // Pagination cursor: continue listing after this entry, if given.
    last_seen: Option<ChronologicalOperationLogKey>,
}
185
/// Request payload identifying a single operation by its id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
190
/// Request payload for querying balance changes in a given amount unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    // When omitted from the serialized request, defaults to bitcoin.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
196
197impl Client {
198    /// Initialize a client builder that can be configured to create a new
199    /// client.
200    pub async fn builder() -> anyhow::Result<ClientBuilder> {
201        Ok(ClientBuilder::new())
202    }
203
    /// Returns a borrowed handle to the global federation API.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }
207
208    pub fn api_clone(&self) -> DynGlobalApi {
209        self.api.clone()
210    }
211
    /// Returns a stream that emits the current connection status of all peers
    /// whenever any peer's status changes. Emits initial state immediately.
    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
        // Delegates directly to the underlying API client.
        self.api.connection_status_stream()
    }
217
    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
    ///
    /// Background services spawned on it are shut down with the client.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }
222
    /// Returns all registered Prometheus metrics encoded in text format.
    ///
    /// This can be used by downstream clients to expose metrics via their own
    /// HTTP server or print them for debugging purposes.
    ///
    /// Note: this is an associated function (process-global metrics), not tied
    /// to a particular client instance.
    pub fn get_metrics() -> anyhow::Result<String> {
        fedimint_metrics::get_metrics()
    }
230
    /// Useful for our CLI tooling, not meant for external use
    ///
    /// Exposes the state machine executor directly.
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
236
237    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
238        let mut dbtx = db.begin_transaction_nc().await;
239        dbtx.get_value(&ClientConfigKey).await
240    }
241
242    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
243        let mut dbtx = db.begin_transaction_nc().await;
244        dbtx.get_value(&PendingClientConfigKey).await
245    }
246
247    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
248        let mut dbtx = db.begin_transaction_nc().await;
249        dbtx.get_value(&ApiSecretKey).await
250    }
251
252    pub async fn store_encodable_client_secret<T: Encodable>(
253        db: &Database,
254        secret: T,
255    ) -> anyhow::Result<()> {
256        let mut dbtx = db.begin_transaction().await;
257
258        // Don't overwrite an existing secret
259        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
260            bail!("Encoded client secret already exists, cannot overwrite")
261        }
262
263        let encoded_secret = T::consensus_encode_to_vec(&secret);
264        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
265            .await;
266        dbtx.commit_tx().await;
267        Ok(())
268    }
269
270    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
271        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
272            bail!("Encoded client secret not present in DB")
273        };
274
275        Ok(secret)
276    }
277    pub async fn load_decodable_client_secret_opt<T: Decodable>(
278        db: &Database,
279    ) -> anyhow::Result<Option<T>> {
280        let mut dbtx = db.begin_transaction_nc().await;
281
282        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
283
284        Ok(match client_secret {
285            Some(client_secret) => Some(
286                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
287                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
288            ),
289            None => None,
290        })
291    }
292
293    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
294        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
295            Ok(secret) => secret,
296            _ => {
297                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
298                Self::store_encodable_client_secret(db, secret)
299                    .await
300                    .expect("Storing client secret must work");
301                secret
302            }
303        };
304        Ok(client_secret)
305    }
306
307    pub async fn is_initialized(db: &Database) -> bool {
308        let mut dbtx = db.begin_transaction_nc().await;
309        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
310            .await
311            .expect("Unrecoverable error occurred while reading and entry from the database")
312            .is_some()
313    }
314
315    pub fn start_executor(self: &Arc<Self>) {
316        debug!(
317            target: LOG_CLIENT,
318            "Starting fedimint client executor",
319        );
320        self.executor.start_executor(self.context_gen());
321    }
322
    /// Returns the id of the federation this client is connected to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
326
327    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
328        let client_inner = Arc::downgrade(self);
329        Arc::new(move |module_instance, operation| {
330            ModuleGlobalClientContext {
331                client: client_inner
332                    .clone()
333                    .upgrade()
334                    .expect("ModuleGlobalContextGen called after client was dropped"),
335                module_instance_id: module_instance,
336                operation,
337            }
338            .into()
339        })
340    }
341
    /// Returns a clone of the client config currently in effect.
    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }
345
    // TODO: change to `-> Option<&str>`
    /// Returns the optional API secret used when talking to the federation.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
350
351    /// Returns the core API version that the federation supports
352    ///
353    /// This reads from the cached version stored during client initialization.
354    /// If no cache is available (e.g., during initial setup), returns a default
355    /// version (0, 0).
356    pub async fn core_api_version(&self) -> ApiVersion {
357        // Try to get from cache. If not available, return a conservative
358        // default. The cache should always be populated after successful client init.
359        self.db
360            .begin_transaction_nc()
361            .await
362            .get_value(&CachedApiVersionSetKey)
363            .await
364            .map(|cached: CachedApiVersionSet| cached.0.core)
365            .unwrap_or(ApiVersion { major: 0, minor: 0 })
366    }
367
368    /// Returns the chain ID (bitcoin block hash at height 1) from the
369    /// federation
370    ///
371    /// This is cached in the database after the first successful fetch.
372    /// The chain ID uniquely identifies which bitcoin network the federation
373    /// operates on (mainnet, testnet, signet, regtest).
374    pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
375        // Check cache first
376        if let Some(chain_id) = self
377            .db
378            .begin_transaction_nc()
379            .await
380            .get_value(&ChainIdKey)
381            .await
382        {
383            return Ok(chain_id);
384        }
385
386        // Fetch from federation with consensus
387        let chain_id = self.api.chain_id().await?;
388
389        // Cache the result
390        let mut dbtx = self.db.begin_transaction().await;
391        dbtx.insert_entry(&ChainIdKey, &chain_id).await;
392        dbtx.commit_tx().await;
393
394        Ok(chain_id)
395    }
396
    /// Returns the registry of module decoders for this client.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
400
401    /// Returns a reference to the module, panics if not found
402    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
403        self.try_get_module(instance)
404            .expect("Module instance not found")
405    }
406
407    fn try_get_module(
408        &self,
409        instance: ModuleInstanceId,
410    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
411        Some(self.modules.get(instance)?.as_ref())
412    }
413
414    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
415        self.modules.get(instance).is_some()
416    }
417
    /// Returns the input amount and output amount of a transaction
    ///
    /// Module fees for both inputs and outputs are folded into the returned
    /// output amounts, so the transaction balances when inputs >= the second
    /// element of the returned tuple.
    ///
    /// # Panics
    /// If any of the input or output versions in the transaction builder are
    /// unknown by the respective module.
    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
        // FIXME: prevent overflows, currently not suitable for untrusted input
        let mut in_amounts = Amounts::ZERO;
        let mut out_amounts = Amounts::ZERO;
        let mut fee_amounts = Amounts::ZERO;

        for input in builder.inputs() {
            // Panics on unknown module instance, see `get_module`.
            let module = self.get_module(input.input.module_instance_id());

            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
                "We only build transactions with input versions that are supported by the module",
            );

            in_amounts.checked_add_mut(&input.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        for output in builder.outputs() {
            let module = self.get_module(output.output.module_instance_id());

            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
                "We only build transactions with output versions that are supported by the module",
            );

            out_amounts.checked_add_mut(&output.amounts);
            fee_amounts.checked_add_mut(&item_fees);
        }

        // Account all fees on the output side.
        out_amounts.checked_add_mut(&fee_amounts);
        (in_amounts, out_amounts)
    }
454
    /// Returns the federation-derived fake LN public key plus a marker value
    /// used to tag internal payments.
    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }
458
459    /// Get metadata value from the federation config itself
460    pub fn get_config_meta(&self, key: &str) -> Option<String> {
461        self.federation_config_meta.get(key).cloned()
462    }
463
    /// Returns a clone of the client's root derivable secret.
    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
467
    /// Adds new state machines to the executor within the provided database
    /// transaction.
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }
475
    // TODO: implement as part of [`OperationLog`]
    /// Returns the ids of all operations that still have active state machines
    /// and a matching operation log entry.
    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
        let active_states = self.executor.get_active_states().await;
        let mut active_operations = HashSet::with_capacity(active_states.len());
        let mut dbtx = self.db().begin_transaction_nc().await;
        for (state, _) in active_states {
            let operation_id = state.operation_id();
            // Skip active states without a corresponding log entry.
            if dbtx
                .get_value(&OperationLogKey { operation_id })
                .await
                .is_some()
            {
                active_operations.insert(operation_id);
            }
        }
        active_operations
    }
493
    /// Returns the client's operation log.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
497
    /// Get the meta manager to read meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
502
    /// Returns the federation's expiration timestamp, read from the
    /// `federation_expiry_timestamp` meta field, or `None` when unset.
    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
        let meta_service = self.meta_service();
        // The meta value is a unix timestamp in seconds.
        let ts = meta_service
            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
            .await
            .and_then(|v| v.value)?;
        Some(UNIX_EPOCH + Duration::from_secs(ts))
    }
512
513    /// Adds funding to a transaction or removes over-funding via change.
514    async fn finalize_transaction(
515        &self,
516        dbtx: &mut DatabaseTransaction<'_>,
517        operation_id: OperationId,
518        mut partial_transaction: TransactionBuilder,
519    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
520        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
521
522        let mut added_inputs_bundles = vec![];
523        let mut added_outputs_bundles = vec![];
524
525        // The way currently things are implemented is OK for modules which can
526        // collect a fee relative to one being used, but will break down in any
527        // fancy scenarios. Future TODOs:
528        //
529        // * create_final_inputs_and_outputs needs to get broken down, so we can use
530        //   primary modules using priorities (possibly separate prios for inputs and
531        //   outputs to be able to drain modules, etc.); we need the split "check if
532        //   possible" and "take" steps,
533        // * extra inputs and outputs adding fees needs to be taken into account,
534        //   possibly with some looping
535        for unit in in_amounts.units().union(&out_amounts.units()) {
536            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
537            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
538            if input_amount == output_amount {
539                continue;
540            }
541
542            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
543                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
544            };
545
546            let (added_input_bundle, added_output_bundle) = module
547                .create_final_inputs_and_outputs(
548                    module_id,
549                    dbtx,
550                    operation_id,
551                    *unit,
552                    input_amount,
553                    output_amount,
554                )
555                .await?;
556
557            added_inputs_bundles.push(added_input_bundle);
558            added_outputs_bundles.push(added_output_bundle);
559        }
560
561        // This is the range of  outputs that will be added to the transaction
562        // in order to balance it. Notice that it may stay empty in case the transaction
563        // is already balanced.
564        let change_range = Range {
565            start: partial_transaction.outputs().count() as u64,
566            end: (partial_transaction.outputs().count() as u64
567                + added_outputs_bundles
568                    .iter()
569                    .map(|output| output.outputs().len() as u64)
570                    .sum::<u64>()),
571        };
572
573        for added_inputs in added_inputs_bundles {
574            partial_transaction = partial_transaction.with_inputs(added_inputs);
575        }
576
577        for added_outputs in added_outputs_bundles {
578            partial_transaction = partial_transaction.with_outputs(added_outputs);
579        }
580
581        let (input_amounts, output_amounts) =
582            self.transaction_builder_get_balance(&partial_transaction);
583
584        for (unit, output_amount) in output_amounts {
585            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
586
587            assert!(input_amount >= output_amount, "Transaction is underfunded");
588        }
589
590        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
591
592        Ok((tx, states, change_range))
593    }
594
    /// Add funding and/or change to the transaction builder as needed, finalize
    /// the transaction and submit it to the federation.
    ///
    /// ## Errors
    /// The function will return an error if the operation with given ID already
    /// exists.
    ///
    /// ## Panics
    /// The function will panic if the database transaction repeatedly fails to
    /// commit due to collisions with other transactions; this should not happen
    /// except for excessively concurrent scenarios.
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    // The closure may run multiple times on commit conflicts,
                    // so everything it consumes is cloned per attempt.
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        // Record the operation in the log within the same
                        // transaction, so submission and log entry are atomic.
                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100), // TODO: handle what happens after 100 retries
            )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            // Errors raised inside the closure are returned to the caller.
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }
662
    /// Balances and finalizes the transaction within `dbtx`, then registers
    /// the state machines (including the submission state machine) that will
    /// broadcast it to the federation.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        let (transaction, mut states, change_range) = self
            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
            .await?;

        // Reject transactions the federation would refuse anyway.
        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
            let inputs = transaction
                .inputs
                .iter()
                .map(DynInput::module_instance_id)
                .collect::<Vec<_>>();
            let outputs = transaction
                .outputs
                .iter()
                .map(DynOutput::module_instance_id)
                .collect::<Vec<_>>();
            warn!(
                target: LOG_CLIENT_NET_API,
                size=%transaction.consensus_encode_to_vec().len(),
                ?inputs,
                ?outputs,
                "Transaction too large",
            );
            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
            bail!(
                "The generated transaction would be rejected by the federation for being too large."
            );
        }

        let txid = transaction.tx_hash();

        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");

        // The submission state machine tracks broadcast and acceptance of the
        // finalized transaction.
        let tx_submission_sm = DynState::from_typed(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            TxSubmissionStatesSM {
                operation_id,
                state: TxSubmissionStates::Created(transaction),
            },
        );
        states.push(tx_submission_sm);

        self.executor.add_state_machines_dbtx(dbtx, states).await?;

        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
            .await;

        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
    }
717
    /// Returns a stream of transaction-submission state machine updates for
    /// the given operation.
    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(
                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
                self.final_client.clone(),
            )
            .subscribe(operation_id)
            .await
    }
731
732    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
733        let mut dbtx = self.db().begin_transaction_nc().await;
734
735        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
736    }
737
738    pub async fn operation_exists_dbtx(
739        dbtx: &mut DatabaseTransaction<'_>,
740        operation_id: OperationId,
741    ) -> bool {
742        let active_state_exists = dbtx
743            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
744            .await
745            .next()
746            .await
747            .is_some();
748
749        let inactive_state_exists = dbtx
750            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
751            .await
752            .next()
753            .await
754            .is_some();
755
756        active_state_exists || inactive_state_exists
757    }
758
759    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
760        self.db
761            .begin_transaction_nc()
762            .await
763            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
764            .await
765            .next()
766            .await
767            .is_some()
768    }
769
    /// Waits for an output from the primary module to reach its final
    /// state.
    ///
    /// # Errors
    /// Fails if no primary module handling bitcoin amounts is available, or if
    /// awaiting the output fails in the module.
    pub async fn await_primary_bitcoin_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .ok_or_else(|| anyhow!("No primary module available"))?
            .1
            .await_primary_module_output(operation_id, out_point)
            .await
    }
783
    /// Returns a reference to a typed module client instance by kind
    ///
    /// Looks up the first registered instance of module kind `M`, downcasts it
    /// to the concrete type and bundles it with its instance-scoped database
    /// and API handles.
    ///
    /// # Errors
    /// Fails if no module of kind `M` is registered or the instance cannot be
    /// downcast to `M`.
    pub fn get_first_module<M: ClientModule>(
        &'_ self,
    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
        let module_kind = M::kind();
        let id = self
            .get_first_instance(&module_kind)
            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
        let module: &M = self
            .try_get_module(id)
            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
            .as_any()
            .downcast_ref::<M>()
            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
        // Database and API handles are namespaced to the module instance.
        let (db, _) = self.db().with_prefix_module_id(id);
        Ok(ClientModuleInstance {
            id,
            db,
            api: self.api().with_module(id),
            module,
        })
    }
806
807    pub fn get_module_client_dyn(
808        &self,
809        instance_id: ModuleInstanceId,
810    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
811        self.try_get_module(instance_id)
812            .ok_or(anyhow!("Unknown module instance {}", instance_id))
813    }
814
    /// Returns a reference to the client database.
    pub fn db(&self) -> &Database {
        &self.db
    }
818
    /// Returns the connector registry holding the client's federation
    /// connections.
    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }
822
823    /// Returns a stream of transaction updates for the given operation id that
824    /// can later be used to watch for a specific transaction being accepted.
825    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
826        TransactionUpdates {
827            update_stream: self.transaction_update_stream(operation_id).await,
828        }
829    }
830
831    /// Returns the instance id of the first module of the given kind.
832    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
833        self.modules
834            .iter_modules()
835            .find(|(_, kind, _module)| *kind == module_kind)
836            .map(|(instance_id, _, _)| instance_id)
837    }
838
    /// Returns the data from which the client's root secret is derived (e.g.
    /// BIP39 seed phrase struct).
    ///
    /// Reads the stored client secret from the database and decodes it as `T`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
844
845    /// Waits for outputs from the primary module to reach its final
846    /// state.
847    pub async fn await_primary_bitcoin_module_outputs(
848        &self,
849        operation_id: OperationId,
850        outputs: Vec<OutPoint>,
851    ) -> anyhow::Result<()> {
852        for out_point in outputs {
853            self.await_primary_bitcoin_module_output(operation_id, out_point)
854                .await?;
855        }
856
857        Ok(())
858    }
859
    /// Returns the config of the client in JSON format.
    ///
    /// Compared to the consensus module format where module configs are binary
    /// encoded this format cannot be cryptographically verified but is easier
    /// to consume and to some degree human-readable.
    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }
868
    // Ideally this would not be in the API, but there's a lot of places where this
    // makes it easier.
    #[doc(hidden)]
    /// Like [`Self::get_balance`] but returns an error if primary module is not
    /// available. Convenience wrapper for the [`AmountUnit::BITCOIN`] unit.
    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
        self.get_balance_for_unit(AmountUnit::BITCOIN).await
    }
877
878    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
879        let (id, module) = self
880            .primary_module_for_unit(unit)
881            .ok_or_else(|| anyhow!("Primary module not available"))?;
882        Ok(module
883            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
884            .await)
885    }
886
    /// Returns a stream that yields the current client balance every time it
    /// changes.
    ///
    /// The stream first yields the current balance, then one item per actual
    /// change (duplicate notifications from the module are deduplicated). If
    /// no primary module handles `unit`, the stream never yields.
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        // Capture everything we need from `self` up front so the returned
        // stream can be 'static.
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // If there is no primary module, there will not be one until client is
                // restarted
                pending().await
            };


            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                     .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Deduplicate in case modules cannot always tell if the balance actually changed
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
933
934    /// Make a single API version request to a peer after a delay.
935    ///
936    /// The delay is here to unify the type of a future both for initial request
937    /// and possible retries.
938    async fn make_api_version_request(
939        delay: Duration,
940        peer_id: PeerId,
941        api: &DynGlobalApi,
942    ) -> (
943        PeerId,
944        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
945    ) {
946        runtime::sleep(delay).await;
947        (
948            peer_id,
949            api.request_single_peer::<SupportedApiVersionsSummary>(
950                VERSION_ENDPOINT.to_owned(),
951                ApiRequestErased::default(),
952                peer_id,
953            )
954            .await,
955        )
956    }
957
    /// Create a backoff strategy for API version requests.
    ///
    /// Keep trying, initially somewhat aggressively, but after a while retry
    /// very slowly, because chances for response are getting lower and
    /// lower. Starts at 200ms, capped at 10 minutes, with no retry limit.
    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
    }
966
    /// Query the federation for API version support and then calculate
    /// the best API version to use (supported by most guardians).
    ///
    /// Persists each successful response immediately and reports the running
    /// count of peers answered via `num_responses_sender`. A peer that fails
    /// but already has a response stored from a previous run is counted as
    /// answered; otherwise it is retried with backoff indefinitely.
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and make a single async db write operation, it should be OK.
        let mut requests = FuturesUnordered::new();

        // Kick off the initial request to every peer with no delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    // A stale response from a previous run is good enough to
                    // count this peer as answered; only retry if we have
                    // nothing at all for it.
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    !has_previous_response
                }
                Ok(o) => {
                    // Save the response to the database right away, just to
                    // not lose it
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                // Re-enqueue the same peer with the next backoff delay.
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
1036
    /// Fetch API versions from peers, retrying until we get threshold number of
    /// successful responses. Returns the successful responses collected
    /// from at least `num_peers.threshold()` peers.
    ///
    /// Unlike [`Self::fetch_common_api_versions_from_all_peers`], this does
    /// not persist anything; it only collects in-memory responses and stops
    /// as soon as the threshold is reached.
    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and collect responses, it should be OK.
        let mut requests = FuturesUnordered::new();

        // Initial request to every peer, no delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        // Loop ends once threshold responses are in, or all requests are
        // exhausted (the latter shouldn't happen: failures are re-enqueued).
        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }
1090
1091    /// Fetch API versions from peers and discover common API versions to use.
1092    pub async fn fetch_common_api_versions(
1093        config: &ClientConfig,
1094        api: &DynGlobalApi,
1095    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1096        debug!(
1097            target: LOG_CLIENT,
1098            "Fetching common api versions"
1099        );
1100
1101        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1102
1103        let peer_api_version_sets =
1104            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1105
1106        Ok(peer_api_version_sets)
1107    }
1108
1109    /// Write API version set to database cache.
1110    /// Used when we have a pre-calculated API version set that should be stored
1111    /// for later use.
1112    pub async fn write_api_version_cache(
1113        dbtx: &mut DatabaseTransaction<'_>,
1114        api_version_set: ApiVersionSet,
1115    ) {
1116        debug!(
1117            target: LOG_CLIENT,
1118            value = ?api_version_set,
1119            "Writing API version set to cache"
1120        );
1121
1122        dbtx.insert_entry(
1123            &CachedApiVersionSetKey,
1124            &CachedApiVersionSet(api_version_set),
1125        )
1126        .await;
1127    }
1128
    /// Store prefetched peer API version responses and calculate/store common
    /// API version set. This processes the individual peer responses by
    /// storing them in the database and calculating the common API version
    /// set for caching.
    ///
    /// Both the common-set cache write and the individual peer entries are
    /// committed in a single transaction; a failure to derive the common set
    /// is logged but does not prevent storing the individual responses.
    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        // Calculate common API version set from individual responses
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                // Write the calculated common API version set to database cache
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                // Best-effort: keep going and store the raw responses anyway.
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        // Store individual peer responses to database
        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }
1174
1175    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1176    pub fn supported_api_versions_summary_static(
1177        config: &ClientConfig,
1178        client_module_init: &ClientModuleInitRegistry,
1179    ) -> SupportedApiVersionsSummary {
1180        SupportedApiVersionsSummary {
1181            core: SupportedCoreApiVersions {
1182                core_consensus: config.global.consensus_version,
1183                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1184                    .expect("must not have conflicting versions"),
1185            },
1186            modules: config
1187                .modules
1188                .iter()
1189                .filter_map(|(&module_instance_id, module_config)| {
1190                    client_module_init
1191                        .get(module_config.kind())
1192                        .map(|module_init| {
1193                            (
1194                                module_instance_id,
1195                                SupportedModuleApiVersions {
1196                                    core_consensus: config.global.consensus_version,
1197                                    module_consensus: module_config.version,
1198                                    api: module_init.supported_api_versions(),
1199                                },
1200                            )
1201                        })
1202                })
1203                .collect(),
1204        }
1205    }
1206
    /// Load the common API version set to use, refreshing it in the
    /// background; see [`Self::load_and_refresh_common_api_version_static`].
    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }
1218
    /// Load the common api versions to use from cache and start a background
    /// process to refresh them.
    ///
    /// This is a compromise, so we do not have to wait for version discovery
    /// to complete every time a [`Client`] is being built.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        // Fast path: return the cached set and refresh it in the background.
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            // Clone everything the background task needs to own.
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            // Separate task group, because we actually don't want to be waiting for this to
            // finish, and it's just best effort.
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    connectors.wait_for_initialized_connections().await;

                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        // Slow path: no cache yet, so block on discovery until it succeeds.
        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1288
    /// Re-discover the common API version set and update the db cache.
    ///
    /// Spawns a background task that queries all peers, then waits (bounded
    /// by a timeout) for a threshold of responses before computing the common
    /// set from whatever per-peer data is stored. With `block_until_ok` set,
    /// keeps retrying until a common set can be derived; otherwise the first
    /// failure is returned.
    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        // Peer querying happens on its own task; we only observe the count of
        // responses through the watch channel.
        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            // Wait to collect enough answers before calculating a set of common api
            // versions to use. Note that all peers individual responses from
            // previous attempts are still being used, and requests, or even
            // retries for response of peers are not actually cancelled, as they
            // are happening on a separate task. This is all just to bound the
            // time user can be waiting for the join operation to finish, at the
            // risk of picking wrong version in very rare circumstances.
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }
1364
1365    /// Get the client [`Metadata`]
1366    pub async fn get_metadata(&self) -> Metadata {
1367        self.db
1368            .begin_transaction_nc()
1369            .await
1370            .get_value(&ClientMetadataKey)
1371            .await
1372            .unwrap_or_else(|| {
1373                warn!(
1374                    target: LOG_CLIENT,
1375                    "Missing existing metadata. This key should have been set on Client init"
1376                );
1377                Metadata::empty()
1378            })
1379    }
1380
1381    /// Set the client [`Metadata`]
1382    pub async fn set_metadata(&self, metadata: &Metadata) {
1383        self.db
1384            .autocommit::<_, _, anyhow::Error>(
1385                |dbtx, _| {
1386                    Box::pin(async {
1387                        Self::set_metadata_dbtx(dbtx, metadata).await;
1388                        Ok(())
1389                    })
1390                },
1391                None,
1392            )
1393            .await
1394            .expect("Failed to autocommit metadata");
1395    }
1396
1397    pub fn has_pending_recoveries(&self) -> bool {
1398        !self
1399            .client_recovery_progress_receiver
1400            .borrow()
1401            .iter()
1402            .all(|(_id, progress)| progress.is_done())
1403    }
1404
1405    /// Wait for all module recoveries to finish
1406    ///
1407    /// This will block until the recovery task is done with recoveries.
1408    /// Returns success if all recovery tasks are complete (success case),
1409    /// or an error if some modules could not complete the recovery at the time.
1410    ///
1411    /// A bit of a heavy approach.
1412    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1413        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1414        recovery_receiver
1415            .wait_for(|in_progress| {
1416                in_progress
1417                    .iter()
1418                    .all(|(_id, progress)| progress.is_done())
1419            })
1420            .await
1421            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1422
1423        Ok(())
1424    }
1425
    /// Subscribe to recovery progress for all the modules.
    ///
    /// This stream can contain duplicate progress for a module.
    /// Don't use this stream for detecting completion of recovery.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        // Each watch update carries the full per-module map; flatten it so
        // the stream yields one (module, progress) pair at a time.
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1436
1437    pub async fn wait_for_module_kind_recovery(
1438        &self,
1439        module_kind: ModuleKind,
1440    ) -> anyhow::Result<()> {
1441        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1442        let config = self.config().await;
1443        recovery_receiver
1444            .wait_for(|in_progress| {
1445                !in_progress
1446                    .iter()
1447                    .filter(|(module_instance_id, _progress)| {
1448                        config.modules[module_instance_id].kind == module_kind
1449                    })
1450                    .any(|(_id, progress)| !progress.is_done())
1451            })
1452            .await
1453            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1454
1455        Ok(())
1456    }
1457
1458    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1459        loop {
1460            if self.executor.get_active_states().await.is_empty() {
1461                break;
1462            }
1463            sleep(Duration::from_millis(100)).await;
1464        }
1465        Ok(())
1466    }
1467
    /// Set the client [`Metadata`] within the given database transaction.
    ///
    /// NOTE(review): uses `insert_new_entry`, which looks like it assumes the
    /// key is not already set — confirm against `insert_new_entry` semantics.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1472
    /// Spawn the background task that drives all module recoveries; see
    /// [`Self::run_module_recoveries_task`].
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Clone what the task needs so it does not borrow from `self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1499
1500    async fn run_module_recoveries_task(
1501        db: Database,
1502        log_ordering_wakeup_tx: watch::Sender<()>,
1503        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1504        module_recoveries: BTreeMap<
1505            ModuleInstanceId,
1506            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1507        >,
1508        module_recovery_progress_receivers: BTreeMap<
1509            ModuleInstanceId,
1510            watch::Receiver<RecoveryProgress>,
1511        >,
1512    ) {
1513        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1514        let mut completed_stream = Vec::new();
1515        let progress_stream = futures::stream::FuturesUnordered::new();
1516
1517        for (module_instance_id, f) in module_recoveries {
1518            completed_stream.push(futures::stream::once(Box::pin(async move {
1519                match f.await {
1520                    Ok(()) => (module_instance_id, None),
1521                    Err(err) => {
1522                        warn!(
1523                            target: LOG_CLIENT,
1524                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1525                        );
1526                        // a module recovery that failed reports and error and
1527                        // just never finishes, so we don't need a separate state
1528                        // for it
1529                        futures::future::pending::<()>().await;
1530                        unreachable!()
1531                    }
1532                }
1533            })));
1534        }
1535
1536        for (module_instance_id, rx) in module_recovery_progress_receivers {
1537            progress_stream.push(
1538                tokio_stream::wrappers::WatchStream::new(rx)
1539                    .fuse()
1540                    .map(move |progress| (module_instance_id, Some(progress))),
1541            );
1542        }
1543
1544        let mut futures = futures::stream::select(
1545            futures::stream::select_all(progress_stream),
1546            futures::stream::select_all(completed_stream),
1547        );
1548
1549        while let Some((module_instance_id, progress)) = futures.next().await {
1550            let mut dbtx = db.begin_transaction().await;
1551
1552            let prev_progress = *recovery_sender
1553                .borrow()
1554                .get(&module_instance_id)
1555                .expect("existing progress must be present");
1556
1557            let progress = if prev_progress.is_done() {
1558                // since updates might be out of order, once done, stick with it
1559                prev_progress
1560            } else if let Some(progress) = progress {
1561                progress
1562            } else {
1563                prev_progress.to_complete()
1564            };
1565
1566            if !prev_progress.is_done() && progress.is_done() {
1567                info!(
1568                    target: LOG_CLIENT,
1569                    module_instance_id,
1570                    progress = format!("{}/{}", progress.complete, progress.total),
1571                    "Recovery complete"
1572                );
1573                dbtx.log_event(
1574                    log_ordering_wakeup_tx.clone(),
1575                    None,
1576                    ModuleRecoveryCompleted {
1577                        module_id: module_instance_id,
1578                    },
1579                )
1580                .await;
1581            } else {
1582                info!(
1583                    target: LOG_CLIENT,
1584                    module_instance_id,
1585                    progress = format!("{}/{}", progress.complete, progress.total),
1586                    "Recovery progress"
1587                );
1588            }
1589
1590            dbtx.insert_entry(
1591                &ClientModuleRecovery { module_instance_id },
1592                &ClientModuleRecoveryState { progress },
1593            )
1594            .await;
1595            dbtx.commit_tx().await;
1596
1597            recovery_sender.send_modify(|v| {
1598                v.insert(module_instance_id, progress);
1599            });
1600        }
1601        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1602    }
1603
1604    async fn load_peers_last_api_versions(
1605        db: &Database,
1606        num_peers: NumPeers,
1607    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1608        let mut peer_api_version_sets = BTreeMap::new();
1609
1610        let mut dbtx = db.begin_transaction_nc().await;
1611        for peer_id in num_peers.peer_ids() {
1612            if let Some(v) = dbtx
1613                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1614                .await
1615            {
1616                peer_api_version_sets.insert(peer_id, v.0);
1617            }
1618        }
1619        drop(dbtx);
1620        peer_api_version_sets
1621    }
1622
1623    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1624    /// only the announcements and doesn't use the config as fallback.
1625    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1626        self.db()
1627            .begin_transaction_nc()
1628            .await
1629            .find_by_prefix(&ApiAnnouncementPrefix)
1630            .await
1631            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1632            .collect()
1633            .await
1634    }
1635
1636    /// Returns guardian metadata stored in the client database
1637    pub async fn get_guardian_metadata(
1638        &self,
1639    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1640        self.db()
1641            .begin_transaction_nc()
1642            .await
1643            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1644            .await
1645            .map(|(key, metadata)| (key.0, metadata))
1646            .collect()
1647            .await
1648    }
1649
1650    /// Returns a list of guardian API URLs
1651    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1652        get_api_urls(&self.db, &self.config().await).await
1653    }
1654
1655    /// Create an invite code with the api endpoint of the given peer which can
1656    /// be used to download this client config
1657    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1658        self.get_peer_urls()
1659            .await
1660            .into_iter()
1661            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1662            .map(|peer_url| {
1663                InviteCode::new(
1664                    peer_url.clone(),
1665                    peer,
1666                    self.federation_id(),
1667                    self.api_secret.clone(),
1668                )
1669            })
1670    }
1671
    /// Blocks till the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once this function is guaranteed to return immediately.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // The closure itself is infallible; the `Result` only
                        // satisfies `autocommit`'s signature.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                // `None` max-attempts: retry commit conflicts indefinitely.
                None,
            )
            .await
            .expect("Will retry forever")
    }
1696
1697    async fn get_or_backfill_broadcast_public_keys(
1698        &self,
1699        dbtx: &mut DatabaseTransaction<'_>,
1700        config: ClientConfig,
1701    ) -> BTreeMap<PeerId, PublicKey> {
1702        match config.global.broadcast_public_keys {
1703            Some(guardian_pub_keys) => guardian_pub_keys,
1704            _ => {
1705                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1706
1707                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1708                *(self.config.write().await) = new_config;
1709                guardian_pub_keys
1710            }
1711        }
1712    }
1713
    /// Query the federation for the current consensus session count.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1717
1718    async fn fetch_and_update_config(
1719        &self,
1720        config: ClientConfig,
1721    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1722        let fetched_config = retry(
1723            "Fetching guardian public keys",
1724            backoff_util::background_backoff(),
1725            || async {
1726                Ok(self
1727                    .api
1728                    .request_current_consensus::<ClientConfig>(
1729                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1730                        ApiRequestErased::default(),
1731                    )
1732                    .await?)
1733            },
1734        )
1735        .await
1736        .expect("Will never return on error");
1737
1738        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1739            warn!(
1740                target: LOG_CLIENT,
1741                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1742            );
1743            pending::<()>().await;
1744            unreachable!("Pending will never return");
1745        };
1746
1747        let new_config = ClientConfig {
1748            global: GlobalClientConfig {
1749                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1750                ..config.global
1751            },
1752            modules: config.modules,
1753        };
1754        (guardian_pub_keys, new_config)
1755    }
1756
1757    pub fn handle_global_rpc(
1758        &self,
1759        method: String,
1760        params: serde_json::Value,
1761    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1762        Box::pin(try_stream! {
1763            match method.as_str() {
1764                "get_balance" => {
1765                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1766                    yield serde_json::to_value(balance)?;
1767                }
1768                "subscribe_balance_changes" => {
1769                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1770                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1771                    while let Some(balance) = stream.next().await {
1772                        yield serde_json::to_value(balance)?;
1773                    }
1774                }
1775                "get_config" => {
1776                    let config = self.config().await;
1777                    yield serde_json::to_value(config)?;
1778                }
1779                "get_federation_id" => {
1780                    let federation_id = self.federation_id();
1781                    yield serde_json::to_value(federation_id)?;
1782                }
1783                "get_invite_code" => {
1784                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1785                    let invite_code = self.invite_code(req.peer).await;
1786                    yield serde_json::to_value(invite_code)?;
1787                }
1788                "get_operation" => {
1789                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1790                    let operation = self.operation_log().get_operation(req.operation_id).await;
1791                    yield serde_json::to_value(operation)?;
1792                }
1793                "list_operations" => {
1794                    let req: ListOperationsParams = serde_json::from_value(params)?;
1795                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1796                        usize::MAX
1797                    } else {
1798                        req.limit.unwrap_or(usize::MAX)
1799                    };
1800                    let operations = self.operation_log()
1801                        .paginate_operations_rev(limit, req.last_seen)
1802                        .await;
1803                    yield serde_json::to_value(operations)?;
1804                }
1805                "session_count" => {
1806                    let count = self.fetch_session_count().await?;
1807                    yield serde_json::to_value(count)?;
1808                }
1809                "has_pending_recoveries" => {
1810                    let has_pending = self.has_pending_recoveries();
1811                    yield serde_json::to_value(has_pending)?;
1812                }
1813                "wait_for_all_recoveries" => {
1814                    self.wait_for_all_recoveries().await?;
1815                    yield serde_json::Value::Null;
1816                }
1817                "subscribe_to_recovery_progress" => {
1818                    let mut stream = self.subscribe_to_recovery_progress();
1819                    while let Some((module_id, progress)) = stream.next().await {
1820                        yield serde_json::json!({
1821                            "module_id": module_id,
1822                            "progress": progress
1823                        });
1824                    }
1825                }
1826                #[allow(deprecated)]
1827                "backup_to_federation" => {
1828                    let metadata = if params.is_null() {
1829                        Metadata::from_json_serialized(serde_json::json!({}))
1830                    } else {
1831                        Metadata::from_json_serialized(params)
1832                    };
1833                    self.backup_to_federation(metadata).await?;
1834                    yield serde_json::Value::Null;
1835                }
1836                _ => {
1837                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1838                    unreachable!()
1839                },
1840            }
1841        })
1842    }
1843
    /// Log `event` to the client event log in its own database transaction.
    ///
    /// `module_id` attributes the event to a module instance; `None` marks it
    /// as a core client event.
    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }
1852
    /// Log `event` as part of an existing database transaction.
    ///
    /// The event becomes visible only once the caller commits `dbtx`.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1865
1866    pub async fn log_event_raw_dbtx<Cap>(
1867        &self,
1868        dbtx: &mut DatabaseTransaction<'_, Cap>,
1869        kind: EventKind,
1870        module: Option<(ModuleKind, ModuleInstanceId)>,
1871        payload: Vec<u8>,
1872        persist: EventPersistence,
1873    ) where
1874        Cap: Send,
1875    {
1876        let module_id = module.as_ref().map(|m| m.1);
1877        let module_kind = module.map(|m| m.0);
1878        dbtx.log_event_raw(
1879            self.log_ordering_wakeup_tx.clone(),
1880            kind,
1881            module_kind,
1882            module_id,
1883            payload,
1884            persist,
1885        )
1886        .await;
1887    }
1888
    /// Built in event log (trimmable) tracker
    ///
    /// For the convenience of downstream applications, [`Client`] can store
    /// internally event log position for the main application using/driving it.
    ///
    /// Note that this position is a singleton, so this tracker should not be
    /// used for multiple purposes or applications, etc. at the same time.
    ///
    /// If the application has a need to follow log using multiple trackers, it
    /// should implement own [`DynEventLogTrimableTracker`] and store its
    /// persistent data by itself.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        // Stateless tracker: persists its position under a single well-known
        // database key (`DefaultApplicationEventLogKey`).
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            /// Store position in the event log
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            /// Load the last previous stored position (or None if never stored)
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
1926
    /// Like [`Self::handle_events`] but for historical data.
    ///
    /// This function can be used to process a subset of events
    /// that is infrequent and important enough to be persisted
    /// forever. Most applications should prefer to use [`Self::handle_events`]
    /// which emits *all* events.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
1951
    /// Handle events emitted by the client
    ///
    /// This is a preferred method for reactive & asynchronous
    /// processing of events emitted by the client.
    ///
    /// It needs a `tracker` that will persist the position in the log
    /// as it is being handled. You can use the
    /// [`Client::built_in_application_event_log_tracker`] if this call is
    /// used for the single main application handling this instance of the
    /// [`Client`]. Otherwise you should implement your own tracker.
    ///
    /// This handler will call `handler_fn` with every event emitted by
    /// [`Client`], including transient ones. The caller should atomically
    /// handle each event it is interested in and ignore other ones.
    ///
    /// This method returns only when client is shutting down or on internal
    /// error, so typically should be called in a background task dedicated
    /// to handling events.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
1987
1988    pub async fn get_event_log(
1989        &self,
1990        pos: Option<EventLogId>,
1991        limit: u64,
1992    ) -> Vec<PersistedLogEntry> {
1993        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1994            .await
1995    }
1996
1997    pub async fn get_event_log_trimable(
1998        &self,
1999        pos: Option<EventLogTrimableId>,
2000        limit: u64,
2001    ) -> Vec<PersistedLogEntry> {
2002        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2003            .await
2004    }
2005
    /// [`Self::get_event_log`] but using the caller's database transaction.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2017
    /// [`Self::get_event_log_trimable`] but using the caller's database
    /// transaction.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2029
    /// Register to receive all new transient (unpersisted) events
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2034
    /// Get a receiver that signals when new events are added to the event log
    ///
    /// This is a `watch` channel: it only signals "something was added", not
    /// which entries were added.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2039
    /// Returns the client's `iroh_enable_dht` setting.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2043
    /// Apply core (non-module) client database migrations.
    ///
    /// In test environments, additionally verifies client db integrity after
    /// the migrations ran, within the same transaction.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2056
2057    /// Iterator over primary modules for a given `unit`
2058    fn primary_modules_for_unit(
2059        &self,
2060        unit: AmountUnit,
2061    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2062        self.primary_modules
2063            .iter()
2064            .flat_map(move |(_prio, candidates)| {
2065                candidates
2066                    .specific
2067                    .get(&unit)
2068                    .into_iter()
2069                    .flatten()
2070                    .copied()
2071                    // within same priority, wildcard matches come last
2072                    .chain(candidates.wildcard.iter().copied())
2073            })
2074            .map(|id| (id, self.modules.get_expect(id)))
2075    }
2076
    /// Primary module to use for `unit`
    ///
    /// Currently, just pick the first (highest priority) match, or `None`
    /// when no primary module handles `unit`.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2086
    /// [`Self::primary_module_for_unit`] for Bitcoin
    ///
    /// # Panics
    ///
    /// Panics if no primary module handles Bitcoin amounts.
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2092}
2093
// Delegation layer: every method forwards to the inherent `Client` method of
// the same (or explicitly noted) name.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // pass the boxed closure by reference to match the `Fn` bound
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        // Note: delegates to the Bitcoin-specific variant.
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            // strip the db-key wrapper, keep (state key, meta)
            .map(move |(k, v)| (k.0, v)),
        )
    }
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2233
2234// TODO: impl `Debug` for `Client` and derive here
2235impl fmt::Debug for Client {
2236    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2237        write!(f, "Client")
2238    }
2239}
2240
2241pub fn client_decoders<'a>(
2242    registry: &ModuleInitRegistry<DynClientModuleInit>,
2243    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2244) -> ModuleDecoderRegistry {
2245    let mut modules = BTreeMap::new();
2246    for (id, kind) in module_kinds {
2247        let Some(init) = registry.get(kind) else {
2248            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2249            continue;
2250        };
2251
2252        modules.insert(
2253            id,
2254            (
2255                kind.clone(),
2256                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2257            ),
2258        );
2259    }
2260    ModuleDecoderRegistry::from(modules)
2261}