// fedimint_client/client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86    ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87    ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
// Crate-private submodules of the client implementation.
pub(crate) mod builder;
pub(crate) mod event_log;
pub(crate) mod global_ctx;
pub(crate) mod handle;
103
/// List of core api versions supported by the implementation.
/// Notably `major` version is the one being supported, and corresponding
/// `minor` version is the one required (for given `major` version).
///
/// Used when negotiating a common API version with the federation.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
109
/// Primary module candidates at specific priority level
///
/// Grouped into modules that declared concrete units and modules that
/// accept any unit.
#[derive(Default)]
pub(crate) struct PrimaryModuleCandidates {
    /// Modules that listed specific units they handle
    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
    /// Modules handling any unit
    wildcard: Vec<ModuleInstanceId>,
}
118
/// Main client type
///
/// A handle and API to interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time, will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it is starting and managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`crate::ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    final_client: FinalClientIface,
    /// Federation client config; behind a lock, so it can be read/replaced
    /// at runtime.
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional secret used when talking to the federation API.
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    connectors: ConnectorRegistry,
    /// The client's local database.
    db: Database,
    federation_id: FederationId,
    /// `meta` key/value entries from the federation config (see
    /// [`Client::get_config_meta`]).
    federation_config_meta: BTreeMap<String, String>,
    /// Primary module candidates grouped by priority level.
    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    /// State machine executor driving all client state machines.
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,

    /// Task group tied to the client's lifetime; background tasks spawned
    /// here are cancelled on shutdown.
    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Internal client sender to wake up log ordering task every time a
    /// (unordered) log event is added.
    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time (ordered) log event is added.
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
    /// User-provided Bitcoin RPC client for modules to use
    ///
    /// Stored here for potential future access; currently passed to modules
    /// during initialization.
    #[allow(dead_code)]
    user_bitcoind_rpc: Option<DynBitcoindRpc>,
    /// User-provided Bitcoin RPC factory for when ChainId is not available
    ///
    /// This is used as a fallback when the federation doesn't support ChainId.
    /// Modules can call this with a URL from their config to get an RPC client.
    pub(crate) user_bitcoind_rpc_no_chain_id:
        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
}
179
/// Request payload for a paginated operation-log listing.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    /// Optional cap on the number of entries returned.
    limit: Option<usize>,
    /// Optional pagination cursor: the last key the caller has already seen.
    last_seen: Option<ChronologicalOperationLogKey>,
}
185
/// Request payload identifying a single operation by its ID.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
190
/// Request payload for querying balance changes in a given amount unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBalanceChangesRequest {
    /// Unit to report balances in; defaults to bitcoin when omitted.
    #[serde(default = "AmountUnit::bitcoin")]
    unit: AmountUnit,
}
196
197impl Client {
    /// Initialize a client builder that can be configured to create a new
    /// client.
    ///
    /// Currently always succeeds; the `async` + `Result` signature is kept
    /// for callers.
    pub async fn builder() -> anyhow::Result<ClientBuilder> {
        Ok(ClientBuilder::new())
    }
203
    /// Returns a borrowed handle to the federation API.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }
207
    /// Returns an owned clone of the federation API handle.
    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }
211
    /// Returns a stream that emits the current connection status of all peers
    /// whenever any peer's status changes. Emits initial state immediately.
    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
        self.api.connection_status_stream()
    }
217
218    /// Establishes connections to all federation guardians once.
219    ///
220    /// Spawns tasks to connect to each guardian in the federation. Unlike
221    /// [`Self::spawn_federation_reconnect`], this only attempts to establish
222    /// connections once and completes - it does not maintain or reconnect.
223    ///
224    /// Useful for warming up connections before making API calls.
225    pub fn federation_reconnect(&self) {
226        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228        for peer_id in peers {
229            let api = self.api.clone();
230            self.task_group.spawn_cancellable(
231                format!("federation-reconnect-once-{peer_id}"),
232                async move {
233                    if let Err(e) = api.get_peer_connection(peer_id).await {
234                        debug!(
235                            target: LOG_CLIENT_NET_API,
236                            %peer_id,
237                            err = %e.fmt_compact(),
238                            "Failed to connect to peer"
239                        );
240                    }
241                },
242            );
243        }
244    }
245
246    /// Spawns background tasks that proactively maintain connections to all
247    /// federation guardians unconditionally.
248    ///
249    /// For each guardian, a task loops: establishes a connection, waits for it
250    /// to disconnect, then reconnects.
251    ///
252    /// The tasks are cancellable and will be terminated when the client shuts
253    /// down.
254    ///
255    /// By default [`Client`] creates connections on demand only, and share
256    /// them as long as they are alive.
257    ///
258    /// Reconnecting continuously might increase data and battery usage,
259    /// but potentially improve UX, depending on the time it takes to establish
260    /// a new network connection in given network conditions.
261    ///
262    /// Downstream users are encouraged to implement their own version of
263    /// this function, e.g. by reconnecting only when it is anticipated
264    /// that connection might be needed, or alternatively pre-warm
265    /// connections by calling [`Self::federation_reconnect`] when it seems
266    /// worthwhile.
267    pub fn spawn_federation_reconnect(&self) {
268        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
269
270        for peer_id in peers {
271            let api = self.api.clone();
272            self.task_group.spawn_cancellable(
273                format!("federation-reconnect-{peer_id}"),
274                async move {
275                    loop {
276                        match api.get_peer_connection(peer_id).await {
277                            Ok(conn) => {
278                                conn.await_disconnection().await;
279                            }
280                            Err(e) => {
281                                // Connection failed, backoff is handled inside
282                                // get_or_create_connection
283                                debug!(
284                                    target: LOG_CLIENT_NET_API,
285                                    %peer_id,
286                                    err = %e.fmt_compact(),
287                                    "Failed to connect to peer, will retry"
288                                );
289                            }
290                        }
291                    }
292                },
293            );
294        }
295    }
296
    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
    ///
    /// Tasks spawned on it are cleaned up together with the client.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }
301
    /// Returns all registered Prometheus metrics encoded in text format.
    ///
    /// This can be used by downstream clients to expose metrics via their own
    /// HTTP server or print them for debugging purposes.
    ///
    /// Note: this is an associated function (no `&self`) — metrics are
    /// process-wide, not per-client.
    pub fn get_metrics() -> anyhow::Result<String> {
        fedimint_metrics::get_metrics()
    }
309
    /// Useful for our CLI tooling, not meant for external use
    ///
    /// Exposes the internal state machine [`Executor`].
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
315
316    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317        let mut dbtx = db.begin_transaction_nc().await;
318        dbtx.get_value(&ClientConfigKey).await
319    }
320
321    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322        let mut dbtx = db.begin_transaction_nc().await;
323        dbtx.get_value(&PendingClientConfigKey).await
324    }
325
326    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327        let mut dbtx = db.begin_transaction_nc().await;
328        dbtx.get_value(&ApiSecretKey).await
329    }
330
331    pub async fn store_encodable_client_secret<T: Encodable>(
332        db: &Database,
333        secret: T,
334    ) -> anyhow::Result<()> {
335        let mut dbtx = db.begin_transaction().await;
336
337        // Don't overwrite an existing secret
338        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339            bail!("Encoded client secret already exists, cannot overwrite")
340        }
341
342        let encoded_secret = T::consensus_encode_to_vec(&secret);
343        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344            .await;
345        dbtx.commit_tx().await;
346        Ok(())
347    }
348
349    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351            bail!("Encoded client secret not present in DB")
352        };
353
354        Ok(secret)
355    }
356    pub async fn load_decodable_client_secret_opt<T: Decodable>(
357        db: &Database,
358    ) -> anyhow::Result<Option<T>> {
359        let mut dbtx = db.begin_transaction_nc().await;
360
361        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363        Ok(match client_secret {
364            Some(client_secret) => Some(
365                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367            ),
368            None => None,
369        })
370    }
371
372    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374            Ok(secret) => secret,
375            _ => {
376                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377                Self::store_encodable_client_secret(db, secret)
378                    .await
379                    .expect("Storing client secret must work");
380                secret
381            }
382        };
383        Ok(client_secret)
384    }
385
386    pub async fn is_initialized(db: &Database) -> bool {
387        let mut dbtx = db.begin_transaction_nc().await;
388        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389            .await
390            .expect("Unrecoverable error occurred while reading and entry from the database")
391            .is_some()
392    }
393
    /// Starts the state machine executor, wiring it up with a context
    /// generator backed by this client.
    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor",
        );
        self.executor.start_executor(self.context_gen());
    }
401
    /// Returns the [`FederationId`] of the federation this client talks to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
405
    /// Builds the context generator handed to the executor, which produces a
    /// [`ModuleGlobalClientContext`] per (module instance, operation).
    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
        // Only a weak reference is captured so the generator itself doesn't
        // keep the client alive.
        let client_inner = Arc::downgrade(self);
        Arc::new(move |module_instance, operation| {
            ModuleGlobalClientContext {
                client: client_inner
                    .clone()
                    .upgrade()
                    .expect("ModuleGlobalContextGen called after client was dropped"),
                module_instance_id: module_instance,
                operation,
            }
            .into()
        })
    }
420
    /// Returns a snapshot (clone) of the current client config.
    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }
424
    /// Returns the API secret, if one was configured.
    // TODO: change to `-> Option<&str>`
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
429
430    /// Returns the core API version that the federation supports
431    ///
432    /// This reads from the cached version stored during client initialization.
433    /// If no cache is available (e.g., during initial setup), returns a default
434    /// version (0, 0).
435    pub async fn core_api_version(&self) -> ApiVersion {
436        // Try to get from cache. If not available, return a conservative
437        // default. The cache should always be populated after successful client init.
438        self.db
439            .begin_transaction_nc()
440            .await
441            .get_value(&CachedApiVersionSetKey)
442            .await
443            .map(|cached: CachedApiVersionSet| cached.0.core)
444            .unwrap_or(ApiVersion { major: 0, minor: 0 })
445    }
446
447    /// Returns the chain ID (bitcoin block hash at height 1) from the
448    /// federation
449    ///
450    /// This is cached in the database after the first successful fetch.
451    /// The chain ID uniquely identifies which bitcoin network the federation
452    /// operates on (mainnet, testnet, signet, regtest).
453    pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454        // Check cache first
455        if let Some(chain_id) = self
456            .db
457            .begin_transaction_nc()
458            .await
459            .get_value(&ChainIdKey)
460            .await
461        {
462            return Ok(chain_id);
463        }
464
465        // Fetch from federation with consensus
466        let chain_id = self.api.chain_id().await?;
467
468        // Cache the result
469        let mut dbtx = self.db.begin_transaction().await;
470        dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471        dbtx.commit_tx().await;
472
473        Ok(chain_id)
474    }
475
    /// Returns the registry of module decoders for this client.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
479
    /// Returns a reference to the module, panics if not found
    ///
    /// Use [`Self::try_get_module`] for a non-panicking variant.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }
485
486    fn try_get_module(
487        &self,
488        instance: ModuleInstanceId,
489    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490        Some(self.modules.get(instance)?.as_ref())
491    }
492
    /// Returns `true` if a module with the given instance ID is registered.
    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }
496
497    /// Returns the input amount and output amount of a transaction
498    ///
499    /// # Panics
500    /// If any of the input or output versions in the transaction builder are
501    /// unknown by the respective module.
502    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
503        // FIXME: prevent overflows, currently not suitable for untrusted input
504        let mut in_amounts = Amounts::ZERO;
505        let mut out_amounts = Amounts::ZERO;
506        let mut fee_amounts = Amounts::ZERO;
507
508        for input in builder.inputs() {
509            let module = self.get_module(input.input.module_instance_id());
510
511            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
512                "We only build transactions with input versions that are supported by the module",
513            );
514
515            in_amounts.checked_add_mut(&input.amounts);
516            fee_amounts.checked_add_mut(&item_fees);
517        }
518
519        for output in builder.outputs() {
520            let module = self.get_module(output.output.module_instance_id());
521
522            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
523                "We only build transactions with output versions that are supported by the module",
524            );
525
526            out_amounts.checked_add_mut(&output.amounts);
527            fee_amounts.checked_add_mut(&item_fees);
528        }
529
530        out_amounts.checked_add_mut(&fee_amounts);
531        (in_amounts, out_amounts)
532    }
533
    /// Returns the federation-specific fake LN public key (derived from the
    /// federation ID) together with a zero marker value.
    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }
537
538    /// Get metadata value from the federation config itself
539    pub fn get_config_meta(&self, key: &str) -> Option<String> {
540        self.federation_config_meta.get(key).cloned()
541    }
542
    /// Returns a clone of the client's root [`DerivableSecret`].
    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
546
    /// Adds state machines to the executor within the given database
    /// transaction.
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }
554
555    // TODO: implement as part of [`OperationLog`]
556    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557        let active_states = self.executor.get_active_states().await;
558        let mut active_operations = HashSet::with_capacity(active_states.len());
559        let mut dbtx = self.db().begin_transaction_nc().await;
560        for (state, _) in active_states {
561            let operation_id = state.operation_id();
562            if dbtx
563                .get_value(&OperationLogKey { operation_id })
564                .await
565                .is_some()
566            {
567                active_operations.insert(operation_id);
568            }
569        }
570        active_operations
571    }
572
    /// Returns the client's operation log.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
576
    /// Get the meta service used to read federation meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
581
    /// Returns the federation's expiration time, if the
    /// `federation_expiry_timestamp` meta field is set.
    ///
    /// The meta value is interpreted as seconds since the UNIX epoch.
    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
        let meta_service = self.meta_service();
        let ts = meta_service
            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
            .await
            .and_then(|v| v.value)?;
        Some(UNIX_EPOCH + Duration::from_secs(ts))
    }
591
592    /// Adds funding to a transaction or removes over-funding via change.
593    async fn finalize_transaction(
594        &self,
595        dbtx: &mut DatabaseTransaction<'_>,
596        operation_id: OperationId,
597        mut partial_transaction: TransactionBuilder,
598    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601        let mut added_inputs_bundles = vec![];
602        let mut added_outputs_bundles = vec![];
603
604        // The way currently things are implemented is OK for modules which can
605        // collect a fee relative to one being used, but will break down in any
606        // fancy scenarios. Future TODOs:
607        //
608        // * create_final_inputs_and_outputs needs to get broken down, so we can use
609        //   primary modules using priorities (possibly separate prios for inputs and
610        //   outputs to be able to drain modules, etc.); we need the split "check if
611        //   possible" and "take" steps,
612        // * extra inputs and outputs adding fees needs to be taken into account,
613        //   possibly with some looping
614        for unit in in_amounts.units().union(&out_amounts.units()) {
615            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617            if input_amount == output_amount {
618                continue;
619            }
620
621            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623            };
624
625            let (added_input_bundle, added_output_bundle) = module
626                .create_final_inputs_and_outputs(
627                    module_id,
628                    dbtx,
629                    operation_id,
630                    *unit,
631                    input_amount,
632                    output_amount,
633                )
634                .await?;
635
636            added_inputs_bundles.push(added_input_bundle);
637            added_outputs_bundles.push(added_output_bundle);
638        }
639
640        // This is the range of  outputs that will be added to the transaction
641        // in order to balance it. Notice that it may stay empty in case the transaction
642        // is already balanced.
643        let change_range = Range {
644            start: partial_transaction.outputs().count() as u64,
645            end: (partial_transaction.outputs().count() as u64
646                + added_outputs_bundles
647                    .iter()
648                    .map(|output| output.outputs().len() as u64)
649                    .sum::<u64>()),
650        };
651
652        for added_inputs in added_inputs_bundles {
653            partial_transaction = partial_transaction.with_inputs(added_inputs);
654        }
655
656        for added_outputs in added_outputs_bundles {
657            partial_transaction = partial_transaction.with_outputs(added_outputs);
658        }
659
660        let (input_amounts, output_amounts) =
661            self.transaction_builder_get_balance(&partial_transaction);
662
663        for (unit, output_amount) in output_amounts {
664            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666            assert!(input_amount >= output_amount, "Transaction is underfunded");
667        }
668
669        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671        Ok((tx, states, change_range))
672    }
673
    /// Add funding and/or change to the transaction builder as needed, finalize
    /// the transaction and submit it to the federation.
    ///
    /// ## Errors
    /// The function will return an error if the operation with given ID already
    /// exists.
    ///
    /// ## Panics
    /// The function will panic if the database transaction collides with
    /// other and fails with others too often, this should not happen except for
    /// excessively concurrent scenarios.
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    // The closure may run multiple times on commit conflicts,
                    // hence the clones of everything it consumes.
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        // Operation IDs must be unique per client.
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        // The operation-log entry is written in the same db
                        // transaction as the submission itself, so both
                        // commit or roll back together.
                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100), // TODO: handle what happens after 100 retries
            )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            // Logical errors from the closure are surfaced to the caller.
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            // Exhausting all commit attempts indicates pathological
            // contention; see the `Panics` section above.
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }
741
    /// Finalizes `tx_builder` into a transaction, registers the state
    /// machines driving its submission and logs a [`TxCreatedEvent`], all
    /// within the provided database transaction.
    ///
    /// Returns the txid together with the range of change outputs.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        let (transaction, mut states, change_range) = self
            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
            .await?;

        // Reject transactions the federation would refuse for their size,
        // logging which module instances contributed inputs/outputs so the
        // culprit can be diagnosed.
        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
            let inputs = transaction
                .inputs
                .iter()
                .map(DynInput::module_instance_id)
                .collect::<Vec<_>>();
            let outputs = transaction
                .outputs
                .iter()
                .map(DynOutput::module_instance_id)
                .collect::<Vec<_>>();
            warn!(
                target: LOG_CLIENT_NET_API,
                size=%transaction.consensus_encode_to_vec().len(),
                ?inputs,
                ?outputs,
                "Transaction too large",
            );
            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
            bail!(
                "The generated transaction would be rejected by the federation for being too large."
            );
        }

        let txid = transaction.tx_hash();

        debug!(
            target: LOG_CLIENT_NET_API,
            %txid,
            operation_id = %operation_id.fmt_short(),
            ?transaction,
            "Finalized and submitting transaction",
        );

        // The actual submission to the federation is driven by a dedicated
        // state machine added alongside the module state machines.
        let tx_submission_sm = DynState::from_typed(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            TxSubmissionStatesSM {
                operation_id,
                state: TxSubmissionStates::Created(transaction),
            },
        );
        states.push(tx_submission_sm);

        self.executor.add_state_machines_dbtx(dbtx, states).await?;

        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
            .await;

        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
    }
802
803    async fn transaction_update_stream(
804        &self,
805        operation_id: OperationId,
806    ) -> BoxStream<'static, TxSubmissionStatesSM> {
807        self.executor
808            .notifier()
809            .module_notifier::<TxSubmissionStatesSM>(
810                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
811                self.final_client.clone(),
812            )
813            .subscribe(operation_id)
814            .await
815    }
816
817    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
818        let mut dbtx = self.db().begin_transaction_nc().await;
819
820        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
821    }
822
823    pub async fn operation_exists_dbtx(
824        dbtx: &mut DatabaseTransaction<'_>,
825        operation_id: OperationId,
826    ) -> bool {
827        let active_state_exists = dbtx
828            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
829            .await
830            .next()
831            .await
832            .is_some();
833
834        let inactive_state_exists = dbtx
835            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
836            .await
837            .next()
838            .await
839            .is_some();
840
841        active_state_exists || inactive_state_exists
842    }
843
844    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
845        self.db
846            .begin_transaction_nc()
847            .await
848            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
849            .await
850            .next()
851            .await
852            .is_some()
853    }
854
855    /// Waits for an output from the primary module to reach its final
856    /// state.
857    pub async fn await_primary_bitcoin_module_output(
858        &self,
859        operation_id: OperationId,
860        out_point: OutPoint,
861    ) -> anyhow::Result<()> {
862        self.primary_module_for_unit(AmountUnit::BITCOIN)
863            .ok_or_else(|| anyhow!("No primary module available"))?
864            .1
865            .await_primary_module_output(operation_id, out_point)
866            .await
867    }
868
869    /// Returns a reference to a typed module client instance by kind
870    pub fn get_first_module<M: ClientModule>(
871        &'_ self,
872    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
873        let module_kind = M::kind();
874        let id = self
875            .get_first_instance(&module_kind)
876            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
877        let module: &M = self
878            .try_get_module(id)
879            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
880            .as_any()
881            .downcast_ref::<M>()
882            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
883        let (db, _) = self.db().with_prefix_module_id(id);
884        Ok(ClientModuleInstance {
885            id,
886            db,
887            api: self.api().with_module(id),
888            module,
889        })
890    }
891
892    pub fn get_module_client_dyn(
893        &self,
894        instance_id: ModuleInstanceId,
895    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
896        self.try_get_module(instance_id)
897            .ok_or(anyhow!("Unknown module instance {}", instance_id))
898    }
899
    /// Returns the client's raw (non module-prefixed) database handle.
    pub fn db(&self) -> &Database {
        &self.db
    }
903
    /// Returns the registry of connectors used to reach the federation.
    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }
907
908    /// Returns a stream of transaction updates for the given operation id that
909    /// can later be used to watch for a specific transaction being accepted.
910    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
911        TransactionUpdates {
912            update_stream: self.transaction_update_stream(operation_id).await,
913        }
914    }
915
916    /// Returns the instance id of the first module of the given kind.
917    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
918        self.modules
919            .iter_modules()
920            .find(|(_, kind, _module)| *kind == module_kind)
921            .map(|(instance_id, _, _)| instance_id)
922    }
923
    /// Returns the data from which the client's root secret is derived (e.g.
    /// BIP39 seed phrase struct).
    ///
    /// Reads the stored client secret from the database and decodes it as
    /// `T`; fails if the stored bytes cannot be decoded as `T`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
929
    /// Waits for outputs from the primary module to reach its final
    /// state.
    ///
    /// Outputs are awaited sequentially, in the order given; the first
    /// failure aborts the remaining waits.
    pub async fn await_primary_bitcoin_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_bitcoin_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }
944
945    /// Returns the config of the client in JSON format.
946    ///
947    /// Compared to the consensus module format where module configs are binary
948    /// encoded this format cannot be cryptographically verified but is easier
949    /// to consume and to some degree human-readable.
950    pub async fn get_config_json(&self) -> JsonClientConfig {
951        self.config().await.to_json()
952    }
953
    // Ideally this would not be in the API, but there's a lot of places where this
    // makes it easier.
    #[doc(hidden)]
    /// Like [`Self::get_balance`] but returns an error if primary module is not
    /// available
    ///
    /// Convenience wrapper around [`Self::get_balance_for_unit`] specialized
    /// to [`AmountUnit::BITCOIN`].
    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
        self.get_balance_for_unit(AmountUnit::BITCOIN).await
    }
962
963    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
964        let (id, module) = self
965            .primary_module_for_unit(unit)
966            .ok_or_else(|| anyhow!("Primary module not available"))?;
967        Ok(module
968            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
969            .await)
970    }
971
    /// Returns a stream that yields the current client balance every time it
    /// changes.
    ///
    /// The first item is the current balance; later items are emitted only
    /// when the balance actually differs from the previously yielded value.
    /// If no primary module exists for `unit`, the stream never yields and
    /// never terminates.
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        // Capture everything the stream needs up front so the returned
        // stream can be 'static (it must not borrow `self`).
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // If there is no primary module, there will not be one until client is
                // restarted
                pending().await
            };


            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                     .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Deduplicate in case modules cannot always tell if the balance actually changed
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
1018
1019    /// Make a single API version request to a peer after a delay.
1020    ///
1021    /// The delay is here to unify the type of a future both for initial request
1022    /// and possible retries.
1023    async fn make_api_version_request(
1024        delay: Duration,
1025        peer_id: PeerId,
1026        api: &DynGlobalApi,
1027    ) -> (
1028        PeerId,
1029        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1030    ) {
1031        runtime::sleep(delay).await;
1032        (
1033            peer_id,
1034            api.request_single_peer::<SupportedApiVersionsSummary>(
1035                VERSION_ENDPOINT.to_owned(),
1036                ApiRequestErased::default(),
1037                peer_id,
1038            )
1039            .await,
1040        )
1041    }
1042
    /// Create a backoff strategy for API version requests.
    ///
    /// Keep trying, initially somewhat aggressively, but after a while retry
    /// very slowly, because chances for response are getting lower and
    /// lower.
    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
        // 200ms initial delay growing to a 600s cap; `None` means unbounded
        // number of retries.
        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
    }
1051
    /// Query the federation for API version support and then calculate
    /// the best API version to use (supported by most guardians).
    ///
    /// Runs until a response from every peer has been obtained (retrying
    /// failed peers with backoff), persisting each successful response to
    /// the database and reporting the count of responders through
    /// `num_responses_sender`.
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and make a single async db write operation, it should be OK.
        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    // Only retry a failed peer if we have no previously cached
                    // response for it; a stale cached value is good enough.
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    !has_previous_response
                }
                Ok(o) => {
                    // Save the response to the database right away, just to
                    // not lose it
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
1121
    /// Fetch API versions from peers, retrying until we get threshold number of
    /// successful responses. Returns the successful responses collected
    /// from at least `num_peers.threshold()` peers.
    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and collect responses, it should be OK.
        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        // Stop as soon as a threshold of peers answered; still-outstanding
        // requests are simply dropped with `requests`.
        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                // Failed peers are retried indefinitely with backoff.
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }
1175
1176    /// Fetch API versions from peers and discover common API versions to use.
1177    pub async fn fetch_common_api_versions(
1178        config: &ClientConfig,
1179        api: &DynGlobalApi,
1180    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1181        debug!(
1182            target: LOG_CLIENT,
1183            "Fetching common api versions"
1184        );
1185
1186        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1187
1188        let peer_api_version_sets =
1189            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1190
1191        Ok(peer_api_version_sets)
1192    }
1193
1194    /// Write API version set to database cache.
1195    /// Used when we have a pre-calculated API version set that should be stored
1196    /// for later use.
1197    pub async fn write_api_version_cache(
1198        dbtx: &mut DatabaseTransaction<'_>,
1199        api_version_set: ApiVersionSet,
1200    ) {
1201        debug!(
1202            target: LOG_CLIENT,
1203            value = ?api_version_set,
1204            "Writing API version set to cache"
1205        );
1206
1207        dbtx.insert_entry(
1208            &CachedApiVersionSetKey,
1209            &CachedApiVersionSet(api_version_set),
1210        )
1211        .await;
1212    }
1213
    /// Store prefetched peer API version responses and calculate/store common
    /// API version set. This processes the individual peer responses by
    /// storing them in the database and calculating the common API version
    /// set for caching.
    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        // Calculate common API version set from individual responses
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                // Write the calculated common API version set to database cache
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                // Failure to agree on a common set is non-fatal here: the
                // per-peer responses below are still persisted.
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        // Store individual peer responses to database
        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        // A single commit covers both the cache entry and the per-peer entries.
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }
1259
1260    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1261    pub fn supported_api_versions_summary_static(
1262        config: &ClientConfig,
1263        client_module_init: &ClientModuleInitRegistry,
1264    ) -> SupportedApiVersionsSummary {
1265        SupportedApiVersionsSummary {
1266            core: SupportedCoreApiVersions {
1267                core_consensus: config.global.consensus_version,
1268                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1269                    .expect("must not have conflicting versions"),
1270            },
1271            modules: config
1272                .modules
1273                .iter()
1274                .filter_map(|(&module_instance_id, module_config)| {
1275                    client_module_init
1276                        .get(module_config.kind())
1277                        .map(|module_init| {
1278                            (
1279                                module_instance_id,
1280                                SupportedModuleApiVersions {
1281                                    core_consensus: config.global.consensus_version,
1282                                    module_consensus: module_config.version,
1283                                    api: module_init.supported_api_versions(),
1284                                },
1285                            )
1286                        })
1287                })
1288                .collect(),
1289        }
1290    }
1291
    /// Load the common API version set, possibly from cache, and start a
    /// best-effort background refresh; see
    /// [`Self::load_and_refresh_common_api_version_static`].
    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }
1303
    /// Load the common api versions to use from cache and start a background
    /// process to refresh them.
    ///
    /// This is a compromise, so we not have to wait for version discovery to
    /// complete every time a [`Client`] is being built.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        // Fast path: a cached version set lets us return immediately and
        // refresh in the background.
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            // Separate task group, because we actually don't want to be waiting for this to
            // finish, and it's just best effort.
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    connectors.wait_for_initialized_connections().await;

                    // Background refresh does not block on success
                    // (`block_until_ok = false`); failures are only logged.
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        // Slow path: no cache yet, so block until discovery succeeds
        // (`block_until_ok = true`).
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1373
    /// Re-query peers for their supported API versions, derive a common set
    /// and persist it to the cache.
    ///
    /// If `block_until_ok` is set, keeps retrying until a common set can be
    /// derived; otherwise the first derivation failure is returned as an
    /// error.
    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        // Peer querying happens on its own task; we only observe the count
        // of responders via the watch channel.
        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            // Wait to collect enough answers before calculating a set of common api
            // versions to use. Note that all peers individual responses from
            // previous attempts are still being used, and requests, or even
            // retries for response of peers are not actually cancelled, as they
            // are happening on a separate task. This is all just to bound the
            // time user can be waiting for the join operation to finish, at the
            // risk of picking wrong version in very rare circumstances.
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        // Any previously cached value is intentionally overwritten.
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }
1449
1450    /// Get the client [`Metadata`]
1451    pub async fn get_metadata(&self) -> Metadata {
1452        self.db
1453            .begin_transaction_nc()
1454            .await
1455            .get_value(&ClientMetadataKey)
1456            .await
1457            .unwrap_or_else(|| {
1458                warn!(
1459                    target: LOG_CLIENT,
1460                    "Missing existing metadata. This key should have been set on Client init"
1461                );
1462                Metadata::empty()
1463            })
1464    }
1465
1466    /// Set the client [`Metadata`]
1467    pub async fn set_metadata(&self, metadata: &Metadata) {
1468        self.db
1469            .autocommit::<_, _, anyhow::Error>(
1470                |dbtx, _| {
1471                    Box::pin(async {
1472                        Self::set_metadata_dbtx(dbtx, metadata).await;
1473                        Ok(())
1474                    })
1475                },
1476                None,
1477            )
1478            .await
1479            .expect("Failed to autocommit metadata");
1480    }
1481
1482    pub fn has_pending_recoveries(&self) -> bool {
1483        !self
1484            .client_recovery_progress_receiver
1485            .borrow()
1486            .iter()
1487            .all(|(_id, progress)| progress.is_done())
1488    }
1489
1490    /// Wait for all module recoveries to finish
1491    ///
1492    /// This will block until the recovery task is done with recoveries.
1493    /// Returns success if all recovery tasks are complete (success case),
1494    /// or an error if some modules could not complete the recovery at the time.
1495    ///
1496    /// A bit of a heavy approach.
1497    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1498        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1499        recovery_receiver
1500            .wait_for(|in_progress| {
1501                in_progress
1502                    .iter()
1503                    .all(|(_id, progress)| progress.is_done())
1504            })
1505            .await
1506            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1507
1508        Ok(())
1509    }
1510
    /// Subscribe to recover progress for all the modules.
    ///
    /// This stream can contain duplicate progress for a module.
    /// Don't use this stream for detecting completion of recovery.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        // Each watch update carries the full progress map; flatten it into
        // individual (module, progress) items.
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1521
1522    pub async fn wait_for_module_kind_recovery(
1523        &self,
1524        module_kind: ModuleKind,
1525    ) -> anyhow::Result<()> {
1526        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1527        let config = self.config().await;
1528        recovery_receiver
1529            .wait_for(|in_progress| {
1530                !in_progress
1531                    .iter()
1532                    .filter(|(module_instance_id, _progress)| {
1533                        config.modules[module_instance_id].kind == module_kind
1534                    })
1535                    .any(|(_id, progress)| !progress.is_done())
1536            })
1537            .await
1538            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1539
1540        Ok(())
1541    }
1542
1543    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1544        loop {
1545            if self.executor.get_active_states().await.is_empty() {
1546                break;
1547            }
1548            sleep(Duration::from_millis(100)).await;
1549        }
1550        Ok(())
1551    }
1552
    /// Set the client [`Metadata`]
    ///
    /// NOTE(review): uses `insert_new_entry`, which presumably expects the
    /// key to be absent — confirm against callers that metadata is only
    /// written once per database.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1557
    /// Spawn the background task that drives all module recoveries; see
    /// [`Self::run_module_recoveries_task`].
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Clone what the spawned task needs so it can outlive `&self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }
1584
1585    async fn run_module_recoveries_task(
1586        db: Database,
1587        log_ordering_wakeup_tx: watch::Sender<()>,
1588        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1589        module_recoveries: BTreeMap<
1590            ModuleInstanceId,
1591            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1592        >,
1593        module_recovery_progress_receivers: BTreeMap<
1594            ModuleInstanceId,
1595            watch::Receiver<RecoveryProgress>,
1596        >,
1597    ) {
1598        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1599        let mut completed_stream = Vec::new();
1600        let progress_stream = futures::stream::FuturesUnordered::new();
1601
1602        for (module_instance_id, f) in module_recoveries {
1603            completed_stream.push(futures::stream::once(Box::pin(async move {
1604                match f.await {
1605                    Ok(()) => (module_instance_id, None),
1606                    Err(err) => {
1607                        warn!(
1608                            target: LOG_CLIENT,
1609                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1610                        );
1611                        // a module recovery that failed reports and error and
1612                        // just never finishes, so we don't need a separate state
1613                        // for it
1614                        futures::future::pending::<()>().await;
1615                        unreachable!()
1616                    }
1617                }
1618            })));
1619        }
1620
1621        for (module_instance_id, rx) in module_recovery_progress_receivers {
1622            progress_stream.push(
1623                tokio_stream::wrappers::WatchStream::new(rx)
1624                    .fuse()
1625                    .map(move |progress| (module_instance_id, Some(progress))),
1626            );
1627        }
1628
1629        let mut futures = futures::stream::select(
1630            futures::stream::select_all(progress_stream),
1631            futures::stream::select_all(completed_stream),
1632        );
1633
1634        while let Some((module_instance_id, progress)) = futures.next().await {
1635            let mut dbtx = db.begin_transaction().await;
1636
1637            let prev_progress = *recovery_sender
1638                .borrow()
1639                .get(&module_instance_id)
1640                .expect("existing progress must be present");
1641
1642            let progress = if prev_progress.is_done() {
1643                // since updates might be out of order, once done, stick with it
1644                prev_progress
1645            } else if let Some(progress) = progress {
1646                progress
1647            } else {
1648                prev_progress.to_complete()
1649            };
1650
1651            if !prev_progress.is_done() && progress.is_done() {
1652                info!(
1653                    target: LOG_CLIENT,
1654                    module_instance_id,
1655                    progress = format!("{}/{}", progress.complete, progress.total),
1656                    "Recovery complete"
1657                );
1658                dbtx.log_event(
1659                    log_ordering_wakeup_tx.clone(),
1660                    None,
1661                    ModuleRecoveryCompleted {
1662                        module_id: module_instance_id,
1663                    },
1664                )
1665                .await;
1666            } else {
1667                info!(
1668                    target: LOG_CLIENT,
1669                    module_instance_id,
1670                    progress = format!("{}/{}", progress.complete, progress.total),
1671                    "Recovery progress"
1672                );
1673            }
1674
1675            dbtx.insert_entry(
1676                &ClientModuleRecovery { module_instance_id },
1677                &ClientModuleRecoveryState { progress },
1678            )
1679            .await;
1680            dbtx.commit_tx().await;
1681
1682            recovery_sender.send_modify(|v| {
1683                v.insert(module_instance_id, progress);
1684            });
1685        }
1686        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1687    }
1688
1689    async fn load_peers_last_api_versions(
1690        db: &Database,
1691        num_peers: NumPeers,
1692    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1693        let mut peer_api_version_sets = BTreeMap::new();
1694
1695        let mut dbtx = db.begin_transaction_nc().await;
1696        for peer_id in num_peers.peer_ids() {
1697            if let Some(v) = dbtx
1698                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1699                .await
1700            {
1701                peer_api_version_sets.insert(peer_id, v.0);
1702            }
1703        }
1704        drop(dbtx);
1705        peer_api_version_sets
1706    }
1707
1708    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1709    /// only the announcements and doesn't use the config as fallback.
1710    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1711        self.db()
1712            .begin_transaction_nc()
1713            .await
1714            .find_by_prefix(&ApiAnnouncementPrefix)
1715            .await
1716            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1717            .collect()
1718            .await
1719    }
1720
    /// Returns guardian metadata stored in the client database
    ///
    /// Reads every entry under the guardian-metadata prefix; the result maps
    /// the announcing peer's id to its signed metadata.
    pub async fn get_guardian_metadata(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
            .await
            .map(|(key, metadata)| (key.0, metadata))
            .collect()
            .await
    }
1734
    /// Returns a list of guardian API URLs
    ///
    /// Combines stored API announcements with the config as fallback (see
    /// [`Self::get_peer_url_announcements`] for announcements only).
    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }
1739
1740    /// Create an invite code with the api endpoint of the given peer which can
1741    /// be used to download this client config
1742    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1743        self.get_peer_urls()
1744            .await
1745            .into_iter()
1746            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1747            .map(|peer_url| {
1748                InviteCode::new(
1749                    peer_url.clone(),
1750                    peer,
1751                    self.federation_id(),
1752                    self.api_secret.clone(),
1753                )
1754            })
1755    }
1756
    /// Blocks till the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once this function is guaranteed to return immediately.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // The closure itself is infallible (`Err` type is
                        // `()`), so the `expect` below can only be about
                        // autocommit retrying.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }
1781
    /// Return the guardian broadcast public keys from `config`, backfilling
    /// them for older configs.
    ///
    /// If `config` predates broadcast public keys (pre-0.4), fetches a fresh
    /// config from the federation, persists it in `dbtx` and in the in-memory
    /// config, and returns the keys from the fetched config.
    async fn get_or_backfill_broadcast_public_keys(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        config: ClientConfig,
    ) -> BTreeMap<PeerId, PublicKey> {
        match config.global.broadcast_public_keys {
            Some(guardian_pub_keys) => guardian_pub_keys,
            _ => {
                // Blocks until the federation serves a config that contains
                // the keys; see `fetch_and_update_config`.
                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;

                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
                *(self.config.write().await) = new_config;
                guardian_pub_keys
            }
        }
    }
1798
    /// Fetch the current consensus session count from the federation API.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1802
    /// Fetch a fresh [`ClientConfig`] from the federation and merge its
    /// guardian broadcast public keys into `config`.
    ///
    /// Retries the fetch forever with background backoff. If the fetched
    /// config still lacks broadcast public keys (server older than 0.4) this
    /// future never completes.
    async fn fetch_and_update_config(
        &self,
        config: ClientConfig,
    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
        let fetched_config = retry(
            "Fetching guardian public keys",
            backoff_util::background_backoff(),
            || async {
                Ok(self
                    .api
                    .request_current_consensus::<ClientConfig>(
                        CLIENT_CONFIG_ENDPOINT.to_owned(),
                        ApiRequestErased::default(),
                    )
                    .await?)
            },
        )
        .await
        .expect("Will never return on error");

        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
            warn!(
                target: LOG_CLIENT,
                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
            );
            // Intentionally park forever: callers rely on this only resolving
            // once the keys are available.
            pending::<()>().await;
            unreachable!("Pending will never return");
        };

        // Keep the caller's module configs; only the global section gains the
        // newly fetched broadcast public keys.
        let new_config = ClientConfig {
            global: GlobalClientConfig {
                broadcast_public_keys: Some(guardian_pub_keys.clone()),
                ..config.global
            },
            modules: config.modules,
        };
        (guardian_pub_keys, new_config)
    }
1841
    /// Dispatch a global (non-module) client RPC by `method` name.
    ///
    /// Returns a stream of JSON results: single-shot methods yield exactly
    /// one value, `subscribe_*` methods keep yielding until the underlying
    /// stream ends, and an unknown `method` yields a single error.
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
                    let mut stream = self.subscribe_balance_changes(req.unit).await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    // No explicit limit and no pagination cursor means
                    // "return everything".
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                #[allow(deprecated)]
                "backup_to_federation" => {
                    // Null params mean "no metadata"; otherwise params ARE the
                    // metadata payload.
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }
1928
    /// Log `event` to the client event log in its own database transaction.
    ///
    /// Convenience wrapper around [`Self::log_event_dbtx`] that commits
    /// immediately.
    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }
1937
    /// Log `event` as part of the caller's database transaction.
    ///
    /// The event only becomes visible once the caller commits `dbtx`.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1950
1951    pub async fn log_event_raw_dbtx<Cap>(
1952        &self,
1953        dbtx: &mut DatabaseTransaction<'_, Cap>,
1954        kind: EventKind,
1955        module: Option<(ModuleKind, ModuleInstanceId)>,
1956        payload: Vec<u8>,
1957        persist: EventPersistence,
1958    ) where
1959        Cap: Send,
1960    {
1961        let module_id = module.as_ref().map(|m| m.1);
1962        let module_kind = module.map(|m| m.0);
1963        dbtx.log_event_raw(
1964            self.log_ordering_wakeup_tx.clone(),
1965            kind,
1966            module_kind,
1967            module_id,
1968            payload,
1969            persist,
1970        )
1971        .await;
1972    }
1973
    /// Built in event log (trimmable) tracker
    ///
    /// For the convenience of downstream applications, [`Client`] can store
    /// internally event log position for the main application using/driving it.
    ///
    /// Note that this position is a singleton, so this tracker should not be
    /// used for multiple purposes or applications, etc. at the same time.
    ///
    /// If the application has a need to follow log using multiple trackers, it
    /// should implement own [`DynEventLogTrimableTracker`] and store its
    /// persistent data by itself.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            /// Store position in the event log under the singleton
            /// `DefaultApplicationEventLogKey`.
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            /// Load the last previous stored position (or None if never stored)
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
2011
    /// Like [`Self::handle_events`] but for historical data.
    ///
    /// This function can be used to process a subset of events
    /// that is infrequent and important enough to be persisted
    /// forever. Most applications should prefer to use [`Self::handle_events`]
    /// which emits *all* events.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2036
    /// Handle events emitted by the client
    ///
    /// This is a preferred method for reactive & asynchronous
    /// processing of events emitted by the client.
    ///
    /// It needs a `tracker` that will persist the position in the log
    /// as it is being handled. You can use the
    /// [`Client::built_in_application_event_log_tracker`] if this call is
    /// used for the single main application handling this instance of the
    /// [`Client`]. Otherwise you should implement your own tracker.
    ///
    /// This handler will call `handler_fn` with every event emitted by
    /// [`Client`], including transient ones. The caller should atomically
    /// handle each event it is interested in and ignore other ones.
    ///
    /// This method returns only when client is shutting down or on internal
    /// error, so typically should be called in a background task dedicated
    /// to handling events.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2072
2073    pub async fn get_event_log(
2074        &self,
2075        pos: Option<EventLogId>,
2076        limit: u64,
2077    ) -> Vec<PersistedLogEntry> {
2078        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2079            .await
2080    }
2081
2082    pub async fn get_event_log_trimable(
2083        &self,
2084        pos: Option<EventLogTrimableId>,
2085        limit: u64,
2086    ) -> Vec<PersistedLogEntry> {
2087        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2088            .await
2089    }
2090
    /// Like [`Self::get_event_log`] but using the caller's transaction.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2102
    /// Like [`Self::get_event_log_trimable`] but using the caller's
    /// transaction.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2114
    /// Register a receiver for all new transient (unpersisted) events
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2119
    /// Get a receiver that signals when new events are added to the event log
    ///
    /// Each caller gets an independent clone of the watch receiver.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2124
    /// Returns the `iroh_enable_dht` flag this client was configured with.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2128
    /// Apply core (non-module) client database migrations.
    ///
    /// Operates on a decoder-less database handle; in test environments it
    /// additionally verifies client db integrity before committing.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2141
    /// Iterator over primary modules for a given `unit`
    ///
    /// Walks priority buckets in order; within a bucket, modules registered
    /// specifically for `unit` are yielded before wildcard modules.
    fn primary_modules_for_unit(
        &self,
        unit: AmountUnit,
    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
        self.primary_modules
            .iter()
            .flat_map(move |(_prio, candidates)| {
                candidates
                    .specific
                    .get(&unit)
                    .into_iter()
                    .flatten()
                    .copied()
                    // within same priority, wildcard matches come last
                    .chain(candidates.wildcard.iter().copied())
            })
            .map(|id| (id, self.modules.get_expect(id)))
    }
2161
    /// Primary module to use for `unit`
    ///
    /// Currently, just pick the first (highest priority) match
    ///
    /// Returns `None` if no primary module handles `unit`.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2171
    /// [`Self::primary_module_for_unit`] for Bitcoin
    ///
    /// # Panics
    /// Panics if no primary module handles Bitcoin amounts.
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2177}
2178
// Object-safe facade over `Client` handed to client modules; every method
// delegates to the corresponding inherent `Client` method.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        // Note: delegates to the Bitcoin-specific primary-module variant.
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2318
2319// TODO: impl `Debug` for `Client` and derive here
2320impl fmt::Debug for Client {
2321    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2322        write!(f, "Client")
2323    }
2324}
2325
2326pub fn client_decoders<'a>(
2327    registry: &ModuleInitRegistry<DynClientModuleInit>,
2328    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2329) -> ModuleDecoderRegistry {
2330    let mut modules = BTreeMap::new();
2331    for (id, kind) in module_kinds {
2332        let Some(init) = registry.get(kind) else {
2333            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2334            continue;
2335        };
2336
2337        modules.insert(
2338            id,
2339            (
2340                kind.clone(),
2341                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2342            ),
2343        );
2344    }
2345    ModuleDecoderRegistry::from(modules)
2346}