Skip to main content

fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
57use fedimint_core::transaction::Transaction;
58use fedimint_core::util::backoff_util::custom_backoff;
59use fedimint_core::util::{
60    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
61};
62use fedimint_core::{
63    Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
64    maybe_add_send_sync, runtime,
65};
66use fedimint_derive_secret::DerivableSecret;
67use fedimint_eventlog::{
68    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
69    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
70};
71use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
72use futures::stream::FuturesUnordered;
73use futures::{Stream, StreamExt as _};
74use global_ctx::ModuleGlobalClientContext;
75use serde::{Deserialize, Serialize};
76use tokio::sync::{broadcast, watch};
77use tokio_stream::wrappers::WatchStream;
78use tracing::{debug, info, warn};
79
80use crate::ClientBuilder;
81use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
82use crate::backup::Metadata;
83use crate::client::event_log::DefaultApplicationEventLogKey;
84use crate::db::{
85    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
86    ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
87    ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
88    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
89    get_decoded_client_secret, verify_client_db_integrity_dbtx,
90};
91use crate::meta::MetaService;
92use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
93use crate::oplog::OperationLog;
94use crate::sm::executor::{
95    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
96    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
97};
98
99pub(crate) mod builder;
100pub(crate) mod event_log;
101pub(crate) mod global_ctx;
102pub(crate) mod handle;
103
104/// List of core api versions supported by the implementation.
105/// Notably `major` version is the one being supported, and corresponding
106/// `minor` version is the one required (for given `major` version).
107const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
108    &[ApiVersion { major: 0, minor: 0 }];
109
110/// Primary module candidates at specific priority level
111#[derive(Default)]
112pub(crate) struct PrimaryModuleCandidates {
113    /// Modules that listed specific units they handle
114    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
115    /// Modules handling any unit
116    wildcard: Vec<ModuleInstanceId>,
117}
118
119/// Main client type
120///
121/// A handle and API to interacting with a single federation. End user
122/// applications that want to support interacting with multiple federations at
123/// the same time, will need to instantiate and manage multiple instances of
124/// this struct.
125///
126/// Under the hood it is starting and managing service tasks, state machines,
127/// database and other resources required.
128///
129/// This type is shared externally and internally, and
130/// [`crate::ClientHandle`] is responsible for external lifecycle management
131/// and resource freeing of the [`Client`].
132pub struct Client {
133    final_client: FinalClientIface,
134    config: tokio::sync::RwLock<ClientConfig>,
135    api_secret: Option<String>,
136    decoders: ModuleDecoderRegistry,
137    connectors: ConnectorRegistry,
138    db: Database,
139    federation_id: FederationId,
140    federation_config_meta: BTreeMap<String, String>,
141    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
142    pub(crate) modules: ClientModuleRegistry,
143    module_inits: ClientModuleInitRegistry,
144    executor: Executor,
145    pub(crate) api: DynGlobalApi,
146    root_secret: DerivableSecret,
147    operation_log: OperationLog,
148    secp_ctx: Secp256k1<secp256k1::All>,
149    meta_service: Arc<MetaService>,
150
151    task_group: TaskGroup,
152
153    /// Updates about client recovery progress
154    client_recovery_progress_receiver:
155        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
156
157    /// Internal client sender to wake up log ordering task every time a
158    /// (unuordered) log event is added.
159    log_ordering_wakeup_tx: watch::Sender<()>,
160    /// Receiver for events fired every time (ordered) log event is added.
161    log_event_added_rx: watch::Receiver<()>,
162    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
163    request_hook: ApiRequestHook,
164    iroh_enable_dht: bool,
165    iroh_enable_next: bool,
166    /// User-provided Bitcoin RPC client for modules to use
167    ///
168    /// Stored here for potential future access; currently passed to modules
169    /// during initialization.
170    #[allow(dead_code)]
171    user_bitcoind_rpc: Option<DynBitcoindRpc>,
172    /// User-provided Bitcoin RPC factory for when ChainId is not available
173    ///
174    /// This is used as a fallback when the federation doesn't support ChainId.
175    /// Modules can call this with a URL from their config to get an RPC client.
176    pub(crate) user_bitcoind_rpc_no_chain_id:
177        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
178}
179
180#[derive(Debug, Serialize, Deserialize)]
181struct ListOperationsParams {
182    limit: Option<usize>,
183    last_seen: Option<ChronologicalOperationLogKey>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
187pub struct GetOperationIdRequest {
188    operation_id: OperationId,
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct GetBalanceChangesRequest {
193    #[serde(default = "AmountUnit::bitcoin")]
194    unit: AmountUnit,
195}
196
197impl Client {
198    /// Initialize a client builder that can be configured to create a new
199    /// client.
200    pub async fn builder() -> anyhow::Result<ClientBuilder> {
201        Ok(ClientBuilder::new())
202    }
203
204    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
205        self.api.as_ref()
206    }
207
208    pub fn api_clone(&self) -> DynGlobalApi {
209        self.api.clone()
210    }
211
212    /// Returns a stream that emits the current connection status of all peers
213    /// whenever any peer's status changes. Emits initial state immediately.
214    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, bool>> {
215        self.api.connection_status_stream()
216    }
217
218    /// Establishes connections to all federation guardians once.
219    ///
220    /// Spawns tasks to connect to each guardian in the federation. Unlike
221    /// [`Self::spawn_federation_reconnect`], this only attempts to establish
222    /// connections once and completes - it does not maintain or reconnect.
223    ///
224    /// Useful for warming up connections before making API calls.
225    pub fn federation_reconnect(&self) {
226        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
227
228        for peer_id in peers {
229            let api = self.api.clone();
230            self.task_group.spawn_cancellable(
231                format!("federation-reconnect-once-{peer_id}"),
232                async move {
233                    if let Err(e) = api.get_peer_connection(peer_id).await {
234                        debug!(
235                            target: LOG_CLIENT_NET_API,
236                            %peer_id,
237                            err = %e.fmt_compact(),
238                            "Failed to connect to peer"
239                        );
240                    }
241                },
242            );
243        }
244    }
245
246    /// Spawns background tasks that proactively maintain connections to all
247    /// federation guardians unconditionally.
248    ///
249    /// For each guardian, a task loops: establishes a connection, waits for it
250    /// to disconnect, then reconnects.
251    ///
252    /// The tasks are cancellable and will be terminated when the client shuts
253    /// down.
254    ///
255    /// By default [`Client`] creates connections on demand only, and share
256    /// them as long as they are alive.
257    ///
258    /// Reconnecting continuously might increase data and battery usage,
259    /// but potentially improve UX, depending on the time it takes to establish
260    /// a new network connection in given network conditions.
261    ///
262    /// Downstream users are encouraged to implement their own version of
263    /// this function, e.g. by reconnecting only when it is anticipated
264    /// that connection might be needed, or alternatively pre-warm
265    /// connections by calling [`Self::federation_reconnect`] when it seems
266    /// worthwhile.
267    pub fn spawn_federation_reconnect(&self) {
268        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
269
270        for peer_id in peers {
271            let api = self.api.clone();
272            self.task_group.spawn_cancellable(
273                format!("federation-reconnect-{peer_id}"),
274                async move {
275                    loop {
276                        match api.get_peer_connection(peer_id).await {
277                            Ok(conn) => {
278                                conn.await_disconnection().await;
279                            }
280                            Err(e) => {
281                                // Connection failed, backoff is handled inside
282                                // get_or_create_connection
283                                debug!(
284                                    target: LOG_CLIENT_NET_API,
285                                    %peer_id,
286                                    err = %e.fmt_compact(),
287                                    "Failed to connect to peer, will retry"
288                                );
289                            }
290                        }
291                    }
292                },
293            );
294        }
295    }
296
297    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
298    pub fn task_group(&self) -> &TaskGroup {
299        &self.task_group
300    }
301
302    /// Returns all registered Prometheus metrics encoded in text format.
303    ///
304    /// This can be used by downstream clients to expose metrics via their own
305    /// HTTP server or print them for debugging purposes.
306    pub fn get_metrics() -> anyhow::Result<String> {
307        fedimint_metrics::get_metrics()
308    }
309
310    /// Useful for our CLI tooling, not meant for external use
311    #[doc(hidden)]
312    pub fn executor(&self) -> &Executor {
313        &self.executor
314    }
315
316    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
317        let mut dbtx = db.begin_transaction_nc().await;
318        dbtx.get_value(&ClientConfigKey).await
319    }
320
321    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
322        let mut dbtx = db.begin_transaction_nc().await;
323        dbtx.get_value(&PendingClientConfigKey).await
324    }
325
326    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
327        let mut dbtx = db.begin_transaction_nc().await;
328        dbtx.get_value(&ApiSecretKey).await
329    }
330
331    pub async fn store_encodable_client_secret<T: Encodable>(
332        db: &Database,
333        secret: T,
334    ) -> anyhow::Result<()> {
335        let mut dbtx = db.begin_transaction().await;
336
337        // Don't overwrite an existing secret
338        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
339            bail!("Encoded client secret already exists, cannot overwrite")
340        }
341
342        let encoded_secret = T::consensus_encode_to_vec(&secret);
343        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
344            .await;
345        dbtx.commit_tx().await;
346        Ok(())
347    }
348
349    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
350        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
351            bail!("Encoded client secret not present in DB")
352        };
353
354        Ok(secret)
355    }
356    pub async fn load_decodable_client_secret_opt<T: Decodable>(
357        db: &Database,
358    ) -> anyhow::Result<Option<T>> {
359        let mut dbtx = db.begin_transaction_nc().await;
360
361        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
362
363        Ok(match client_secret {
364            Some(client_secret) => Some(
365                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
366                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
367            ),
368            None => None,
369        })
370    }
371
372    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
373        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
374            Ok(secret) => secret,
375            _ => {
376                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
377                Self::store_encodable_client_secret(db, secret)
378                    .await
379                    .expect("Storing client secret must work");
380                secret
381            }
382        };
383        Ok(client_secret)
384    }
385
386    pub async fn is_initialized(db: &Database) -> bool {
387        let mut dbtx = db.begin_transaction_nc().await;
388        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
389            .await
390            .expect("Unrecoverable error occurred while reading and entry from the database")
391            .is_some()
392    }
393
394    pub fn start_executor(self: &Arc<Self>) {
395        debug!(
396            target: LOG_CLIENT,
397            "Starting fedimint client executor",
398        );
399        self.executor.start_executor(self.context_gen());
400    }
401
402    pub fn federation_id(&self) -> FederationId {
403        self.federation_id
404    }
405
406    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
407        let client_inner = Arc::downgrade(self);
408        Arc::new(move |module_instance, operation| {
409            ModuleGlobalClientContext {
410                client: client_inner
411                    .clone()
412                    .upgrade()
413                    .expect("ModuleGlobalContextGen called after client was dropped"),
414                module_instance_id: module_instance,
415                operation,
416            }
417            .into()
418        })
419    }
420
421    pub async fn config(&self) -> ClientConfig {
422        self.config.read().await.clone()
423    }
424
425    // TODO: change to `-> Option<&str>`
426    pub fn api_secret(&self) -> &Option<String> {
427        &self.api_secret
428    }
429
430    /// Returns the core API version that the federation supports
431    ///
432    /// This reads from the cached version stored during client initialization.
433    /// If no cache is available (e.g., during initial setup), returns a default
434    /// version (0, 0).
435    pub async fn core_api_version(&self) -> ApiVersion {
436        // Try to get from cache. If not available, return a conservative
437        // default. The cache should always be populated after successful client init.
438        self.db
439            .begin_transaction_nc()
440            .await
441            .get_value(&CachedApiVersionSetKey)
442            .await
443            .map(|cached: CachedApiVersionSet| cached.0.core)
444            .unwrap_or(ApiVersion { major: 0, minor: 0 })
445    }
446
447    /// Returns the chain ID (bitcoin block hash at height 1) from the
448    /// federation
449    ///
450    /// This is cached in the database after the first successful fetch.
451    /// The chain ID uniquely identifies which bitcoin network the federation
452    /// operates on (mainnet, testnet, signet, regtest).
453    pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
454        // Check cache first
455        if let Some(chain_id) = self
456            .db
457            .begin_transaction_nc()
458            .await
459            .get_value(&ChainIdKey)
460            .await
461        {
462            return Ok(chain_id);
463        }
464
465        // Fetch from federation with consensus
466        let chain_id = self.api.chain_id().await?;
467
468        // Cache the result
469        let mut dbtx = self.db.begin_transaction().await;
470        dbtx.insert_entry(&ChainIdKey, &chain_id).await;
471        dbtx.commit_tx().await;
472
473        Ok(chain_id)
474    }
475
476    pub fn decoders(&self) -> &ModuleDecoderRegistry {
477        &self.decoders
478    }
479
480    /// Returns a reference to the module, panics if not found
481    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
482        self.try_get_module(instance)
483            .expect("Module instance not found")
484    }
485
486    fn try_get_module(
487        &self,
488        instance: ModuleInstanceId,
489    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
490        Some(self.modules.get(instance)?.as_ref())
491    }
492
493    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
494        self.modules.get(instance).is_some()
495    }
496
497    /// Returns the input amount and output amount of a transaction
498    ///
499    /// # Panics
500    /// If any of the input or output versions in the transaction builder are
501    /// unknown by the respective module.
502    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
503        // FIXME: prevent overflows, currently not suitable for untrusted input
504        let mut in_amounts = Amounts::ZERO;
505        let mut out_amounts = Amounts::ZERO;
506        let mut fee_amounts = Amounts::ZERO;
507
508        for input in builder.inputs() {
509            let module = self.get_module(input.input.module_instance_id());
510
511            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
512                "We only build transactions with input versions that are supported by the module",
513            );
514
515            in_amounts.checked_add_mut(&input.amounts);
516            fee_amounts.checked_add_mut(&item_fees);
517        }
518
519        for output in builder.outputs() {
520            let module = self.get_module(output.output.module_instance_id());
521
522            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
523                "We only build transactions with output versions that are supported by the module",
524            );
525
526            out_amounts.checked_add_mut(&output.amounts);
527            fee_amounts.checked_add_mut(&item_fees);
528        }
529
530        out_amounts.checked_add_mut(&fee_amounts);
531        (in_amounts, out_amounts)
532    }
533
534    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
535        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
536    }
537
538    /// Get metadata value from the federation config itself
539    pub fn get_config_meta(&self, key: &str) -> Option<String> {
540        self.federation_config_meta.get(key).cloned()
541    }
542
543    pub(crate) fn root_secret(&self) -> DerivableSecret {
544        self.root_secret.clone()
545    }
546
547    pub async fn add_state_machines(
548        &self,
549        dbtx: &mut DatabaseTransaction<'_>,
550        states: Vec<DynState>,
551    ) -> AddStateMachinesResult {
552        self.executor.add_state_machines_dbtx(dbtx, states).await
553    }
554
555    // TODO: implement as part of [`OperationLog`]
556    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
557        let active_states = self.executor.get_active_states().await;
558        let mut active_operations = HashSet::with_capacity(active_states.len());
559        let mut dbtx = self.db().begin_transaction_nc().await;
560        for (state, _) in active_states {
561            let operation_id = state.operation_id();
562            if dbtx
563                .get_value(&OperationLogKey { operation_id })
564                .await
565                .is_some()
566            {
567                active_operations.insert(operation_id);
568            }
569        }
570        active_operations
571    }
572
573    pub fn operation_log(&self) -> &OperationLog {
574        &self.operation_log
575    }
576
577    /// Get the meta manager to read meta fields.
578    pub fn meta_service(&self) -> &Arc<MetaService> {
579        &self.meta_service
580    }
581
582    /// Get the meta manager to read meta fields.
583    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
584        let meta_service = self.meta_service();
585        let ts = meta_service
586            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
587            .await
588            .and_then(|v| v.value)?;
589        Some(UNIX_EPOCH + Duration::from_secs(ts))
590    }
591
592    /// Adds funding to a transaction or removes over-funding via change.
593    async fn finalize_transaction(
594        &self,
595        dbtx: &mut DatabaseTransaction<'_>,
596        operation_id: OperationId,
597        mut partial_transaction: TransactionBuilder,
598    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
599        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
600
601        let mut added_inputs_bundles = vec![];
602        let mut added_outputs_bundles = vec![];
603
604        // The way currently things are implemented is OK for modules which can
605        // collect a fee relative to one being used, but will break down in any
606        // fancy scenarios. Future TODOs:
607        //
608        // * create_final_inputs_and_outputs needs to get broken down, so we can use
609        //   primary modules using priorities (possibly separate prios for inputs and
610        //   outputs to be able to drain modules, etc.); we need the split "check if
611        //   possible" and "take" steps,
612        // * extra inputs and outputs adding fees needs to be taken into account,
613        //   possibly with some looping
614        for unit in in_amounts.units().union(&out_amounts.units()) {
615            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
616            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
617            if input_amount == output_amount {
618                continue;
619            }
620
621            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
622                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
623            };
624
625            let (added_input_bundle, added_output_bundle) = module
626                .create_final_inputs_and_outputs(
627                    module_id,
628                    dbtx,
629                    operation_id,
630                    *unit,
631                    input_amount,
632                    output_amount,
633                )
634                .await?;
635
636            added_inputs_bundles.push(added_input_bundle);
637            added_outputs_bundles.push(added_output_bundle);
638        }
639
640        // This is the range of  outputs that will be added to the transaction
641        // in order to balance it. Notice that it may stay empty in case the transaction
642        // is already balanced.
643        let change_range = Range {
644            start: partial_transaction.outputs().count() as u64,
645            end: (partial_transaction.outputs().count() as u64
646                + added_outputs_bundles
647                    .iter()
648                    .map(|output| output.outputs().len() as u64)
649                    .sum::<u64>()),
650        };
651
652        for added_inputs in added_inputs_bundles {
653            partial_transaction = partial_transaction.with_inputs(added_inputs);
654        }
655
656        for added_outputs in added_outputs_bundles {
657            partial_transaction = partial_transaction.with_outputs(added_outputs);
658        }
659
660        let (input_amounts, output_amounts) =
661            self.transaction_builder_get_balance(&partial_transaction);
662
663        for (unit, output_amount) in output_amounts {
664            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
665
666            assert!(input_amount >= output_amount, "Transaction is underfunded");
667        }
668
669        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
670
671        Ok((tx, states, change_range))
672    }
673
674    /// Add funding and/or change to the transaction builder as needed, finalize
675    /// the transaction and submit it to the federation.
676    ///
677    /// ## Errors
678    /// The function will return an error if the operation with given ID already
679    /// exists.
680    ///
681    /// ## Panics
682    /// The function will panic if the database transaction collides with
683    /// other and fails with others too often, this should not happen except for
684    /// excessively concurrent scenarios.
685    pub async fn finalize_and_submit_transaction<F, M>(
686        &self,
687        operation_id: OperationId,
688        operation_type: &str,
689        operation_meta_gen: F,
690        tx_builder: TransactionBuilder,
691    ) -> anyhow::Result<OutPointRange>
692    where
693        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
694        M: serde::Serialize + MaybeSend,
695    {
696        let operation_type = operation_type.to_owned();
697
698        let autocommit_res = self
699            .db
700            .autocommit(
701                |dbtx, _| {
702                    let operation_type = operation_type.clone();
703                    let tx_builder = tx_builder.clone();
704                    let operation_meta_gen = operation_meta_gen.clone();
705                    Box::pin(async move {
706                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
707                            bail!("There already exists an operation with id {operation_id:?}")
708                        }
709
710                        let out_point_range = self
711                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
712                            .await?;
713
714                        self.operation_log()
715                            .add_operation_log_entry_dbtx(
716                                dbtx,
717                                operation_id,
718                                &operation_type,
719                                operation_meta_gen(out_point_range),
720                            )
721                            .await;
722
723                        Ok(out_point_range)
724                    })
725                },
726                Some(100), // TODO: handle what happens after 100 retries
727            )
728            .await;
729
730        match autocommit_res {
731            Ok(txid) => Ok(txid),
732            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
733            Err(AutocommitError::CommitFailed {
734                attempts,
735                last_error,
736            }) => panic!(
737                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
738            ),
739        }
740    }
741
742    async fn finalize_and_submit_transaction_inner(
743        &self,
744        dbtx: &mut DatabaseTransaction<'_>,
745        operation_id: OperationId,
746        tx_builder: TransactionBuilder,
747    ) -> anyhow::Result<OutPointRange> {
748        let (transaction, mut states, change_range) = self
749            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
750            .await?;
751
752        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
753            let inputs = transaction
754                .inputs
755                .iter()
756                .map(DynInput::module_instance_id)
757                .collect::<Vec<_>>();
758            let outputs = transaction
759                .outputs
760                .iter()
761                .map(DynOutput::module_instance_id)
762                .collect::<Vec<_>>();
763            warn!(
764                target: LOG_CLIENT_NET_API,
765                size=%transaction.consensus_encode_to_vec().len(),
766                ?inputs,
767                ?outputs,
768                "Transaction too large",
769            );
770            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
771            bail!(
772                "The generated transaction would be rejected by the federation for being too large."
773            );
774        }
775
776        let txid = transaction.tx_hash();
777
778        debug!(
779            target: LOG_CLIENT_NET_API,
780            %txid,
781            operation_id = %operation_id.fmt_short(),
782            ?transaction,
783            "Finalized and submitting transaction",
784        );
785
786        let tx_submission_sm = DynState::from_typed(
787            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
788            TxSubmissionStatesSM {
789                operation_id,
790                state: TxSubmissionStates::Created(transaction),
791            },
792        );
793        states.push(tx_submission_sm);
794
795        self.executor.add_state_machines_dbtx(dbtx, states).await?;
796
797        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
798            .await;
799
800        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
801    }
802
803    async fn transaction_update_stream(
804        &self,
805        operation_id: OperationId,
806    ) -> BoxStream<'static, TxSubmissionStatesSM> {
807        self.executor
808            .notifier()
809            .module_notifier::<TxSubmissionStatesSM>(
810                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
811                self.final_client.clone(),
812            )
813            .subscribe(operation_id)
814            .await
815    }
816
817    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
818        let mut dbtx = self.db().begin_transaction_nc().await;
819
820        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
821    }
822
823    pub async fn operation_exists_dbtx(
824        dbtx: &mut DatabaseTransaction<'_>,
825        operation_id: OperationId,
826    ) -> bool {
827        let active_state_exists = dbtx
828            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
829            .await
830            .next()
831            .await
832            .is_some();
833
834        let inactive_state_exists = dbtx
835            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
836            .await
837            .next()
838            .await
839            .is_some();
840
841        active_state_exists || inactive_state_exists
842    }
843
844    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
845        self.db
846            .begin_transaction_nc()
847            .await
848            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
849            .await
850            .next()
851            .await
852            .is_some()
853    }
854
855    /// Waits for an output from the primary module to reach its final
856    /// state.
857    pub async fn await_primary_bitcoin_module_output(
858        &self,
859        operation_id: OperationId,
860        out_point: OutPoint,
861    ) -> anyhow::Result<()> {
862        self.primary_module_for_unit(AmountUnit::BITCOIN)
863            .ok_or_else(|| anyhow!("No primary module available"))?
864            .1
865            .await_primary_module_output(operation_id, out_point)
866            .await
867    }
868
869    /// Returns a reference to a typed module client instance by kind
870    pub fn get_first_module<M: ClientModule>(
871        &'_ self,
872    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
873        let module_kind = M::kind();
874        let id = self
875            .get_first_instance(&module_kind)
876            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
877        let module: &M = self
878            .try_get_module(id)
879            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
880            .as_any()
881            .downcast_ref::<M>()
882            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
883        let (db, _) = self.db().with_prefix_module_id(id);
884        Ok(ClientModuleInstance {
885            id,
886            db,
887            api: self.api().with_module(id),
888            module,
889        })
890    }
891
892    pub fn get_module_client_dyn(
893        &self,
894        instance_id: ModuleInstanceId,
895    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
896        self.try_get_module(instance_id)
897            .ok_or(anyhow!("Unknown module instance {}", instance_id))
898    }
899
900    pub fn db(&self) -> &Database {
901        &self.db
902    }
903
904    pub fn endpoints(&self) -> &ConnectorRegistry {
905        &self.connectors
906    }
907
908    /// Returns a stream of transaction updates for the given operation id that
909    /// can later be used to watch for a specific transaction being accepted.
910    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
911        TransactionUpdates {
912            update_stream: self.transaction_update_stream(operation_id).await,
913        }
914    }
915
916    /// Returns the instance id of the first module of the given kind.
917    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
918        self.modules
919            .iter_modules()
920            .find(|(_, kind, _module)| *kind == module_kind)
921            .map(|(instance_id, _, _)| instance_id)
922    }
923
924    /// Returns the data from which the client's root secret is derived (e.g.
925    /// BIP39 seed phrase struct).
926    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
927        get_decoded_client_secret::<T>(self.db()).await
928    }
929
930    /// Waits for outputs from the primary module to reach its final
931    /// state.
932    pub async fn await_primary_bitcoin_module_outputs(
933        &self,
934        operation_id: OperationId,
935        outputs: Vec<OutPoint>,
936    ) -> anyhow::Result<()> {
937        for out_point in outputs {
938            self.await_primary_bitcoin_module_output(operation_id, out_point)
939                .await?;
940        }
941
942        Ok(())
943    }
944
945    /// Returns the config of the client in JSON format.
946    ///
947    /// Compared to the consensus module format where module configs are binary
948    /// encoded this format cannot be cryptographically verified but is easier
949    /// to consume and to some degree human-readable.
950    pub async fn get_config_json(&self) -> JsonClientConfig {
951        self.config().await.to_json()
952    }
953
954    // Ideally this would not be in the API, but there's a lot of places where this
955    // makes it easier.
956    #[doc(hidden)]
957    /// Like [`Self::get_balance`] but returns an error if primary module is not
958    /// available
959    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
960        self.get_balance_for_unit(AmountUnit::BITCOIN).await
961    }
962
963    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
964        let (id, module) = self
965            .primary_module_for_unit(unit)
966            .ok_or_else(|| anyhow!("Primary module not available"))?;
967        Ok(module
968            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
969            .await)
970    }
971
972    /// Returns a stream that yields the current client balance every time it
973    /// changes.
974    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
975        let primary_module_things =
976            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
977                let balance_changes = primary_module.subscribe_balance_changes().await;
978                let initial_balance = self
979                    .get_balance_for_unit(unit)
980                    .await
981                    .expect("Primary is present");
982
983                Some((
984                    primary_module_id,
985                    primary_module.clone(),
986                    balance_changes,
987                    initial_balance,
988                ))
989            } else {
990                None
991            };
992        let db = self.db().clone();
993
994        Box::pin(async_stream::stream! {
995            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
996                // If there is no primary module, there will not be one until client is
997                // restarted
998                pending().await
999            };
1000
1001
1002            yield initial_balance;
1003            let mut prev_balance = initial_balance;
1004            while let Some(()) = balance_changes.next().await {
1005                let mut dbtx = db.begin_transaction_nc().await;
1006                let balance = primary_module
1007                     .get_balance(primary_module_id, &mut dbtx, unit)
1008                    .await;
1009
1010                // Deduplicate in case modules cannot always tell if the balance actually changed
1011                if balance != prev_balance {
1012                    prev_balance = balance;
1013                    yield balance;
1014                }
1015            }
1016        })
1017    }
1018
1019    /// Make a single API version request to a peer after a delay.
1020    ///
1021    /// The delay is here to unify the type of a future both for initial request
1022    /// and possible retries.
1023    async fn make_api_version_request(
1024        delay: Duration,
1025        peer_id: PeerId,
1026        api: &DynGlobalApi,
1027    ) -> (
1028        PeerId,
1029        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1030    ) {
1031        runtime::sleep(delay).await;
1032        (
1033            peer_id,
1034            api.request_single_peer::<SupportedApiVersionsSummary>(
1035                VERSION_ENDPOINT.to_owned(),
1036                ApiRequestErased::default(),
1037                peer_id,
1038            )
1039            .await,
1040        )
1041    }
1042
1043    /// Create a backoff strategy for API version requests.
1044    ///
1045    /// Keep trying, initially somewhat aggressively, but after a while retry
1046    /// very slowly, because chances for response are getting lower and
1047    /// lower.
1048    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1049        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1050    }
1051
1052    /// Query the federation for API version support and then calculate
1053    /// the best API version to use (supported by most guardians).
1054    pub async fn fetch_common_api_versions_from_all_peers(
1055        num_peers: NumPeers,
1056        api: DynGlobalApi,
1057        db: Database,
1058        num_responses_sender: watch::Sender<usize>,
1059    ) {
1060        let mut backoff = Self::create_api_version_backoff();
1061
1062        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1063        // and make a single async db write operation, it should be OK.
1064        let mut requests = FuturesUnordered::new();
1065
1066        for peer_id in num_peers.peer_ids() {
1067            requests.push(Self::make_api_version_request(
1068                Duration::ZERO,
1069                peer_id,
1070                &api,
1071            ));
1072        }
1073
1074        let mut num_responses = 0;
1075
1076        while let Some((peer_id, response)) = requests.next().await {
1077            let retry = match response {
1078                Err(err) => {
1079                    let has_previous_response = db
1080                        .begin_transaction_nc()
1081                        .await
1082                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1083                        .await
1084                        .is_some();
1085                    debug!(
1086                        target: LOG_CLIENT,
1087                        %peer_id,
1088                        err = %err.fmt_compact(),
1089                        %has_previous_response,
1090                        "Failed to refresh API versions of a peer"
1091                    );
1092
1093                    !has_previous_response
1094                }
1095                Ok(o) => {
1096                    // Save the response to the database right away, just to
1097                    // not lose it
1098                    let mut dbtx = db.begin_transaction().await;
1099                    dbtx.insert_entry(
1100                        &PeerLastApiVersionsSummaryKey(peer_id),
1101                        &PeerLastApiVersionsSummary(o),
1102                    )
1103                    .await;
1104                    dbtx.commit_tx().await;
1105                    false
1106                }
1107            };
1108
1109            if retry {
1110                requests.push(Self::make_api_version_request(
1111                    backoff.next().expect("Keeps retrying"),
1112                    peer_id,
1113                    &api,
1114                ));
1115            } else {
1116                num_responses += 1;
1117                num_responses_sender.send_replace(num_responses);
1118            }
1119        }
1120    }
1121
1122    /// Fetch API versions from peers, retrying until we get threshold number of
1123    /// successful responses. Returns the successful responses collected
1124    /// from at least `num_peers.threshold()` peers.
1125    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
1126        num_peers: NumPeers,
1127        api: DynGlobalApi,
1128    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1129        let mut backoff = Self::create_api_version_backoff();
1130
1131        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1132        // and collect responses, it should be OK.
1133        let mut requests = FuturesUnordered::new();
1134
1135        for peer_id in num_peers.peer_ids() {
1136            requests.push(Self::make_api_version_request(
1137                Duration::ZERO,
1138                peer_id,
1139                &api,
1140            ));
1141        }
1142
1143        let mut successful_responses = BTreeMap::new();
1144
1145        while successful_responses.len() < num_peers.threshold()
1146            && let Some((peer_id, response)) = requests.next().await
1147        {
1148            let retry = match response {
1149                Err(err) => {
1150                    debug!(
1151                        target: LOG_CLIENT,
1152                        %peer_id,
1153                        err = %err.fmt_compact(),
1154                        "Failed to fetch API versions from peer"
1155                    );
1156                    true
1157                }
1158                Ok(response) => {
1159                    successful_responses.insert(peer_id, response);
1160                    false
1161                }
1162            };
1163
1164            if retry {
1165                requests.push(Self::make_api_version_request(
1166                    backoff.next().expect("Keeps retrying"),
1167                    peer_id,
1168                    &api,
1169                ));
1170            }
1171        }
1172
1173        successful_responses
1174    }
1175
1176    /// Fetch API versions from peers and discover common API versions to use.
1177    pub async fn fetch_common_api_versions(
1178        config: &ClientConfig,
1179        api: &DynGlobalApi,
1180    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1181        debug!(
1182            target: LOG_CLIENT,
1183            "Fetching common api versions"
1184        );
1185
1186        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1187
1188        let peer_api_version_sets =
1189            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1190
1191        Ok(peer_api_version_sets)
1192    }
1193
1194    /// Write API version set to database cache.
1195    /// Used when we have a pre-calculated API version set that should be stored
1196    /// for later use.
1197    pub async fn write_api_version_cache(
1198        dbtx: &mut DatabaseTransaction<'_>,
1199        api_version_set: ApiVersionSet,
1200    ) {
1201        debug!(
1202            target: LOG_CLIENT,
1203            value = ?api_version_set,
1204            "Writing API version set to cache"
1205        );
1206
1207        dbtx.insert_entry(
1208            &CachedApiVersionSetKey,
1209            &CachedApiVersionSet(api_version_set),
1210        )
1211        .await;
1212    }
1213
1214    /// Store prefetched peer API version responses and calculate/store common
1215    /// API version set. This processes the individual peer responses by
1216    /// storing them in the database and calculating the common API version
1217    /// set for caching.
1218    pub async fn store_prefetched_api_versions(
1219        db: &Database,
1220        config: &ClientConfig,
1221        client_module_init: &ClientModuleInitRegistry,
1222        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1223    ) {
1224        debug!(
1225            target: LOG_CLIENT,
1226            "Storing {} prefetched peer API version responses and calculating common version set",
1227            peer_api_versions.len()
1228        );
1229
1230        let mut dbtx = db.begin_transaction().await;
1231        // Calculate common API version set from individual responses
1232        let client_supported_versions =
1233            Self::supported_api_versions_summary_static(config, client_module_init);
1234        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1235            &client_supported_versions,
1236            peer_api_versions,
1237        ) {
1238            Ok(common_api_versions) => {
1239                // Write the calculated common API version set to database cache
1240                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1241                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1242            }
1243            Err(err) => {
1244                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1245            }
1246        }
1247
1248        // Store individual peer responses to database
1249        for (peer_id, peer_api_versions) in peer_api_versions {
1250            dbtx.insert_entry(
1251                &PeerLastApiVersionsSummaryKey(*peer_id),
1252                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1253            )
1254            .await;
1255        }
1256        dbtx.commit_tx().await;
1257        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1258    }
1259
1260    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1261    pub fn supported_api_versions_summary_static(
1262        config: &ClientConfig,
1263        client_module_init: &ClientModuleInitRegistry,
1264    ) -> SupportedApiVersionsSummary {
1265        SupportedApiVersionsSummary {
1266            core: SupportedCoreApiVersions {
1267                core_consensus: config.global.consensus_version,
1268                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1269                    .expect("must not have conflicting versions"),
1270            },
1271            modules: config
1272                .modules
1273                .iter()
1274                .filter_map(|(&module_instance_id, module_config)| {
1275                    client_module_init
1276                        .get(module_config.kind())
1277                        .map(|module_init| {
1278                            (
1279                                module_instance_id,
1280                                SupportedModuleApiVersions {
1281                                    core_consensus: config.global.consensus_version,
1282                                    module_consensus: module_config.version,
1283                                    api: module_init.supported_api_versions(),
1284                                },
1285                            )
1286                        })
1287                })
1288                .collect(),
1289        }
1290    }
1291
1292    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1293        Self::load_and_refresh_common_api_version_static(
1294            &self.config().await,
1295            &self.module_inits,
1296            self.connectors.clone(),
1297            &self.api,
1298            &self.db,
1299            &self.task_group,
1300        )
1301        .await
1302    }
1303
1304    /// Load the common api versions to use from cache and start a background
1305    /// process to refresh them.
1306    ///
1307    /// This is a compromise, so we not have to wait for version discovery to
1308    /// complete every time a [`Client`] is being built.
1309    async fn load_and_refresh_common_api_version_static(
1310        config: &ClientConfig,
1311        module_init: &ClientModuleInitRegistry,
1312        connectors: ConnectorRegistry,
1313        api: &DynGlobalApi,
1314        db: &Database,
1315        task_group: &TaskGroup,
1316    ) -> anyhow::Result<ApiVersionSet> {
1317        if let Some(v) = db
1318            .begin_transaction_nc()
1319            .await
1320            .get_value(&CachedApiVersionSetKey)
1321            .await
1322        {
1323            debug!(
1324                target: LOG_CLIENT,
1325                "Found existing cached common api versions"
1326            );
1327            let config = config.clone();
1328            let client_module_init = module_init.clone();
1329            let api = api.clone();
1330            let db = db.clone();
1331            let task_group = task_group.clone();
1332            // Separate task group, because we actually don't want to be waiting for this to
1333            // finish, and it's just best effort.
1334            task_group
1335                .clone()
1336                .spawn_cancellable("refresh_common_api_version_static", async move {
1337                    connectors.wait_for_initialized_connections().await;
1338
1339                    if let Err(error) = Self::refresh_common_api_version_static(
1340                        &config,
1341                        &client_module_init,
1342                        &api,
1343                        &db,
1344                        task_group,
1345                        false,
1346                    )
1347                    .await
1348                    {
1349                        warn!(
1350                            target: LOG_CLIENT,
1351                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1352                        );
1353                    }
1354                });
1355
1356            return Ok(v.0);
1357        }
1358
1359        info!(
1360            target: LOG_CLIENT,
1361            "Fetching initial API versions "
1362        );
1363        Self::refresh_common_api_version_static(
1364            config,
1365            module_init,
1366            api,
1367            db,
1368            task_group.clone(),
1369            true,
1370        )
1371        .await
1372    }
1373
1374    async fn refresh_common_api_version_static(
1375        config: &ClientConfig,
1376        client_module_init: &ClientModuleInitRegistry,
1377        api: &DynGlobalApi,
1378        db: &Database,
1379        task_group: TaskGroup,
1380        block_until_ok: bool,
1381    ) -> anyhow::Result<ApiVersionSet> {
1382        debug!(
1383            target: LOG_CLIENT,
1384            "Refreshing common api versions"
1385        );
1386
1387        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1388        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1389
1390        task_group.spawn_cancellable("refresh peers api versions", {
1391            Client::fetch_common_api_versions_from_all_peers(
1392                num_peers,
1393                api.clone(),
1394                db.clone(),
1395                num_responses_sender,
1396            )
1397        });
1398
1399        let common_api_versions = loop {
1400            // Wait to collect enough answers before calculating a set of common api
1401            // versions to use. Note that all peers individual responses from
1402            // previous attempts are still being used, and requests, or even
1403            // retries for response of peers are not actually cancelled, as they
1404            // are happening on a separate task. This is all just to bound the
1405            // time user can be waiting for the join operation to finish, at the
1406            // risk of picking wrong version in very rare circumstances.
1407            let _: Result<_, Elapsed> = runtime::timeout(
1408                Duration::from_secs(30),
1409                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1410            )
1411            .await;
1412
1413            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1414
1415            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1416                &Self::supported_api_versions_summary_static(config, client_module_init),
1417                &peer_api_version_sets,
1418            ) {
1419                Ok(o) => break o,
1420                Err(err) if block_until_ok => {
1421                    warn!(
1422                        target: LOG_CLIENT,
1423                        err = %err.fmt_compact_anyhow(),
1424                        "Failed to discover API version to use. Retrying..."
1425                    );
1426                    continue;
1427                }
1428                Err(e) => return Err(e),
1429            }
1430        };
1431
1432        debug!(
1433            target: LOG_CLIENT,
1434            value = ?common_api_versions,
1435            "Updating the cached common api versions"
1436        );
1437        let mut dbtx = db.begin_transaction().await;
1438        let _ = dbtx
1439            .insert_entry(
1440                &CachedApiVersionSetKey,
1441                &CachedApiVersionSet(common_api_versions.clone()),
1442            )
1443            .await;
1444
1445        dbtx.commit_tx().await;
1446
1447        Ok(common_api_versions)
1448    }
1449
1450    /// Get the client [`Metadata`]
1451    pub async fn get_metadata(&self) -> Metadata {
1452        self.db
1453            .begin_transaction_nc()
1454            .await
1455            .get_value(&ClientMetadataKey)
1456            .await
1457            .unwrap_or_else(|| {
1458                warn!(
1459                    target: LOG_CLIENT,
1460                    "Missing existing metadata. This key should have been set on Client init"
1461                );
1462                Metadata::empty()
1463            })
1464    }
1465
1466    /// Set the client [`Metadata`]
1467    pub async fn set_metadata(&self, metadata: &Metadata) {
1468        self.db
1469            .autocommit::<_, _, anyhow::Error>(
1470                |dbtx, _| {
1471                    Box::pin(async {
1472                        Self::set_metadata_dbtx(dbtx, metadata).await;
1473                        Ok(())
1474                    })
1475                },
1476                None,
1477            )
1478            .await
1479            .expect("Failed to autocommit metadata");
1480    }
1481
1482    pub fn has_pending_recoveries(&self) -> bool {
1483        !self
1484            .client_recovery_progress_receiver
1485            .borrow()
1486            .iter()
1487            .all(|(_id, progress)| progress.is_done())
1488    }
1489
1490    /// Wait for all module recoveries to finish
1491    ///
1492    /// This will block until the recovery task is done with recoveries.
1493    /// Returns success if all recovery tasks are complete (success case),
1494    /// or an error if some modules could not complete the recovery at the time.
1495    ///
1496    /// A bit of a heavy approach.
1497    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1498        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1499        recovery_receiver
1500            .wait_for(|in_progress| {
1501                in_progress
1502                    .iter()
1503                    .all(|(_id, progress)| progress.is_done())
1504            })
1505            .await
1506            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1507
1508        Ok(())
1509    }
1510
1511    /// Subscribe to recover progress for all the modules.
1512    ///
1513    /// This stream can contain duplicate progress for a module.
1514    /// Don't use this stream for detecting completion of recovery.
1515    pub fn subscribe_to_recovery_progress(
1516        &self,
1517    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1518        WatchStream::new(self.client_recovery_progress_receiver.clone())
1519            .flat_map(futures::stream::iter)
1520    }
1521
1522    pub async fn wait_for_module_kind_recovery(
1523        &self,
1524        module_kind: ModuleKind,
1525    ) -> anyhow::Result<()> {
1526        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1527        let config = self.config().await;
1528        recovery_receiver
1529            .wait_for(|in_progress| {
1530                !in_progress
1531                    .iter()
1532                    .filter(|(module_instance_id, _progress)| {
1533                        config.modules[module_instance_id].kind == module_kind
1534                    })
1535                    .any(|(_id, progress)| !progress.is_done())
1536            })
1537            .await
1538            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1539
1540        Ok(())
1541    }
1542
1543    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1544        loop {
1545            if self.executor.get_active_states().await.is_empty() {
1546                break;
1547            }
1548            sleep(Duration::from_millis(100)).await;
1549        }
1550        Ok(())
1551    }
1552
1553    /// Set the client [`Metadata`]
1554    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1555        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1556    }
1557
    /// Spawns the background task that drives module recoveries.
    ///
    /// Clones the database handle and the event-log wakeup sender, snapshots
    /// the kind of every module instance as a `String`, and hands all of it
    /// (together with the arguments) to [`Self::run_module_recoveries_task`],
    /// which runs on the client's task group under the name
    /// "module recoveries".
    ///
    /// * `recovery_sender` - channel the task uses to publish per-module
    ///   progress.
    /// * `module_recoveries` - one recovery future per module instance.
    /// * `module_recovery_progress_receivers` - one progress receiver per
    ///   module instance.
    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        // Everything the task needs is cloned/collected up front, so the
        // spawned future does not borrow from `self`.
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        // Module kinds are only used for log output inside the task.
        let module_kinds: BTreeMap<ModuleInstanceId, String> = self
            .modules
            .iter_modules_id_kind()
            .map(|(id, kind)| (id, kind.to_string()))
            .collect();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                    module_kinds,
                )
                .await;
            });
    }
1590
1591    async fn run_module_recoveries_task(
1592        db: Database,
1593        log_ordering_wakeup_tx: watch::Sender<()>,
1594        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1595        module_recoveries: BTreeMap<
1596            ModuleInstanceId,
1597            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1598        >,
1599        module_recovery_progress_receivers: BTreeMap<
1600            ModuleInstanceId,
1601            watch::Receiver<RecoveryProgress>,
1602        >,
1603        module_kinds: BTreeMap<ModuleInstanceId, String>,
1604    ) {
1605        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1606        let mut completed_stream = Vec::new();
1607        let progress_stream = futures::stream::FuturesUnordered::new();
1608
1609        for (module_instance_id, f) in module_recoveries {
1610            completed_stream.push(futures::stream::once(Box::pin(async move {
1611                match f.await {
1612                    Ok(()) => (module_instance_id, None),
1613                    Err(err) => {
1614                        warn!(
1615                            target: LOG_CLIENT,
1616                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1617                        );
1618                        // a module recovery that failed reports and error and
1619                        // just never finishes, so we don't need a separate state
1620                        // for it
1621                        futures::future::pending::<()>().await;
1622                        unreachable!()
1623                    }
1624                }
1625            })));
1626        }
1627
1628        for (module_instance_id, rx) in module_recovery_progress_receivers {
1629            progress_stream.push(
1630                tokio_stream::wrappers::WatchStream::new(rx)
1631                    .fuse()
1632                    .map(move |progress| (module_instance_id, Some(progress))),
1633            );
1634        }
1635
1636        let mut futures = futures::stream::select(
1637            futures::stream::select_all(progress_stream),
1638            futures::stream::select_all(completed_stream),
1639        );
1640
1641        while let Some((module_instance_id, progress)) = futures.next().await {
1642            let mut dbtx = db.begin_transaction().await;
1643
1644            let prev_progress = *recovery_sender
1645                .borrow()
1646                .get(&module_instance_id)
1647                .expect("existing progress must be present");
1648
1649            let progress = if prev_progress.is_done() {
1650                // since updates might be out of order, once done, stick with it
1651                prev_progress
1652            } else if let Some(progress) = progress {
1653                progress
1654            } else {
1655                prev_progress.to_complete()
1656            };
1657
1658            if !prev_progress.is_done() && progress.is_done() {
1659                info!(
1660                    target: LOG_CLIENT,
1661                    module_instance_id,
1662                    progress = format!("{}/{}", progress.complete, progress.total),
1663                    "Recovery complete"
1664                );
1665                dbtx.log_event(
1666                    log_ordering_wakeup_tx.clone(),
1667                    None,
1668                    ModuleRecoveryCompleted {
1669                        module_id: module_instance_id,
1670                    },
1671                )
1672                .await;
1673            } else {
1674                info!(
1675                    target: LOG_CLIENT,
1676                    module_instance_id,
1677                    kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1678                    progress = format!("{}/{}", progress.complete, progress.total),
1679                    "Recovery progress"
1680                );
1681            }
1682
1683            dbtx.insert_entry(
1684                &ClientModuleRecovery { module_instance_id },
1685                &ClientModuleRecoveryState { progress },
1686            )
1687            .await;
1688            dbtx.commit_tx().await;
1689
1690            recovery_sender.send_modify(|v| {
1691                v.insert(module_instance_id, progress);
1692            });
1693        }
1694        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1695    }
1696
1697    async fn load_peers_last_api_versions(
1698        db: &Database,
1699        num_peers: NumPeers,
1700    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1701        let mut peer_api_version_sets = BTreeMap::new();
1702
1703        let mut dbtx = db.begin_transaction_nc().await;
1704        for peer_id in num_peers.peer_ids() {
1705            if let Some(v) = dbtx
1706                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1707                .await
1708            {
1709                peer_api_version_sets.insert(peer_id, v.0);
1710            }
1711        }
1712        drop(dbtx);
1713        peer_api_version_sets
1714    }
1715
    /// You likely want to use [`Client::get_peer_urls`]. This function returns
    /// only the announcements and doesn't use the config as fallback.
    ///
    /// Reads every entry stored under [`ApiAnnouncementPrefix`] in a
    /// non-committing transaction and keys the result by the first field of
    /// the database key.
    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }
1728
    /// Returns guardian metadata stored in the client database
    ///
    /// Reads every entry stored under
    /// [`crate::guardian_metadata::GuardianMetadataPrefix`] in a non-committing
    /// transaction and keys the result by the first field of the database key.
    pub async fn get_guardian_metadata(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
            .await
            .map(|(key, metadata)| (key.0, metadata))
            .collect()
            .await
    }
1742
    /// Returns a list of guardian API URLs
    ///
    /// Delegates to `get_api_urls` with the client database and the current
    /// client config.
    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }
1747
1748    /// Create an invite code with the api endpoint of the given peer which can
1749    /// be used to download this client config
1750    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1751        self.get_peer_urls()
1752            .await
1753            .into_iter()
1754            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1755            .map(|peer_url| {
1756                InviteCode::new(
1757                    peer_url.clone(),
1758                    peer,
1759                    self.federation_id(),
1760                    self.api_secret.clone(),
1761                )
1762            })
1763    }
1764
    /// Blocks till the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once this function is guaranteed to return immediately.
    ///
    /// The lookup (and, if needed, the backfill of the config) happens inside
    /// a database `autocommit` closure via
    /// [`Self::get_or_backfill_broadcast_public_keys`].
    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        // The closure itself never fails; the error type is `()`.
                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                // NOTE(review): presumably "no attempt limit" — confirm against
                // `Database::autocommit`.
                None,
            )
            .await
            .expect("Will retry forever")
    }
1789
1790    async fn get_or_backfill_broadcast_public_keys(
1791        &self,
1792        dbtx: &mut DatabaseTransaction<'_>,
1793        config: ClientConfig,
1794    ) -> BTreeMap<PeerId, PublicKey> {
1795        match config.global.broadcast_public_keys {
1796            Some(guardian_pub_keys) => guardian_pub_keys,
1797            _ => {
1798                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1799
1800                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1801                *(self.config.write().await) = new_config;
1802                guardian_pub_keys
1803            }
1804        }
1805    }
1806
    /// Requests the session count from the federation API.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1810
1811    async fn fetch_and_update_config(
1812        &self,
1813        config: ClientConfig,
1814    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1815        let fetched_config = retry(
1816            "Fetching guardian public keys",
1817            backoff_util::background_backoff(),
1818            || async {
1819                Ok(self
1820                    .api
1821                    .request_current_consensus::<ClientConfig>(
1822                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1823                        ApiRequestErased::default(),
1824                    )
1825                    .await?)
1826            },
1827        )
1828        .await
1829        .expect("Will never return on error");
1830
1831        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1832            warn!(
1833                target: LOG_CLIENT,
1834                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1835            );
1836            pending::<()>().await;
1837            unreachable!("Pending will never return");
1838        };
1839
1840        let new_config = ClientConfig {
1841            global: GlobalClientConfig {
1842                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1843                ..config.global
1844            },
1845            modules: config.modules,
1846        };
1847        (guardian_pub_keys, new_config)
1848    }
1849
1850    pub fn handle_global_rpc(
1851        &self,
1852        method: String,
1853        params: serde_json::Value,
1854    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1855        Box::pin(try_stream! {
1856            match method.as_str() {
1857                "get_balance" => {
1858                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1859                    yield serde_json::to_value(balance)?;
1860                }
1861                "subscribe_balance_changes" => {
1862                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1863                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1864                    while let Some(balance) = stream.next().await {
1865                        yield serde_json::to_value(balance)?;
1866                    }
1867                }
1868                "get_config" => {
1869                    let config = self.config().await;
1870                    yield serde_json::to_value(config)?;
1871                }
1872                "get_federation_id" => {
1873                    let federation_id = self.federation_id();
1874                    yield serde_json::to_value(federation_id)?;
1875                }
1876                "get_invite_code" => {
1877                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1878                    let invite_code = self.invite_code(req.peer).await;
1879                    yield serde_json::to_value(invite_code)?;
1880                }
1881                "get_operation" => {
1882                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1883                    let operation = self.operation_log().get_operation(req.operation_id).await;
1884                    yield serde_json::to_value(operation)?;
1885                }
1886                "list_operations" => {
1887                    let req: ListOperationsParams = serde_json::from_value(params)?;
1888                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1889                        usize::MAX
1890                    } else {
1891                        req.limit.unwrap_or(usize::MAX)
1892                    };
1893                    let operations = self.operation_log()
1894                        .paginate_operations_rev(limit, req.last_seen)
1895                        .await;
1896                    yield serde_json::to_value(operations)?;
1897                }
1898                "session_count" => {
1899                    let count = self.fetch_session_count().await?;
1900                    yield serde_json::to_value(count)?;
1901                }
1902                "has_pending_recoveries" => {
1903                    let has_pending = self.has_pending_recoveries();
1904                    yield serde_json::to_value(has_pending)?;
1905                }
1906                "wait_for_all_recoveries" => {
1907                    self.wait_for_all_recoveries().await?;
1908                    yield serde_json::Value::Null;
1909                }
1910                "subscribe_to_recovery_progress" => {
1911                    let mut stream = self.subscribe_to_recovery_progress();
1912                    while let Some((module_id, progress)) = stream.next().await {
1913                        yield serde_json::json!({
1914                            "module_id": module_id,
1915                            "progress": progress
1916                        });
1917                    }
1918                }
1919                #[allow(deprecated)]
1920                "backup_to_federation" => {
1921                    let metadata = if params.is_null() {
1922                        Metadata::from_json_serialized(serde_json::json!({}))
1923                    } else {
1924                        Metadata::from_json_serialized(params)
1925                    };
1926                    self.backup_to_federation(metadata).await?;
1927                    yield serde_json::Value::Null;
1928                }
1929                _ => {
1930                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1931                    unreachable!()
1932                },
1933            }
1934        })
1935    }
1936
    /// Logs `event` to the event log in its own database transaction.
    ///
    /// Opens a transaction, records the event through
    /// [`Self::log_event_dbtx`], and commits it.
    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }
1945
    /// Logs `event` to the event log as part of the caller's transaction.
    ///
    /// Passes a clone of the client's log-ordering wakeup sender to
    /// `dbtx.log_event`. The transaction is not committed here.
    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }
1958
1959    pub async fn log_event_raw_dbtx<Cap>(
1960        &self,
1961        dbtx: &mut DatabaseTransaction<'_, Cap>,
1962        kind: EventKind,
1963        module: Option<(ModuleKind, ModuleInstanceId)>,
1964        payload: Vec<u8>,
1965        persist: EventPersistence,
1966    ) where
1967        Cap: Send,
1968    {
1969        let module_id = module.as_ref().map(|m| m.1);
1970        let module_kind = module.map(|m| m.0);
1971        dbtx.log_event_raw(
1972            self.log_ordering_wakeup_tx.clone(),
1973            kind,
1974            module_kind,
1975            module_id,
1976            payload,
1977            persist,
1978        )
1979        .await;
1980    }
1981
    /// Built in event log (trimmable) tracker
    ///
    /// For the convenience of downstream applications, [`Client`] can store
    /// internally event log position for the main application using/driving it.
    ///
    /// Note that this position is a singleton, so this tracker should not be
    /// used for multiple purposes or applications, etc. at the same time.
    ///
    /// If the application has a need to follow log using multiple trackers, it
    /// should implement its own [`DynEventLogTrimableTracker`] and store its
    /// persistent data by itself.
    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
        // Stateless: the position lives in the database under
        // `DefaultApplicationEventLogKey`.
        struct BuiltInApplicationEventLogTracker;

        #[apply(async_trait_maybe_send!)]
        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
            // Store position in the event log
            async fn store(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
                pos: EventLogTrimableId,
            ) -> anyhow::Result<()> {
                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
                    .await;
                Ok(())
            }

            /// Load the last previous stored position (or None if never stored)
            async fn load(
                &mut self,
                dbtx: &mut DatabaseTransaction<NonCommittable>,
            ) -> anyhow::Result<Option<EventLogTrimableId>> {
                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
            }
        }
        Box::new(BuiltInApplicationEventLogTracker)
    }
2019
    /// Like [`Self::handle_events`] but for historical data.
    ///
    /// This function can be used to process subset of events
    /// that is infrequent and important enough to be persisted
    /// forever. Most applications should prefer to use [`Self::handle_events`]
    /// which emits *all* events.
    ///
    /// Delegates to `fedimint_eventlog::handle_events` with the client's
    /// database and its "event added" receiver.
    pub async fn handle_historical_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2044
    /// Handle events emitted by the client
    ///
    /// This is a preferred method for reactive & asynchronous
    /// processing of events emitted by the client.
    ///
    /// It needs a `tracker` that will persist the position in the log
    /// as it is being handled. You can use the
    /// [`Client::built_in_application_event_log_tracker`] if this call is
    /// used for the single main application handling this instance of the
    /// [`Client`]. Otherwise you should implement your own tracker.
    ///
    /// This handler will call `handler_fn` with every event emitted by
    /// [`Client`], including transient ones. The caller should atomically
    /// handle each event it is interested in and ignore other ones.
    ///
    /// This method returns only when client is shutting down or on internal
    /// error, so typically should be called in a background task dedicated
    /// to handling events.
    pub async fn handle_events<F, R>(
        &self,
        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
        handler_fn: F,
    ) -> anyhow::Result<()>
    where
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            tracker,
            self.log_event_added_rx.clone(),
            handler_fn,
        )
        .await
    }
2080
    /// Reads entries from the event log using a fresh non-committing
    /// transaction.
    ///
    /// `pos` and `limit` are forwarded unchanged to
    /// [`Self::get_event_log_dbtx`].
    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2089
    /// Reads entries from the trimable event log using a fresh non-committing
    /// transaction.
    ///
    /// `pos` and `limit` are forwarded unchanged to
    /// [`Self::get_event_log_trimable_dbtx`].
    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }
2098
    /// Reads entries from the event log within the caller's transaction.
    ///
    /// Thin wrapper around `dbtx.get_event_log(pos, limit)`.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
2110
    /// Reads entries from the trimable event log within the caller's
    /// transaction.
    ///
    /// Thin wrapper around `dbtx.get_event_log_trimable(pos, limit)`.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
2122
    /// Register to receive all new transient (unpersisted) events
    ///
    /// Each call creates a new subscription on the client's broadcast sender.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
2127
    /// Get a receiver that signals when new events are added to the event log
    ///
    /// Returns a clone of the client's own receiver.
    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
        self.log_event_added_rx.clone()
    }
2132
    /// Returns the client's `iroh_enable_dht` flag.
    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }
2136
    /// Applies the core client database migrations in a single transaction.
    ///
    /// Runs `apply_migrations_core_client_dbtx` for the "fedimint-client"
    /// migration kind, additionally verifies database integrity when running
    /// in a test environment, and then commits.
    ///
    /// # Errors
    ///
    /// Returns an error if applying the migrations or committing the
    /// transaction fails.
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        // The integrity check only runs in test environments.
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
2149
    /// Iterator over primary modules for a given `unit`
    ///
    /// Walks `self.primary_modules` in its iteration order. For each entry it
    /// first yields the candidates registered specifically for `unit`, then
    /// the wildcard candidates. Every id is resolved to its module through
    /// `self.modules.get_expect`.
    // NOTE(review): `get_expect` presumably panics for an unknown id — confirm.
    fn primary_modules_for_unit(
        &self,
        unit: AmountUnit,
    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
        self.primary_modules
            .iter()
            .flat_map(move |(_prio, candidates)| {
                candidates
                    .specific
                    .get(&unit)
                    .into_iter()
                    .flatten()
                    .copied()
                    // within same priority, wildcard matches come last
                    .chain(candidates.wildcard.iter().copied())
            })
            .map(|id| (id, self.modules.get_expect(id)))
    }
2169
    /// Primary module to use for `unit`
    ///
    /// Currently, just pick the first (highest priority) match
    ///
    /// Returns `None` when [`Self::primary_modules_for_unit`] yields nothing
    /// for `unit`.
    pub fn primary_module_for_unit(
        &self,
        unit: AmountUnit,
    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
        self.primary_modules_for_unit(unit).next()
    }
2179
    /// [`Self::primary_module_for_unit`] for Bitcoin
    ///
    /// # Panics
    ///
    /// Panics if there is no primary module for [`AmountUnit::BITCOIN`].
    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
        self.primary_module_for_unit(AmountUnit::BITCOIN)
            .expect("No primary module for Bitcoin")
    }
2185}
2186
// Implementation of the module-facing client context interface for `Client`.
//
// Most methods forward to the inherent `Client` method of the same name; the
// fully-qualified `Client::method(self, ..)` form selects the inherent method
// rather than this trait method.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // The boxed metadata generator is passed on by reference.
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    // Unlike the other forwarders, this one maps to a differently named
    // inherent method: `await_primary_bitcoin_module_outputs`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    // Logs a JSON payload as a raw event. Panics if `dbtx` fails
    // `ensure_global`, or if `payload` cannot be serialized to bytes.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            // Module information is only attached when a module kind is given.
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    // Streams the active states stored for `operation_id` under module
    // instance `module_id`, unwrapping the database key to its inner state key.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    // Streams the inactive states stored for `operation_id` under module
    // instance `module_id`, unwrapping the database key to its inner state key.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
2326
2327// TODO: impl `Debug` for `Client` and derive here
2328impl fmt::Debug for Client {
2329    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2330        write!(f, "Client")
2331    }
2332}
2333
2334pub fn client_decoders<'a>(
2335    registry: &ModuleInitRegistry<DynClientModuleInit>,
2336    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2337) -> ModuleDecoderRegistry {
2338    let mut modules = BTreeMap::new();
2339    for (id, kind) in module_kinds {
2340        let Some(init) = registry.get(kind) else {
2341            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2342            continue;
2343        };
2344
2345        modules.insert(
2346            id,
2347            (
2348                kind.clone(),
2349                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2350            ),
2351        );
2352    }
2353    ModuleDecoderRegistry::from(modules)
2354}