// fedimint_client/client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86    ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87    ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
104/// List of core api versions supported by the implementation.
105/// Notably `major` version is the one being supported, and corresponding
106/// `minor` version is the one required (for given `major` version).
107const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
108    &[ApiVersion { major: 0, minor: 0 }];
109
110/// Primary module candidates at specific priority level
111#[derive(Default)]
112pub(crate) struct PrimaryModuleCandidates {
113    /// Modules that listed specific units they handle
114    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
115    /// Modules handling any unit
116    wildcard: Vec<ModuleInstanceId>,
117}
118
119/// Main client type
120///
121/// A handle and API to interacting with a single federation. End user
122/// applications that want to support interacting with multiple federations at
123/// the same time, will need to instantiate and manage multiple instances of
124/// this struct.
125///
126/// Under the hood it is starting and managing service tasks, state machines,
127/// database and other resources required.
128///
129/// This type is shared externally and internally, and
130/// [`crate::ClientHandle`] is responsible for external lifecycle management
131/// and resource freeing of the [`Client`].
132pub struct Client {
133    final_client: FinalClientIface,
134    config: tokio::sync::RwLock<ClientConfig>,
135    api_secret: Option<String>,
136    decoders: ModuleDecoderRegistry,
137    connectors: ConnectorRegistry,
138    db: Database,
139    federation_id: FederationId,
140    federation_config_meta: BTreeMap<String, String>,
141    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
142    pub(crate) modules: ClientModuleRegistry,
143    module_inits: ClientModuleInitRegistry,
144    executor: Executor,
145    pub(crate) api: DynGlobalApi,
146    root_secret: DerivableSecret,
147    operation_log: OperationLog,
148    secp_ctx: Secp256k1<secp256k1::All>,
149    meta_service: Arc<MetaService>,
150
151    task_group: TaskGroup,
152
153    /// Updates about client recovery progress
154    client_recovery_progress_receiver:
155        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
156
157    /// Internal client sender to wake up log ordering task every time a
158    /// (unuordered) log event is added.
159    log_ordering_wakeup_tx: watch::Sender<()>,
160    /// Receiver for events fired every time (ordered) log event is added.
161    log_event_added_rx: watch::Receiver<()>,
162    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
163    request_hook: ApiRequestHook,
164    iroh_enable_dht: bool,
165    iroh_enable_next: bool,
166    /// User-provided Bitcoin RPC client for modules to use
167    ///
168    /// Stored here for potential future access; currently passed to modules
169    /// during initialization.
170    #[allow(dead_code)]
171    user_bitcoind_rpc: Option<DynBitcoindRpc>,
172    /// User-provided Bitcoin RPC factory for when ChainId is not available
173    ///
174    /// This is used as a fallback when the federation doesn't support ChainId.
175    /// Modules can call this with a URL from their config to get an RPC client.
176    pub(crate) user_bitcoind_rpc_no_chain_id:
177        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181struct ListOperationsParams {
182    limit: Option<usize>,
183    last_seen: Option<ChronologicalOperationLogKey>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct GetOperationIdRequest {
188    operation_id: OperationId,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct GetBalanceChangesRequest {
193    #[serde(default = "AmountUnit::bitcoin")]
194    unit: AmountUnit,
195}
196
197impl Client {
198    /// Initialize a client builder that can be configured to create a new
199    /// client.
200    pub async fn builder() -> anyhow::Result<ClientBuilder> {
201        Ok(ClientBuilder::new())
202    }
203
204    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
205        self.api.as_ref()
206    }
207
208    pub fn api_clone(&self) -> DynGlobalApi {
209        self.api.clone()
210    }
211
212    /// Returns a stream that emits the current connection status of all peers
213    /// whenever any peer's status changes. Emits initial state immediately.
214    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
215        self.api.connection_status_stream()
216    }
217
218    /// Establishes connections to all federation guardians once.
219    ///
220    /// Spawns tasks to connect to each guardian in the federation. Unlike
221    /// [`Self::spawn_federation_reconnect`], this only attempts to establish
222    /// connections once and completes - it does not maintain or reconnect.
223    ///
224    /// Useful for warming up connections before making API calls.
225    pub fn federation_reconnect(&self) {
226        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228        for peer_id in peers {
229            let api = self.api.clone();
230            self.task_group.spawn_cancellable(
231                format!("federation-reconnect-once-{peer_id}"),
232                async move {
233                    if let Err(e) = api.get_peer_connection(peer_id).await {
234                        debug!(
235                            target: LOG_CLIENT_NET_API,
236                            %peer_id,
237                            err = %e.fmt_compact(),
238                            "Failed to connect to peer"
239                        );
240                    }
241                },
242            );
243        }
244    }
245
246    /// Spawns background tasks that proactively maintain connections to all
247    /// federation guardians unconditionally.
248    ///
249    /// For each guardian, a task loops: establishes a connection, waits for it
250    /// to disconnect, then reconnects.
251    ///
252    /// The tasks are cancellable and will be terminated when the client shuts
253    /// down.
254    ///
255    /// By default [`Client`] creates connections on demand only, and share
256    /// them as long as they are alive.
257    ///
258    /// Reconnecting continuously might increase data and battery usage,
259    /// but potentially improve UX, depending on the time it takes to establish
260    /// a new network connection in given network conditions.
261    ///
262    /// Downstream users are encouraged to implement their own version of
263    /// this function, e.g. by reconnecting only when it is anticipated
264    /// that connection might be needed, or alternatively pre-warm
265    /// connections by calling [`Self::federation_reconnect`] when it seems
266    /// worthwhile.
267    pub fn spawn_federation_reconnect(&self) {
268        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
269
270        for peer_id in peers {
271            let api = self.api.clone();
272            self.task_group.spawn_cancellable(
273                format!("federation-reconnect-{peer_id}"),
274                async move {
275                    loop {
276                        match api.get_peer_connection(peer_id).await {
277                            Ok(conn) => {
278                                conn.await_disconnection().await;
279                            }
280                            Err(e) => {
281                                // Connection failed, backoff is handled inside
282                                // get_or_create_connection
283                                debug!(
284                                    target: LOG_CLIENT_NET_API,
285                                    %peer_id,
286                                    err = %e.fmt_compact(),
287                                    "Failed to connect to peer, will retry"
288                                );
289                            }
290                        }
291                    }
292                },
293            );
294        }
295    }
296
297    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
298    pub fn task_group(&self) -> &TaskGroup {
299        &self.task_group
300    }
301
302    /// Returns all registered Prometheus metrics encoded in text format.
303    ///
304    /// This can be used by downstream clients to expose metrics via their own
305    /// HTTP server or print them for debugging purposes.
306    pub fn get_metrics() -> anyhow::Result<String> {
307        fedimint_metrics::get_metrics()
308    }
309
310    /// Useful for our CLI tooling, not meant for external use
311    #[doc(hidden)]
312    pub fn executor(&self) -> &Executor {
313        &self.executor
314    }
315
316    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317        let mut dbtx = db.begin_transaction_nc().await;
318        dbtx.get_value(&ClientConfigKey).await
319    }
320
321    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322        let mut dbtx = db.begin_transaction_nc().await;
323        dbtx.get_value(&PendingClientConfigKey).await
324    }
325
326    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327        let mut dbtx = db.begin_transaction_nc().await;
328        dbtx.get_value(&ApiSecretKey).await
329    }
330
331    pub async fn store_encodable_client_secret<T: Encodable>(
332        db: &Database,
333        secret: T,
334    ) -> anyhow::Result<()> {
335        let mut dbtx = db.begin_transaction().await;
336
337        // Don't overwrite an existing secret
338        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339            bail!("Encoded client secret already exists, cannot overwrite")
340        }
341
342        let encoded_secret = T::consensus_encode_to_vec(&secret);
343        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344            .await;
345        dbtx.commit_tx().await;
346        Ok(())
347    }
348
349    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351            bail!("Encoded client secret not present in DB")
352        };
353
354        Ok(secret)
355    }
356    pub async fn load_decodable_client_secret_opt<T: Decodable>(
357        db: &Database,
358    ) -> anyhow::Result<Option<T>> {
359        let mut dbtx = db.begin_transaction_nc().await;
360
361        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363        Ok(match client_secret {
364            Some(client_secret) => Some(
365                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367            ),
368            None => None,
369        })
370    }
371
372    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374            Ok(secret) => secret,
375            _ => {
376                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377                Self::store_encodable_client_secret(db, secret)
378                    .await
379                    .expect("Storing client secret must work");
380                secret
381            }
382        };
383        Ok(client_secret)
384    }
385
386    pub async fn is_initialized(db: &Database) -> bool {
387        let mut dbtx = db.begin_transaction_nc().await;
388        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389            .await
390            .expect("Unrecoverable error occurred while reading and entry from the database")
391            .is_some()
392    }
393
394    pub fn start_executor(self: &Arc<Self>) {
395        debug!(
396            target: LOG_CLIENT,
397            "Starting fedimint client executor",
398        );
399        self.executor.start_executor(self.context_gen());
400    }
401
402    pub fn federation_id(&self) -> FederationId {
403        self.federation_id
404    }
405
406    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
407        let client_inner = Arc::downgrade(self);
408        Arc::new(move |module_instance, operation| {
409            ModuleGlobalClientContext {
410                client: client_inner
411                    .clone()
412                    .upgrade()
413                    .expect("ModuleGlobalContextGen called after client was dropped"),
414                module_instance_id: module_instance,
415                operation,
416            }
417            .into()
418        })
419    }
420
421    pub async fn config(&self) -> ClientConfig {
422        self.config.read().await.clone()
423    }
424
425    // TODO: change to `-> Option<&str>`
426    pub fn api_secret(&self) -> &Option<String> {
427        &self.api_secret
428    }
429
430    /// Returns the core API version that the federation supports
431    ///
432    /// This reads from the cached version stored during client initialization.
433    /// If no cache is available (e.g., during initial setup), returns a default
434    /// version (0, 0).
435    pub async fn core_api_version(&self) -> ApiVersion {
436        // Try to get from cache. If not available, return a conservative
437        // default. The cache should always be populated after successful client init.
438        self.db
439            .begin_transaction_nc()
440            .await
441            .get_value(&CachedApiVersionSetKey)
442            .await
443            .map(|cached: CachedApiVersionSet| cached.0.core)
444            .unwrap_or(ApiVersion { major: 0, minor: 0 })
445    }
446
447    /// Returns the chain ID (bitcoin block hash at height 1) from the
448    /// federation
449    ///
450    /// This is cached in the database after the first successful fetch.
451    /// The chain ID uniquely identifies which bitcoin network the federation
452    /// operates on (mainnet, testnet, signet, regtest).
453    pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454        // Check cache first
455        if let Some(chain_id) = self
456            .db
457            .begin_transaction_nc()
458            .await
459            .get_value(&ChainIdKey)
460            .await
461        {
462            return Ok(chain_id);
463        }
464
465        // Fetch from federation with consensus
466        let chain_id = self.api.chain_id().await?;
467
468        // Cache the result
469        let mut dbtx = self.db.begin_transaction().await;
470        dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471        dbtx.commit_tx().await;
472
473        Ok(chain_id)
474    }
475
476    pub fn decoders(&self) -> &ModuleDecoderRegistry {
477        &self.decoders
478    }
479
480    /// Returns a reference to the module, panics if not found
481    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
482        self.try_get_module(instance)
483            .expect("Module instance not found")
484    }
485
486    fn try_get_module(
487        &self,
488        instance: ModuleInstanceId,
489    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490        Some(self.modules.get(instance)?.as_ref())
491    }
492
493    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
494        self.modules.get(instance).is_some()
495    }
496
497    /// Returns the input amount and output amount of a transaction
498    ///
499    /// # Panics
500    /// If any of the input or output versions in the transaction builder are
501    /// unknown by the respective module.
502    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
503        // FIXME: prevent overflows, currently not suitable for untrusted input
504        let mut in_amounts = Amounts::ZERO;
505        let mut out_amounts = Amounts::ZERO;
506        let mut fee_amounts = Amounts::ZERO;
507
508        for input in builder.inputs() {
509            let module = self.get_module(input.input.module_instance_id());
510
511            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
512                "We only build transactions with input versions that are supported by the module",
513            );
514
515            in_amounts.checked_add_mut(&input.amounts);
516            fee_amounts.checked_add_mut(&item_fees);
517        }
518
519        for output in builder.outputs() {
520            let module = self.get_module(output.output.module_instance_id());
521
522            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
523                "We only build transactions with output versions that are supported by the module",
524            );
525
526            out_amounts.checked_add_mut(&output.amounts);
527            fee_amounts.checked_add_mut(&item_fees);
528        }
529
530        out_amounts.checked_add_mut(&fee_amounts);
531        (in_amounts, out_amounts)
532    }
533
534    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
535        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
536    }
537
538    /// Get metadata value from the federation config itself
539    pub fn get_config_meta(&self, key: &str) -> Option<String> {
540        self.federation_config_meta.get(key).cloned()
541    }
542
543    pub(crate) fn root_secret(&self) -> DerivableSecret {
544        self.root_secret.clone()
545    }
546
547    pub async fn add_state_machines(
548        &self,
549        dbtx: &mut DatabaseTransaction<'_>,
550        states: Vec<DynState>,
551    ) -> AddStateMachinesResult {
552        self.executor.add_state_machines_dbtx(dbtx, states).await
553    }
554
555    // TODO: implement as part of [`OperationLog`]
556    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557        let active_states = self.executor.get_active_states().await;
558        let mut active_operations = HashSet::with_capacity(active_states.len());
559        let mut dbtx = self.db().begin_transaction_nc().await;
560        for (state, _) in active_states {
561            let operation_id = state.operation_id();
562            if dbtx
563                .get_value(&OperationLogKey { operation_id })
564                .await
565                .is_some()
566            {
567                active_operations.insert(operation_id);
568            }
569        }
570        active_operations
571    }
572
573    pub fn operation_log(&self) -> &OperationLog {
574        &self.operation_log
575    }
576
577    /// Get the meta manager to read meta fields.
578    pub fn meta_service(&self) -> &Arc<MetaService> {
579        &self.meta_service
580    }
581
582    /// Get the meta manager to read meta fields.
583    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
584        let meta_service = self.meta_service();
585        let ts = meta_service
586            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
587            .await
588            .and_then(|v| v.value)?;
589        Some(UNIX_EPOCH + Duration::from_secs(ts))
590    }
591
592    /// Adds funding to a transaction or removes over-funding via change.
593    async fn finalize_transaction(
594        &self,
595        dbtx: &mut DatabaseTransaction<'_>,
596        operation_id: OperationId,
597        mut partial_transaction: TransactionBuilder,
598    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601        let mut added_inputs_bundles = vec![];
602        let mut added_outputs_bundles = vec![];
603
604        // The way currently things are implemented is OK for modules which can
605        // collect a fee relative to one being used, but will break down in any
606        // fancy scenarios. Future TODOs:
607        //
608        // * create_final_inputs_and_outputs needs to get broken down, so we can use
609        //   primary modules using priorities (possibly separate prios for inputs and
610        //   outputs to be able to drain modules, etc.); we need the split "check if
611        //   possible" and "take" steps,
612        // * extra inputs and outputs adding fees needs to be taken into account,
613        //   possibly with some looping
614        for unit in in_amounts.units().union(&out_amounts.units()) {
615            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617            if input_amount == output_amount {
618                continue;
619            }
620
621            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623            };
624
625            let (added_input_bundle, added_output_bundle) = module
626                .create_final_inputs_and_outputs(
627                    module_id,
628                    dbtx,
629                    operation_id,
630                    *unit,
631                    input_amount,
632                    output_amount,
633                )
634                .await?;
635
636            added_inputs_bundles.push(added_input_bundle);
637            added_outputs_bundles.push(added_output_bundle);
638        }
639
640        // This is the range of  outputs that will be added to the transaction
641        // in order to balance it. Notice that it may stay empty in case the transaction
642        // is already balanced.
643        let change_range = Range {
644            start: partial_transaction.outputs().count() as u64,
645            end: (partial_transaction.outputs().count() as u64
646                + added_outputs_bundles
647                    .iter()
648                    .map(|output| output.outputs().len() as u64)
649                    .sum::<u64>()),
650        };
651
652        for added_inputs in added_inputs_bundles {
653            partial_transaction = partial_transaction.with_inputs(added_inputs);
654        }
655
656        for added_outputs in added_outputs_bundles {
657            partial_transaction = partial_transaction.with_outputs(added_outputs);
658        }
659
660        let (input_amounts, output_amounts) =
661            self.transaction_builder_get_balance(&partial_transaction);
662
663        for (unit, output_amount) in output_amounts {
664            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666            assert!(input_amount >= output_amount, "Transaction is underfunded");
667        }
668
669        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671        Ok((tx, states, change_range))
672    }
673
674    /// Add funding and/or change to the transaction builder as needed, finalize
675    /// the transaction and submit it to the federation.
676    ///
677    /// ## Errors
678    /// The function will return an error if the operation with given ID already
679    /// exists.
680    ///
681    /// ## Panics
682    /// The function will panic if the database transaction collides with
683    /// other and fails with others too often, this should not happen except for
684    /// excessively concurrent scenarios.
685    pub async fn finalize_and_submit_transaction<F, M>(
686        &self,
687        operation_id: OperationId,
688        operation_type: &str,
689        operation_meta_gen: F,
690        tx_builder: TransactionBuilder,
691    ) -> anyhow::Result<OutPointRange>
692    where
693        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
694        M: serde::Serialize + MaybeSend,
695    {
696        let operation_type = operation_type.to_owned();
697
698        let autocommit_res = self
699            .db
700            .autocommit(
701                |dbtx, _| {
702                    let operation_type = operation_type.clone();
703                    let tx_builder = tx_builder.clone();
704                    let operation_meta_gen = operation_meta_gen.clone();
705                    Box::pin(async move {
706                        self.finalize_and_submit_transaction_dbtx(
707                            dbtx,
708                            operation_id,
709                            &operation_type,
710                            operation_meta_gen,
711                            tx_builder,
712                        )
713                        .await
714                    })
715                },
716                Some(100), // TODO: handle what happens after 100 retries
717            )
718            .await;
719
720        match autocommit_res {
721            Ok(txid) => Ok(txid),
722            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
723            Err(AutocommitError::CommitFailed {
724                attempts,
725                last_error,
726            }) => panic!(
727                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
728            ),
729        }
730    }
731
732    /// See [`Self::finalize_and_submit_transaction`], just inside a database
733    /// transaction.
734    pub async fn finalize_and_submit_transaction_dbtx<F, M>(
735        &self,
736        dbtx: &mut DatabaseTransaction<'_>,
737        operation_id: OperationId,
738        operation_type: &str,
739        operation_meta_gen: F,
740        tx_builder: TransactionBuilder,
741    ) -> anyhow::Result<OutPointRange>
742    where
743        F: FnOnce(OutPointRange) -> M + MaybeSend,
744        M: serde::Serialize + MaybeSend,
745    {
746        if Client::operation_exists_dbtx(dbtx, operation_id).await {
747            bail!("There already exists an operation with id {operation_id:?}")
748        }
749
750        let out_point_range = self
751            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
752            .await?;
753
754        self.operation_log()
755            .add_operation_log_entry_dbtx(
756                dbtx,
757                operation_id,
758                operation_type,
759                operation_meta_gen(out_point_range),
760            )
761            .await;
762
763        Ok(out_point_range)
764    }
765
766    async fn finalize_and_submit_transaction_inner(
767        &self,
768        dbtx: &mut DatabaseTransaction<'_>,
769        operation_id: OperationId,
770        tx_builder: TransactionBuilder,
771    ) -> anyhow::Result<OutPointRange> {
772        let (transaction, mut states, change_range) = self
773            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
774            .await?;
775
776        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
777            let inputs = transaction
778                .inputs
779                .iter()
780                .map(DynInput::module_instance_id)
781                .collect::<Vec<_>>();
782            let outputs = transaction
783                .outputs
784                .iter()
785                .map(DynOutput::module_instance_id)
786                .collect::<Vec<_>>();
787            warn!(
788                target: LOG_CLIENT_NET_API,
789                size=%transaction.consensus_encode_to_vec().len(),
790                ?inputs,
791                ?outputs,
792                "Transaction too large",
793            );
794            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
795            bail!(
796                "The generated transaction would be rejected by the federation for being too large."
797            );
798        }
799
800        let txid = transaction.tx_hash();
801
802        debug!(
803            target: LOG_CLIENT_NET_API,
804            %txid,
805            operation_id = %operation_id.fmt_short(),
806            ?transaction,
807            "Finalized and submitting transaction",
808        );
809
810        let tx_submission_sm = DynState::from_typed(
811            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
812            TxSubmissionStatesSM {
813                operation_id,
814                state: TxSubmissionStates::Created(transaction),
815            },
816        );
817        states.push(tx_submission_sm);
818
819        self.executor.add_state_machines_dbtx(dbtx, states).await?;
820
821        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
822            .await;
823
824        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
825    }
826
827    async fn transaction_update_stream(
828        &self,
829        operation_id: OperationId,
830    ) -> BoxStream<'static, TxSubmissionStatesSM> {
831        self.executor
832            .notifier()
833            .module_notifier::<TxSubmissionStatesSM>(
834                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
835                self.final_client.clone(),
836            )
837            .subscribe(operation_id)
838            .await
839    }
840
841    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
842        let mut dbtx = self.db().begin_transaction_nc().await;
843
844        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
845    }
846
847    pub async fn operation_exists_dbtx(
848        dbtx: &mut DatabaseTransaction<'_>,
849        operation_id: OperationId,
850    ) -> bool {
851        let active_state_exists = dbtx
852            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
853            .await
854            .next()
855            .await
856            .is_some();
857
858        let inactive_state_exists = dbtx
859            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
860            .await
861            .next()
862            .await
863            .is_some();
864
865        active_state_exists || inactive_state_exists
866    }
867
868    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
869        self.db
870            .begin_transaction_nc()
871            .await
872            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
873            .await
874            .next()
875            .await
876            .is_some()
877    }
878
879    /// Waits for an output from the primary module to reach its final
880    /// state.
881    pub async fn await_primary_bitcoin_module_output(
882        &self,
883        operation_id: OperationId,
884        out_point: OutPoint,
885    ) -> anyhow::Result<()> {
886        self.primary_module_for_unit(AmountUnit::BITCOIN)
887            .ok_or_else(|| anyhow!("No primary module available"))?
888            .1
889            .await_primary_module_output(operation_id, out_point)
890            .await
891    }
892
893    /// Returns a reference to a typed module client instance by kind
894    pub fn get_first_module<M: ClientModule>(
895        &'_ self,
896    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
897        let module_kind = M::kind();
898        let id = self
899            .get_first_instance(&module_kind)
900            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
901        let module: &M = self
902            .try_get_module(id)
903            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
904            .as_any()
905            .downcast_ref::<M>()
906            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
907        let (db, _) = self.db().with_prefix_module_id(id);
908        Ok(ClientModuleInstance {
909            id,
910            db,
911            api: self.api().with_module(id),
912            module,
913        })
914    }
915
916    pub fn get_module_client_dyn(
917        &self,
918        instance_id: ModuleInstanceId,
919    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
920        self.try_get_module(instance_id)
921            .ok_or(anyhow!("Unknown module instance {}", instance_id))
922    }
923
    /// Returns the client database handle.
    pub fn db(&self) -> &Database {
        &self.db
    }
927
    /// Returns the registry of connectors this client uses to reach
    /// federation endpoints.
    pub fn endpoints(&self) -> &ConnectorRegistry {
        &self.connectors
    }
931
932    /// Returns a stream of transaction updates for the given operation id that
933    /// can later be used to watch for a specific transaction being accepted.
934    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
935        TransactionUpdates {
936            update_stream: self.transaction_update_stream(operation_id).await,
937        }
938    }
939
940    /// Returns the instance id of the first module of the given kind.
941    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
942        self.modules
943            .iter_modules()
944            .find(|(_, kind, _module)| *kind == module_kind)
945            .map(|(instance_id, _, _)| instance_id)
946    }
947
    /// Returns the data from which the client's root secret is derived (e.g.
    /// BIP39 seed phrase struct).
    ///
    /// Decodes the stored client secret from the database as `T`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
953
    /// Waits for outputs from the primary module to reach its final
    /// state.
    ///
    /// Outputs are awaited sequentially, in the order given; the first
    /// failing output aborts the wait.
    pub async fn await_primary_bitcoin_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_bitcoin_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }
968
969    /// Returns the config of the client in JSON format.
970    ///
971    /// Compared to the consensus module format where module configs are binary
972    /// encoded this format cannot be cryptographically verified but is easier
973    /// to consume and to some degree human-readable.
974    pub async fn get_config_json(&self) -> JsonClientConfig {
975        self.config().await.to_json()
976    }
977
    // Ideally this would not be in the API, but there's a lot of places where this
    // makes it easier.
    #[doc(hidden)]
    /// Like [`Self::get_balance`] but returns an error if primary module is not
    /// available
    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
        // Thin wrapper: the unit-generic getter with the BITCOIN unit.
        self.get_balance_for_unit(AmountUnit::BITCOIN).await
    }
986
987    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
988        let (id, module) = self
989            .primary_module_for_unit(unit)
990            .ok_or_else(|| anyhow!("Primary module not available"))?;
991        Ok(module
992            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
993            .await)
994    }
995
    /// Returns a stream that yields the current client balance every time it
    /// changes.
    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
        // Capture everything the stream needs up front so the returned stream
        // is `'static` and does not borrow `self`.
        let primary_module_things =
            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
                let balance_changes = primary_module.subscribe_balance_changes().await;
                let initial_balance = self
                    .get_balance_for_unit(unit)
                    .await
                    .expect("Primary is present");

                Some((
                    primary_module_id,
                    primary_module.clone(),
                    balance_changes,
                    initial_balance,
                ))
            } else {
                None
            };
        let db = self.db().clone();

        Box::pin(async_stream::stream! {
            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                // If there is no primary module, there will not be one until client is
                // restarted
                pending().await
            };


            // First item is the balance at subscription time; afterwards
            // re-read the balance on every change notification.
            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                     .get_balance(primary_module_id, &mut dbtx, unit)
                    .await;

                // Deduplicate in case modules cannot always tell if the balance actually changed
                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }
1042
1043    /// Make a single API version request to a peer after a delay.
1044    ///
1045    /// The delay is here to unify the type of a future both for initial request
1046    /// and possible retries.
1047    async fn make_api_version_request(
1048        delay: Duration,
1049        peer_id: PeerId,
1050        api: &DynGlobalApi,
1051    ) -> (
1052        PeerId,
1053        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1054    ) {
1055        runtime::sleep(delay).await;
1056        (
1057            peer_id,
1058            api.request_single_peer::<SupportedApiVersionsSummary>(
1059                VERSION_ENDPOINT.to_owned(),
1060                ApiRequestErased::default(),
1061                peer_id,
1062            )
1063            .await,
1064        )
1065    }
1066
    /// Create a backoff strategy for API version requests.
    ///
    /// Keep trying, initially somewhat aggressively, but after a while retry
    /// very slowly, because chances for response are getting lower and
    /// lower.
    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
        // 200ms initial delay, capped at 10 minutes; `None` max-attempts, so
        // callers can keep calling `next()` indefinitely (see the
        // `expect("Keeps retrying")` at the call sites).
        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
    }
1075
    /// Query the federation for API version support and then calculate
    /// the best API version to use (supported by most guardians).
    ///
    /// Runs until every peer either responded in this run or already has a
    /// cached response from a previous run; progress (number of peers
    /// accounted for) is reported through `num_responses_sender`.
    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and make a single async db write operation, it should be OK.
        let mut requests = FuturesUnordered::new();

        // Kick off the initial request to every peer without any delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    // A failed refresh is tolerable if a response from a
                    // previous run is already cached for this peer.
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    !has_previous_response
                }
                Ok(o) => {
                    // Save the response to the database right away, just to
                    // not lose it
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                // Re-enqueue the same peer with the next (growing) backoff
                // delay.
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }
1145
    /// Fetch API versions from peers, retrying until we get threshold number of
    /// successful responses. Returns the successful responses collected
    /// from at least `num_peers.threshold()` peers.
    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff = Self::create_api_version_backoff();

        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
        // and collect responses, it should be OK.
        let mut requests = FuturesUnordered::new();

        // Initial request to every peer, with no delay.
        for peer_id in num_peers.peer_ids() {
            requests.push(Self::make_api_version_request(
                Duration::ZERO,
                peer_id,
                &api,
            ));
        }

        let mut successful_responses = BTreeMap::new();

        // Stop as soon as a threshold of peers has responded; any still
        // outstanding request futures are dropped with `requests`.
        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                // Failed peers are retried with growing backoff delays.
                requests.push(Self::make_api_version_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }
1199
1200    /// Fetch API versions from peers and discover common API versions to use.
1201    pub async fn fetch_common_api_versions(
1202        config: &ClientConfig,
1203        api: &DynGlobalApi,
1204    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1205        debug!(
1206            target: LOG_CLIENT,
1207            "Fetching common api versions"
1208        );
1209
1210        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1211
1212        let peer_api_version_sets =
1213            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1214
1215        Ok(peer_api_version_sets)
1216    }
1217
1218    /// Write API version set to database cache.
1219    /// Used when we have a pre-calculated API version set that should be stored
1220    /// for later use.
1221    pub async fn write_api_version_cache(
1222        dbtx: &mut DatabaseTransaction<'_>,
1223        api_version_set: ApiVersionSet,
1224    ) {
1225        debug!(
1226            target: LOG_CLIENT,
1227            value = ?api_version_set,
1228            "Writing API version set to cache"
1229        );
1230
1231        dbtx.insert_entry(
1232            &CachedApiVersionSetKey,
1233            &CachedApiVersionSet(api_version_set),
1234        )
1235        .await;
1236    }
1237
    /// Store prefetched peer API version responses and calculate/store common
    /// API version set. This processes the individual peer responses by
    /// storing them in the database and calculating the common API version
    /// set for caching.
    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        // Calculate common API version set from individual responses
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                // Write the calculated common API version set to database cache
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                // Best effort: failing to derive a common set is only logged;
                // the individual peer responses below are still persisted.
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        // Store individual peer responses to database
        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        // Single transaction: cache entry and peer responses commit together.
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }
1283
1284    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1285    pub fn supported_api_versions_summary_static(
1286        config: &ClientConfig,
1287        client_module_init: &ClientModuleInitRegistry,
1288    ) -> SupportedApiVersionsSummary {
1289        SupportedApiVersionsSummary {
1290            core: SupportedCoreApiVersions {
1291                core_consensus: config.global.consensus_version,
1292                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1293                    .expect("must not have conflicting versions"),
1294            },
1295            modules: config
1296                .modules
1297                .iter()
1298                .filter_map(|(&module_instance_id, module_config)| {
1299                    client_module_init
1300                        .get(module_config.kind())
1301                        .map(|module_init| {
1302                            (
1303                                module_instance_id,
1304                                SupportedModuleApiVersions {
1305                                    core_consensus: config.global.consensus_version,
1306                                    module_consensus: module_config.version,
1307                                    api: module_init.supported_api_versions(),
1308                                },
1309                            )
1310                        })
1311                })
1312                .collect(),
1313        }
1314    }
1315
    /// Loads the cached common API version set (refreshing it in the
    /// background), using this client's own config, module inits, connectors,
    /// API, database and task group.
    ///
    /// See [`Self::load_and_refresh_common_api_version_static`].
    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            self.connectors.clone(),
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }
1327
    /// Load the common api versions to use from cache and start a background
    /// process to refresh them.
    ///
    /// This is a compromise, so we not have to wait for version discovery to
    /// complete every time a [`Client`] is being built.
    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        connectors: ConnectorRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            // Cache hit: return the cached set right away, and refresh it in a
            // background task. Clone everything the task needs, since it must
            // be `'static`.
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            // Separate task group, because we actually don't want to be waiting for this to
            // finish, and it's just best effort.
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    connectors.wait_for_initialized_connections().await;

                    // `block_until_ok = false`: a background refresh failure
                    // is only logged, not retried in a loop.
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions "
        );
        // Cache miss (e.g. first run): block until discovery succeeds
        // (`block_until_ok = true`).
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }
1397
    /// Discover the common API version set supported by the peers and write it
    /// to the database cache.
    ///
    /// Peer responses are gathered by a spawned background task; this function
    /// waits (bounded by a timeout) for a threshold of responses before
    /// computing the common set. With `block_until_ok` set, discovery failures
    /// are retried indefinitely; otherwise the first failure is returned.
    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        // The fetch task reports how many peers have been accounted for via
        // this watch channel.
        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            // Wait to collect enough answers before calculating a set of common api
            // versions to use. Note that all peers individual responses from
            // previous attempts are still being used, and requests, or even
            // retries for response of peers are not actually cancelled, as they
            // are happening on a separate task. This is all just to bound the
            // time user can be waiting for the join operation to finish, at the
            // risk of picking wrong version in very rare circumstances.
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }
1473
1474    /// Get the client [`Metadata`]
1475    pub async fn get_metadata(&self) -> Metadata {
1476        self.db
1477            .begin_transaction_nc()
1478            .await
1479            .get_value(&ClientMetadataKey)
1480            .await
1481            .unwrap_or_else(|| {
1482                warn!(
1483                    target: LOG_CLIENT,
1484                    "Missing existing metadata. This key should have been set on Client init"
1485                );
1486                Metadata::empty()
1487            })
1488    }
1489
    /// Set the client [`Metadata`]
    ///
    /// Wraps [`Self::set_metadata_dbtx`] in an autocommitted database
    /// transaction; panics if the write ultimately cannot be committed.
    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }
1505
1506    pub fn has_pending_recoveries(&self) -> bool {
1507        !self
1508            .client_recovery_progress_receiver
1509            .borrow()
1510            .iter()
1511            .all(|(_id, progress)| progress.is_done())
1512    }
1513
1514    /// Wait for all module recoveries to finish
1515    ///
1516    /// This will block until the recovery task is done with recoveries.
1517    /// Returns success if all recovery tasks are complete (success case),
1518    /// or an error if some modules could not complete the recovery at the time.
1519    ///
1520    /// A bit of a heavy approach.
1521    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1522        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1523        recovery_receiver
1524            .wait_for(|in_progress| {
1525                in_progress
1526                    .iter()
1527                    .all(|(_id, progress)| progress.is_done())
1528            })
1529            .await
1530            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1531
1532        Ok(())
1533    }
1534
    /// Subscribe to recover progress for all the modules.
    ///
    /// This stream can contain duplicate progress for a module.
    /// Don't use this stream for detecting completion of recovery.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        // Each watch update carries a full snapshot of all modules' progress;
        // flatten it into individual (module, progress) items.
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }
1545
1546    pub async fn wait_for_module_kind_recovery(
1547        &self,
1548        module_kind: ModuleKind,
1549    ) -> anyhow::Result<()> {
1550        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1551        let config = self.config().await;
1552        recovery_receiver
1553            .wait_for(|in_progress| {
1554                !in_progress
1555                    .iter()
1556                    .filter(|(module_instance_id, _progress)| {
1557                        config.modules[module_instance_id].kind == module_kind
1558                    })
1559                    .any(|(_id, progress)| !progress.is_done())
1560            })
1561            .await
1562            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1563
1564        Ok(())
1565    }
1566
1567    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1568        loop {
1569            if self.executor.get_active_states().await.is_empty() {
1570                break;
1571            }
1572            sleep(Duration::from_millis(100)).await;
1573        }
1574        Ok(())
1575    }
1576
    /// Set the client [`Metadata`]
    ///
    /// NOTE(review): uses `insert_new_entry` (vs `insert_entry`), which looks
    /// like it expects the key to be absent in this transaction — confirm.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1581
    /// Spawns the background task that drives all pending module recoveries
    /// and forwards their progress to `recovery_sender`.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Clone everything the spawned task needs, since it outlives `&self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        // Instance-id -> kind-name map, passed along for the task's use.
        let module_kinds: BTreeMap<ModuleInstanceId, String> = self
            .modules
            .iter_modules_id_kind()
            .map(|(id, kind)| (id, kind.to_string()))
            .collect();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                    module_kinds,
                )
                .await;
            });
    }
1614
1615    async fn run_module_recoveries_task(
1616        db: Database,
1617        log_ordering_wakeup_tx: watch::Sender<()>,
1618        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1619        module_recoveries: BTreeMap<
1620            ModuleInstanceId,
1621            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1622        >,
1623        module_recovery_progress_receivers: BTreeMap<
1624            ModuleInstanceId,
1625            watch::Receiver<RecoveryProgress>,
1626        >,
1627        module_kinds: BTreeMap<ModuleInstanceId, String>,
1628    ) {
1629        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1630        let mut completed_stream = Vec::new();
1631        let progress_stream = futures::stream::FuturesUnordered::new();
1632
1633        for (module_instance_id, f) in module_recoveries {
1634            completed_stream.push(futures::stream::once(Box::pin(async move {
1635                match f.await {
1636                    Ok(()) => (module_instance_id, None),
1637                    Err(err) => {
1638                        warn!(
1639                            target: LOG_CLIENT,
1640                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1641                        );
1642                        // a module recovery that failed reports and error and
1643                        // just never finishes, so we don't need a separate state
1644                        // for it
1645                        futures::future::pending::<()>().await;
1646                        unreachable!()
1647                    }
1648                }
1649            })));
1650        }
1651
1652        for (module_instance_id, rx) in module_recovery_progress_receivers {
1653            progress_stream.push(
1654                tokio_stream::wrappers::WatchStream::new(rx)
1655                    .fuse()
1656                    .map(move |progress| (module_instance_id, Some(progress))),
1657            );
1658        }
1659
1660        let mut futures = futures::stream::select(
1661            futures::stream::select_all(progress_stream),
1662            futures::stream::select_all(completed_stream),
1663        );
1664
1665        while let Some((module_instance_id, progress)) = futures.next().await {
1666            let mut dbtx = db.begin_transaction().await;
1667
1668            let prev_progress = *recovery_sender
1669                .borrow()
1670                .get(&module_instance_id)
1671                .expect("existing progress must be present");
1672
1673            let progress = if prev_progress.is_done() {
1674                // since updates might be out of order, once done, stick with it
1675                prev_progress
1676            } else if let Some(progress) = progress {
1677                progress
1678            } else {
1679                prev_progress.to_complete()
1680            };
1681
1682            if !prev_progress.is_done() && progress.is_done() {
1683                info!(
1684                    target: LOG_CLIENT,
1685                    module_instance_id,
1686                    progress = format!("{}/{}", progress.complete, progress.total),
1687                    "Recovery complete"
1688                );
1689                dbtx.log_event(
1690                    log_ordering_wakeup_tx.clone(),
1691                    None,
1692                    ModuleRecoveryCompleted {
1693                        module_id: module_instance_id,
1694                    },
1695                )
1696                .await;
1697            } else {
1698                info!(
1699                    target: LOG_CLIENT,
1700                    module_instance_id,
1701                    kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1702                    progress = format!("{}/{}", progress.complete, progress.total),
1703                    "Recovery progress"
1704                );
1705            }
1706
1707            dbtx.insert_entry(
1708                &ClientModuleRecovery { module_instance_id },
1709                &ClientModuleRecoveryState { progress },
1710            )
1711            .await;
1712            dbtx.commit_tx().await;
1713
1714            recovery_sender.send_modify(|v| {
1715                v.insert(module_instance_id, progress);
1716            });
1717        }
1718        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1719    }
1720
1721    async fn load_peers_last_api_versions(
1722        db: &Database,
1723        num_peers: NumPeers,
1724    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1725        let mut peer_api_version_sets = BTreeMap::new();
1726
1727        let mut dbtx = db.begin_transaction_nc().await;
1728        for peer_id in num_peers.peer_ids() {
1729            if let Some(v) = dbtx
1730                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1731                .await
1732            {
1733                peer_api_version_sets.insert(peer_id, v.0);
1734            }
1735        }
1736        drop(dbtx);
1737        peer_api_version_sets
1738    }
1739
1740    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1741    /// only the announcements and doesn't use the config as fallback.
1742    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1743        self.db()
1744            .begin_transaction_nc()
1745            .await
1746            .find_by_prefix(&ApiAnnouncementPrefix)
1747            .await
1748            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1749            .collect()
1750            .await
1751    }
1752
1753    /// Returns guardian metadata stored in the client database
1754    pub async fn get_guardian_metadata(
1755        &self,
1756    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1757        self.db()
1758            .begin_transaction_nc()
1759            .await
1760            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1761            .await
1762            .map(|(key, metadata)| (key.0, metadata))
1763            .collect()
1764            .await
1765    }
1766
    /// Returns a list of guardian API URLs
    ///
    /// Combines stored API announcements with the config as fallback.
    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }
1771
1772    /// Create an invite code with the api endpoint of the given peer which can
1773    /// be used to download this client config
1774    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1775        self.get_peer_urls()
1776            .await
1777            .into_iter()
1778            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1779            .map(|peer_url| {
1780                InviteCode::new(
1781                    peer_url.clone(),
1782                    peer,
1783                    self.federation_id(),
1784                    self.api_secret.clone(),
1785                )
1786            })
1787    }
1788
    /// Blocks till the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once this function is guaranteed to return immediately.
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        // Run inside `autocommit` so the config backfill (if needed) is
        // retried on database write conflicts.
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // Infallible closure: the only retry source is the
                        // autocommit machinery itself.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }
1813
1814    async fn get_or_backfill_broadcast_public_keys(
1815        &self,
1816        dbtx: &mut DatabaseTransaction<'_>,
1817        config: ClientConfig,
1818    ) -> BTreeMap<PeerId, PublicKey> {
1819        match config.global.broadcast_public_keys {
1820            Some(guardian_pub_keys) => guardian_pub_keys,
1821            _ => {
1822                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1823
1824                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1825                *(self.config.write().await) = new_config;
1826                guardian_pub_keys
1827            }
1828        }
1829    }
1830
    /// Fetch the current consensus session count from the federation API.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1834
1835    async fn fetch_and_update_config(
1836        &self,
1837        config: ClientConfig,
1838    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1839        let fetched_config = retry(
1840            "Fetching guardian public keys",
1841            backoff_util::background_backoff(),
1842            || async {
1843                Ok(self
1844                    .api
1845                    .request_current_consensus::<ClientConfig>(
1846                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1847                        ApiRequestErased::default(),
1848                    )
1849                    .await?)
1850            },
1851        )
1852        .await
1853        .expect("Will never return on error");
1854
1855        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1856            warn!(
1857                target: LOG_CLIENT,
1858                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1859            );
1860            pending::<()>().await;
1861            unreachable!("Pending will never return");
1862        };
1863
1864        let new_config = ClientConfig {
1865            global: GlobalClientConfig {
1866                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1867                ..config.global
1868            },
1869            modules: config.modules,
1870        };
1871        (guardian_pub_keys, new_config)
1872    }
1873
    /// Dispatch a client-global RPC call by `method` name.
    ///
    /// Returns a stream of JSON results: most methods yield a single value,
    /// while the `subscribe_*` methods keep yielding until the underlying
    /// subscription ends. Unknown methods yield a single error.
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
                    let mut stream = self.subscribe_balance_changes(req.unit).await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    // Without an explicit limit or pagination cursor, return
                    // everything.
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                #[allow(deprecated)]
                "backup_to_federation" => {
                    // Null params mean "no metadata"; otherwise the params
                    // object itself is the metadata payload.
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }
1960
1961    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1962    where
1963        E: Event + Send,
1964    {
1965        let mut dbtx = self.db.begin_transaction().await;
1966        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1967        dbtx.commit_tx().await;
1968    }
1969
    /// Log `event` to the event log inside the caller's transaction.
    ///
    /// The caller is responsible for committing `dbtx`.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1982
1983    pub async fn log_event_raw_dbtx<Cap>(
1984        &self,
1985        dbtx: &mut DatabaseTransaction<'_, Cap>,
1986        kind: EventKind,
1987        module: Option<(ModuleKind, ModuleInstanceId)>,
1988        payload: Vec<u8>,
1989        persist: EventPersistence,
1990    ) where
1991        Cap: Send,
1992    {
1993        let module_id = module.as_ref().map(|m| m.1);
1994        let module_kind = module.map(|m| m.0);
1995        dbtx.log_event_raw(
1996            self.log_ordering_wakeup_tx.clone(),
1997            kind,
1998            module_kind,
1999            module_id,
2000            payload,
2001            persist,
2002        )
2003        .await;
2004    }
2005
    /// Built in event log (trimmable) tracker
    ///
    /// For the convenience of downstream applications, [`Client`] can store
    /// internally event log position for the main application using/driving it.
    ///
    /// Note that this position is a singleton, so this tracker should not be
    /// used for multiple purposes or applications, etc. at the same time.
    ///
    /// If the application has a need to follow log using multiple trackers, it
    /// should implement own [`DynEventLogTrimableTracker`] and store its
    /// persistent data by itself.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        // Stateless tracker persisting the position under the singleton
        // `DefaultApplicationEventLogKey`.
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            /// Store position in the event log
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            /// Load the last previous stored position (or None if never stored)
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
2043
    /// Like [`Self::handle_events`] but for historical data.
    ///
    /// This function can be used to process a subset of events
    /// that is infrequent and important enough to be persisted
    /// forever. Most applications should prefer to use [`Self::handle_events`]
    /// which emits *all* events.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2068
    /// Handle events emitted by the client
    ///
    /// This is a preferred method for reactive & asynchronous
    /// processing of events emitted by the client.
    ///
    /// It needs a `tracker` that will persist the position in the log
    /// as it is being handled. You can use the
    /// [`Client::built_in_application_event_log_tracker`] if this call is
    /// used for the single main application handling this instance of the
    /// [`Client`]. Otherwise you should implement your own tracker.
    ///
    /// This handler will call `handler_fn` with every event emitted by
    /// [`Client`], including transient ones. The caller should atomically
    /// handle each event it is interested in and ignore other ones.
    ///
    /// This method returns only when client is shutting down or on internal
    /// error, so typically should be called in a background task dedicated
    /// to handling events.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2104
    /// Read up to `limit` event log entries starting at `pos` (or the
    /// beginning when `None`), using a fresh read-only transaction.
    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2113
    /// Like [`Self::get_event_log`] but for the trimmable event log.
    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2122
    /// [`Self::get_event_log`] variant reusing the caller's transaction.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2134
    /// [`Self::get_event_log_trimable`] variant reusing the caller's
    /// transaction.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2146
    /// Register to receive all new transient (unpersisted) events
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2151
    /// Get a receiver that signals when new events are added to the event log
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2156
    /// Whether iroh DHT discovery is enabled for this client.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2160
    /// Apply core (non-module) client database migrations.
    ///
    /// Operates on the database without module decoders; in test
    /// environments it additionally verifies client DB integrity before
    /// committing.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2173
    /// Iterator over primary modules for a given `unit`
    ///
    /// Yields candidates in priority order; within one priority level,
    /// unit-specific matches come before wildcard matches.
    fn primary_modules_for_unit(
        &self,
        unit: AmountUnit,
    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
        self.primary_modules
            .iter()
            .flat_map(move |(_prio, candidates)| {
                candidates
                    .specific
                    .get(&unit)
                    .into_iter()
                    .flatten()
                    .copied()
                    // within same priority, wildcard matches come last
                    .chain(candidates.wildcard.iter().copied())
            })
            .map(|id| (id, self.modules.get_expect(id)))
    }
2193
    /// Primary module to use for `unit`
    ///
    /// Currently, just pick the first (highest priority) match
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2203
    /// [`Self::primary_module_for_unit`] for Bitcoin
    ///
    /// # Panics
    /// Panics if no primary module handles Bitcoin amounts.
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2209}
2210
2211#[apply(async_trait_maybe_send!)]
2212impl ClientContextIface for Client {
2213    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2214        Client::get_module(self, instance)
2215    }
2216
2217    fn api_clone(&self) -> DynGlobalApi {
2218        Client::api_clone(self)
2219    }
2220    fn decoders(&self) -> &ModuleDecoderRegistry {
2221        Client::decoders(self)
2222    }
2223
2224    async fn finalize_and_submit_transaction(
2225        &self,
2226        operation_id: OperationId,
2227        operation_type: &str,
2228        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2229        tx_builder: TransactionBuilder,
2230    ) -> anyhow::Result<OutPointRange> {
2231        Client::finalize_and_submit_transaction(
2232            self,
2233            operation_id,
2234            operation_type,
2235            // |out_point_range| operation_meta_gen(out_point_range),
2236            &operation_meta_gen,
2237            tx_builder,
2238        )
2239        .await
2240    }
2241
2242    async fn finalize_and_submit_transaction_dbtx(
2243        &self,
2244        dbtx: &mut DatabaseTransaction<'_>,
2245        operation_id: OperationId,
2246        operation_type: &str,
2247        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2248        tx_builder: TransactionBuilder,
2249    ) -> anyhow::Result<OutPointRange> {
2250        Client::finalize_and_submit_transaction_dbtx(
2251            self,
2252            dbtx,
2253            operation_id,
2254            operation_type,
2255            &operation_meta_gen,
2256            tx_builder,
2257        )
2258        .await
2259    }
2260
2261    async fn finalize_and_submit_transaction_inner(
2262        &self,
2263        dbtx: &mut DatabaseTransaction<'_>,
2264        operation_id: OperationId,
2265        tx_builder: TransactionBuilder,
2266    ) -> anyhow::Result<OutPointRange> {
2267        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2268    }
2269
2270    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2271        Client::transaction_updates(self, operation_id).await
2272    }
2273
2274    async fn await_primary_module_outputs(
2275        &self,
2276        operation_id: OperationId,
2277        // TODO: make `impl Iterator<Item = ...>`
2278        outputs: Vec<OutPoint>,
2279    ) -> anyhow::Result<()> {
2280        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2281    }
2282
2283    fn operation_log(&self) -> &dyn IOperationLog {
2284        Client::operation_log(self)
2285    }
2286
2287    async fn has_active_states(&self, operation_id: OperationId) -> bool {
2288        Client::has_active_states(self, operation_id).await
2289    }
2290
2291    async fn operation_exists(&self, operation_id: OperationId) -> bool {
2292        Client::operation_exists(self, operation_id).await
2293    }
2294
2295    async fn config(&self) -> ClientConfig {
2296        Client::config(self).await
2297    }
2298
2299    fn db(&self) -> &Database {
2300        Client::db(self)
2301    }
2302
2303    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2304        Client::executor(self)
2305    }
2306
2307    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2308        Client::invite_code(self, peer).await
2309    }
2310
2311    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2312        Client::get_internal_payment_markers(self)
2313    }
2314
2315    async fn log_event_json(
2316        &self,
2317        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2318        module_kind: Option<ModuleKind>,
2319        module_id: ModuleInstanceId,
2320        kind: EventKind,
2321        payload: serde_json::Value,
2322        persist: EventPersistence,
2323    ) {
2324        dbtx.ensure_global()
2325            .expect("Must be called with global dbtx");
2326        self.log_event_raw_dbtx(
2327            dbtx,
2328            kind,
2329            module_kind.map(|kind| (kind, module_id)),
2330            serde_json::to_vec(&payload).expect("Serialization can't fail"),
2331            persist,
2332        )
2333        .await;
2334    }
2335
2336    async fn read_operation_active_states<'dbtx>(
2337        &self,
2338        operation_id: OperationId,
2339        module_id: ModuleInstanceId,
2340        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2341    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2342    {
2343        Box::pin(
2344            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2345                operation_id,
2346                module_instance: module_id,
2347            })
2348            .await
2349            .map(move |(k, v)| (k.0, v)),
2350        )
2351    }
2352    async fn read_operation_inactive_states<'dbtx>(
2353        &self,
2354        operation_id: OperationId,
2355        module_id: ModuleInstanceId,
2356        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2357    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2358    {
2359        Box::pin(
2360            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2361                operation_id,
2362                module_instance: module_id,
2363            })
2364            .await
2365            .map(move |(k, v)| (k.0, v)),
2366        )
2367    }
2368}
2369
2370// TODO: impl `Debug` for `Client` and derive here
2371impl fmt::Debug for Client {
2372    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2373        write!(f, "Client")
2374    }
2375}
2376
2377pub fn client_decoders<'a>(
2378    registry: &ModuleInitRegistry<DynClientModuleInit>,
2379    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2380) -> ModuleDecoderRegistry {
2381    let mut modules = BTreeMap::new();
2382    for (id, kind) in module_kinds {
2383        let Some(init) = registry.get(kind) else {
2384            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2385            continue;
2386        };
2387
2388        modules.insert(
2389            id,
2390            (
2391                kind.clone(),
2392                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2393            ),
2394        );
2395    }
2396    ModuleDecoderRegistry::from(modules)
2397}