Skip to main content

fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::{
16    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
17};
18use fedimint_bitcoind::DynBitcoindRpc;
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange, PrimaryModulePriority,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_connectors::{ConnectorRegistry, PeerStatus};
37use fedimint_core::config::{
38    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
39};
40use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
41use fedimint_core::db::{
42    AutocommitError, Database, DatabaseRecord, DatabaseTransaction,
43    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
44};
45use fedimint_core::encoding::{Decodable, Encodable};
46use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
47use fedimint_core::envs::is_running_in_test_env;
48use fedimint_core::invite_code::InviteCode;
49use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
50use fedimint_core::module::{
51    AmountUnit, Amounts, ApiRequestErased, ApiVersion, MultiApiVersion,
52    SupportedApiVersionsSummary, SupportedCoreApiVersions, SupportedModuleApiVersions,
53};
54use fedimint_core::net::api_announcement::SignedApiAnnouncement;
55use fedimint_core::runtime::sleep;
56use fedimint_core::task::{
57    Elapsed, MaybeSend, MaybeSync, ShuttingDownError, TaskGroup, TaskHandle,
58};
59use fedimint_core::transaction::Transaction;
60use fedimint_core::util::backoff_util::custom_backoff;
61use fedimint_core::util::{
62    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
63};
64use fedimint_core::{
65    Amount, ChainId, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
66    maybe_add_send_sync, runtime,
67};
68use fedimint_derive_secret::DerivableSecret;
69use fedimint_eventlog::{
70    DBTransactionEventLogExt as _, DynEventLogTrimableTracker, Event, EventKind, EventLogEntry,
71    EventLogId, EventLogTrimableId, EventLogTrimableTracker, EventPersistence, PersistedLogEntry,
72};
73use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
74use futures::stream::FuturesUnordered;
75use futures::{Stream, StreamExt as _};
76use global_ctx::ModuleGlobalClientContext;
77use serde::{Deserialize, Serialize};
78use tokio::sync::{broadcast, oneshot, watch};
79use tokio_stream::wrappers::WatchStream;
80use tracing::{Span, debug, info, warn};
81
82use crate::ClientBuilder;
83use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
84use crate::backup::Metadata;
85use crate::client::event_log::DefaultApplicationEventLogKey;
86use crate::db::{
87    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChainIdKey,
88    ChronologicalOperationLogKey, ClientConfigKey, ClientMetadataKey, ClientModuleRecovery,
89    ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
90    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
91    get_decoded_client_secret, verify_client_db_integrity_dbtx,
92};
93use crate::meta::MetaService;
94use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
95use crate::oplog::OperationLog;
96use crate::sm::executor::{
97    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
98    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
99};
100
101pub(crate) mod builder;
102pub(crate) mod event_log;
103pub(crate) mod global_ctx;
104pub(crate) mod handle;
105
106/// List of core api versions supported by the implementation.
107/// Notably `major` version is the one being supported, and corresponding
108/// `minor` version is the one required (for given `major` version).
109const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
110    &[ApiVersion { major: 0, minor: 0 }];
111
112/// Primary module candidates at specific priority level
113#[derive(Default)]
114pub(crate) struct PrimaryModuleCandidates {
115    /// Modules that listed specific units they handle
116    specific: BTreeMap<AmountUnit, Vec<ModuleInstanceId>>,
117    /// Modules handling any unit
118    wildcard: Vec<ModuleInstanceId>,
119}
120
121/// Main client type
122///
123/// A handle and API to interacting with a single federation. End user
124/// applications that want to support interacting with multiple federations at
125/// the same time, will need to instantiate and manage multiple instances of
126/// this struct.
127///
128/// Under the hood it is starting and managing service tasks, state machines,
129/// database and other resources required.
130///
131/// This type is shared externally and internally, and
132/// [`crate::ClientHandle`] is responsible for external lifecycle management
133/// and resource freeing of the [`Client`].
134pub struct Client {
135    final_client: FinalClientIface,
136    config: tokio::sync::RwLock<ClientConfig>,
137    api_secret: Option<String>,
138    decoders: ModuleDecoderRegistry,
139    connectors: ConnectorRegistry,
140    db: Database,
141    federation_id: FederationId,
142    federation_config_meta: BTreeMap<String, String>,
143    primary_modules: BTreeMap<PrimaryModulePriority, PrimaryModuleCandidates>,
144    pub(crate) modules: ClientModuleRegistry,
145    module_inits: ClientModuleInitRegistry,
146    executor: Executor,
147    pub(crate) api: DynGlobalApi,
148    root_secret: DerivableSecret,
149    operation_log: OperationLog,
150    secp_ctx: Secp256k1<secp256k1::All>,
151    meta_service: Arc<MetaService>,
152
153    task_group: TaskGroup,
154
155    /// Long-lived span attached to every task spawned via [`Client::spawn`] /
156    /// [`Client::spawn_cancellable`], so logs from background tasks carry the
157    /// federation prefix.
158    client_span: Span,
159
160    /// Updates about client recovery progress
161    client_recovery_progress_receiver:
162        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
163
164    /// Internal client sender to wake up log ordering task every time a
165    /// (unuordered) log event is added.
166    log_ordering_wakeup_tx: watch::Sender<()>,
167    /// Receiver for events fired every time (ordered) log event is added.
168    log_event_added_rx: watch::Receiver<()>,
169    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
170    request_hook: ApiRequestHook,
171    iroh_enable_dht: bool,
172    iroh_enable_next: bool,
173    /// User-provided Bitcoin RPC client for modules to use
174    ///
175    /// Stored here for potential future access; currently passed to modules
176    /// during initialization.
177    #[allow(dead_code)]
178    user_bitcoind_rpc: Option<DynBitcoindRpc>,
179    /// User-provided Bitcoin RPC factory for when ChainId is not available
180    ///
181    /// This is used as a fallback when the federation doesn't support ChainId.
182    /// Modules can call this with a URL from their config to get an RPC client.
183    pub(crate) user_bitcoind_rpc_no_chain_id:
184        Option<fedimint_client_module::module::init::BitcoindRpcNoChainIdFactory>,
185}
186
187#[derive(Debug, Serialize, Deserialize)]
188struct ListOperationsParams {
189    limit: Option<usize>,
190    last_seen: Option<ChronologicalOperationLogKey>,
191}
192
193const DEFAULT_EVENT_LOG_PAGE_SIZE: u64 = 100;
194const MAX_EVENT_LOG_PAGE_SIZE: u64 = 10_000;
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
197struct GetEventLogRequest {
198    pos: Option<EventLogId>,
199    limit: Option<u64>,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct GetOperationIdRequest {
204    operation_id: OperationId,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct GetBalanceChangesRequest {
209    #[serde(default = "AmountUnit::bitcoin")]
210    unit: AmountUnit,
211}
212
213impl Client {
214    /// Initialize a client builder that can be configured to create a new
215    /// client.
216    pub async fn builder() -> anyhow::Result<ClientBuilder> {
217        Ok(ClientBuilder::new())
218    }
219
220    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
221        self.api.as_ref()
222    }
223
224    pub fn api_clone(&self) -> DynGlobalApi {
225        self.api.clone()
226    }
227
228    /// Returns a stream that emits the current connection status of all peers
229    /// whenever any peer's status changes. Emits initial state immediately.
230    pub fn connection_status_stream(&self) -> impl Stream<Item = BTreeMap<PeerId, PeerStatus>> {
231        self.api.connection_status_stream()
232    }
233
234    /// Establishes connections to all federation guardians once.
235    ///
236    /// Spawns tasks to connect to each guardian in the federation. Unlike
237    /// [`Self::spawn_federation_reconnect`], this only attempts to establish
238    /// connections once and completes - it does not maintain or reconnect.
239    ///
240    /// Useful for warming up connections before making API calls.
241    pub fn federation_reconnect(&self) {
242        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
243
244        for peer_id in peers {
245            let api = self.api.clone();
246            self.spawn_cancellable(format!("federation-reconnect-once-{peer_id}"), async move {
247                if let Err(e) = api.get_peer_connection(peer_id).await {
248                    debug!(
249                        target: LOG_CLIENT_NET_API,
250                        %peer_id,
251                        err = %e.fmt_compact(),
252                        "Failed to connect to peer"
253                    );
254                }
255            });
256        }
257    }
258
259    /// Spawns background tasks that proactively maintain connections to all
260    /// federation guardians unconditionally.
261    ///
262    /// For each guardian, a task loops: establishes a connection, waits for it
263    /// to disconnect, then reconnects.
264    ///
265    /// The tasks are cancellable and will be terminated when the client shuts
266    /// down.
267    ///
268    /// By default [`Client`] creates connections on demand only, and share
269    /// them as long as they are alive.
270    ///
271    /// Reconnecting continuously might increase data and battery usage,
272    /// but potentially improve UX, depending on the time it takes to establish
273    /// a new network connection in given network conditions.
274    ///
275    /// Downstream users are encouraged to implement their own version of
276    /// this function, e.g. by reconnecting only when it is anticipated
277    /// that connection might be needed, or alternatively pre-warm
278    /// connections by calling [`Self::federation_reconnect`] when it seems
279    /// worthwhile.
280    pub fn spawn_federation_reconnect(&self) {
281        let peers: Vec<PeerId> = self.api.all_peers().iter().copied().collect();
282
283        for peer_id in peers {
284            let api = self.api.clone();
285            self.spawn_cancellable(format!("federation-reconnect-{peer_id}"), async move {
286                loop {
287                    match api.get_peer_connection(peer_id).await {
288                        Ok(conn) => {
289                            conn.await_disconnection().await;
290                        }
291                        Err(e) => {
292                            // Connection failed, backoff is handled inside
293                            // get_or_create_connection
294                            debug!(
295                                target: LOG_CLIENT_NET_API,
296                                %peer_id,
297                                err = %e.fmt_compact(),
298                                "Failed to connect to peer, will retry"
299                            );
300                        }
301                    }
302                }
303            });
304        }
305    }
306
307    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
308    pub fn task_group(&self) -> &TaskGroup {
309        &self.task_group
310    }
311
312    /// Construct the long-lived span attached to all tasks spawned by this
313    /// client.
314    ///
315    /// `parent: None` keeps the span tree shallow; `runtime::spawn` already
316    /// wraps each task in its own `spawn(task=…)` span, so log events from
317    /// these tasks carry both `task` and `fed_id`.
318    pub(crate) fn make_client_span(federation_id: FederationId) -> Span {
319        tracing::info_span!(
320            target: LOG_CLIENT,
321            parent: None,
322            "client",
323            fed_id = %federation_id.to_prefix(),
324        )
325    }
326
327    /// Spawn a cancellable task on the client's task group, instrumented with
328    /// the client's [`Span`] so all events from the task carry `fed_id`.
329    pub(crate) fn spawn_cancellable<R>(
330        &self,
331        name: impl Into<String>,
332        future: impl Future<Output = R> + MaybeSend + 'static,
333    ) -> oneshot::Receiver<Result<R, ShuttingDownError>>
334    where
335        R: MaybeSend + 'static,
336    {
337        self.task_group
338            .spawn_cancellable_with_span(self.client_span.clone(), name, future)
339    }
340
341    /// Spawn a task on the client's task group, parented to the client's
342    /// [`Span`] so all events from the task carry `fed_id` (including the
343    /// task lifecycle events emitted by [`TaskGroup`] itself).
344    pub(crate) fn spawn<Fut, R>(
345        &self,
346        name: impl Into<String>,
347        f: impl FnOnce(TaskHandle) -> Fut + MaybeSend + 'static,
348    ) -> oneshot::Receiver<R>
349    where
350        Fut: Future<Output = R> + MaybeSend + 'static,
351        R: MaybeSend + 'static,
352    {
353        self.task_group
354            .spawn_with_span(self.client_span.clone(), name, f)
355    }
356
357    /// Returns all registered Prometheus metrics encoded in text format.
358    ///
359    /// This can be used by downstream clients to expose metrics via their own
360    /// HTTP server or print them for debugging purposes.
361    pub fn get_metrics() -> anyhow::Result<String> {
362        fedimint_metrics::get_metrics()
363    }
364
365    /// Useful for our CLI tooling, not meant for external use
366    #[doc(hidden)]
367    pub fn executor(&self) -> &Executor {
368        &self.executor
369    }
370
371    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
372        let mut dbtx = db.begin_transaction_nc().await;
373        dbtx.get_value(&ClientConfigKey).await
374    }
375
376    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
377        let mut dbtx = db.begin_transaction_nc().await;
378        dbtx.get_value(&PendingClientConfigKey).await
379    }
380
381    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
382        let mut dbtx = db.begin_transaction_nc().await;
383        dbtx.get_value(&ApiSecretKey).await
384    }
385
386    pub async fn store_encodable_client_secret<T: Encodable>(
387        db: &Database,
388        secret: T,
389    ) -> anyhow::Result<()> {
390        let mut dbtx = db.begin_transaction().await;
391
392        // Don't overwrite an existing secret
393        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
394            bail!("Encoded client secret already exists, cannot overwrite")
395        }
396
397        let encoded_secret = T::consensus_encode_to_vec(&secret);
398        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
399            .await;
400        dbtx.commit_tx().await;
401        Ok(())
402    }
403
404    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
405        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
406            bail!("Encoded client secret not present in DB")
407        };
408
409        Ok(secret)
410    }
411    pub async fn load_decodable_client_secret_opt<T: Decodable>(
412        db: &Database,
413    ) -> anyhow::Result<Option<T>> {
414        let mut dbtx = db.begin_transaction_nc().await;
415
416        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
417
418        Ok(match client_secret {
419            Some(client_secret) => Some(
420                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
421                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
422            ),
423            None => None,
424        })
425    }
426
427    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
428        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
429            Ok(secret) => secret,
430            _ => {
431                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
432                Self::store_encodable_client_secret(db, secret)
433                    .await
434                    .expect("Storing client secret must work");
435                secret
436            }
437        };
438        Ok(client_secret)
439    }
440
441    pub async fn is_initialized(db: &Database) -> bool {
442        let mut dbtx = db.begin_transaction_nc().await;
443        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
444            .await
445            .expect("Unrecoverable error occurred while reading and entry from the database")
446            .is_some()
447    }
448
449    pub fn start_executor(self: &Arc<Self>) {
450        self.client_span.in_scope(|| {
451            debug!(
452                target: LOG_CLIENT,
453                "Starting fedimint client executor",
454            );
455        });
456        self.executor
457            .start_executor(self.context_gen(), self.client_span.clone());
458    }
459
460    pub fn federation_id(&self) -> FederationId {
461        self.federation_id
462    }
463
464    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
465        let client_inner = Arc::downgrade(self);
466        Arc::new(move |module_instance, operation| {
467            ModuleGlobalClientContext {
468                client: client_inner
469                    .clone()
470                    .upgrade()
471                    .expect("ModuleGlobalContextGen called after client was dropped"),
472                module_instance_id: module_instance,
473                operation,
474            }
475            .into()
476        })
477    }
478
479    pub async fn config(&self) -> ClientConfig {
480        self.config.read().await.clone()
481    }
482
483    // TODO: change to `-> Option<&str>`
484    pub fn api_secret(&self) -> &Option<String> {
485        &self.api_secret
486    }
487
488    /// Returns the core API version that the federation supports
489    ///
490    /// This reads from the cached version stored during client initialization.
491    /// If no cache is available (e.g., during initial setup), returns a default
492    /// version (0, 0).
493    pub async fn core_api_version(&self) -> ApiVersion {
494        // Try to get from cache. If not available, return a conservative
495        // default. The cache should always be populated after successful client init.
496        self.db
497            .begin_transaction_nc()
498            .await
499            .get_value(&CachedApiVersionSetKey)
500            .await
501            .map(|cached: CachedApiVersionSet| cached.0.core)
502            .unwrap_or(ApiVersion { major: 0, minor: 0 })
503    }
504
505    /// Returns the chain ID (bitcoin block hash at height 1) from the
506    /// federation
507    ///
508    /// This is cached in the database after the first successful fetch.
509    /// The chain ID uniquely identifies which bitcoin network the federation
510    /// operates on (mainnet, testnet, signet, regtest).
511    pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
512        // Check cache first
513        if let Some(chain_id) = self
514            .db
515            .begin_transaction_nc()
516            .await
517            .get_value(&ChainIdKey)
518            .await
519        {
520            return Ok(chain_id);
521        }
522
523        // Fetch from federation with consensus
524        let chain_id = self.api.chain_id().await?;
525
526        // Cache the result
527        let mut dbtx = self.db.begin_transaction().await;
528        dbtx.insert_entry(&ChainIdKey, &chain_id).await;
529        dbtx.commit_tx().await;
530
531        Ok(chain_id)
532    }
533
534    pub fn decoders(&self) -> &ModuleDecoderRegistry {
535        &self.decoders
536    }
537
538    /// Returns a reference to the module, panics if not found
539    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
540        self.try_get_module(instance)
541            .expect("Module instance not found")
542    }
543
544    fn try_get_module(
545        &self,
546        instance: ModuleInstanceId,
547    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
548        Some(self.modules.get(instance)?.as_ref())
549    }
550
551    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
552        self.modules.get(instance).is_some()
553    }
554
555    /// Returns the input amount and output amount of a transaction
556    ///
557    /// # Panics
558    /// If any of the input or output versions in the transaction builder are
559    /// unknown by the respective module.
560    fn transaction_builder_get_balance(&self, builder: &TransactionBuilder) -> (Amounts, Amounts) {
561        // FIXME: prevent overflows, currently not suitable for untrusted input
562        let mut in_amounts = Amounts::ZERO;
563        let mut out_amounts = Amounts::ZERO;
564        let mut fee_amounts = Amounts::ZERO;
565
566        for input in builder.inputs() {
567            let module = self.get_module(input.input.module_instance_id());
568
569            let item_fees = module.input_fee(&input.amounts, &input.input).expect(
570                "We only build transactions with input versions that are supported by the module",
571            );
572
573            in_amounts.checked_add_mut(&input.amounts);
574            fee_amounts.checked_add_mut(&item_fees);
575        }
576
577        for output in builder.outputs() {
578            let module = self.get_module(output.output.module_instance_id());
579
580            let item_fees = module.output_fee(&output.amounts, &output.output).expect(
581                "We only build transactions with output versions that are supported by the module",
582            );
583
584            out_amounts.checked_add_mut(&output.amounts);
585            fee_amounts.checked_add_mut(&item_fees);
586        }
587
588        out_amounts.checked_add_mut(&fee_amounts);
589        (in_amounts, out_amounts)
590    }
591
592    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
593        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
594    }
595
596    /// Get metadata value from the federation config itself
597    pub fn get_config_meta(&self, key: &str) -> Option<String> {
598        self.federation_config_meta.get(key).cloned()
599    }
600
601    pub(crate) fn root_secret(&self) -> DerivableSecret {
602        self.root_secret.clone()
603    }
604
605    pub async fn add_state_machines(
606        &self,
607        dbtx: &mut DatabaseTransaction<'_>,
608        states: Vec<DynState>,
609    ) -> AddStateMachinesResult {
610        self.executor.add_state_machines_dbtx(dbtx, states).await
611    }
612
613    // TODO: implement as part of [`OperationLog`]
614    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
615        let active_states = self.executor.get_active_states().await;
616        let mut active_operations = HashSet::with_capacity(active_states.len());
617        let mut dbtx = self.db().begin_transaction_nc().await;
618        for (state, _) in active_states {
619            let operation_id = state.operation_id();
620            if dbtx
621                .get_value(&OperationLogKey { operation_id })
622                .await
623                .is_some()
624            {
625                active_operations.insert(operation_id);
626            }
627        }
628        active_operations
629    }
630
631    pub fn operation_log(&self) -> &OperationLog {
632        &self.operation_log
633    }
634
635    /// Get the meta manager to read meta fields.
636    pub fn meta_service(&self) -> &Arc<MetaService> {
637        &self.meta_service
638    }
639
640    /// Get the meta manager to read meta fields.
641    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
642        let meta_service = self.meta_service();
643        let ts = meta_service
644            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
645            .await
646            .and_then(|v| v.value)?;
647        Some(UNIX_EPOCH + Duration::from_secs(ts))
648    }
649
650    /// Adds funding to a transaction or removes over-funding via change.
651    async fn finalize_transaction(
652        &self,
653        dbtx: &mut DatabaseTransaction<'_>,
654        operation_id: OperationId,
655        mut partial_transaction: TransactionBuilder,
656    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
657        let (in_amounts, out_amounts) = self.transaction_builder_get_balance(&partial_transaction);
658
659        let mut added_inputs_bundles = vec![];
660        let mut added_outputs_bundles = vec![];
661
662        // The way currently things are implemented is OK for modules which can
663        // collect a fee relative to one being used, but will break down in any
664        // fancy scenarios. Future TODOs:
665        //
666        // * create_final_inputs_and_outputs needs to get broken down, so we can use
667        //   primary modules using priorities (possibly separate prios for inputs and
668        //   outputs to be able to drain modules, etc.); we need the split "check if
669        //   possible" and "take" steps,
670        // * extra inputs and outputs adding fees needs to be taken into account,
671        //   possibly with some looping
672        for unit in in_amounts.units().union(&out_amounts.units()) {
673            let input_amount = in_amounts.get(unit).copied().unwrap_or_default();
674            let output_amount = out_amounts.get(unit).copied().unwrap_or_default();
675            if input_amount == output_amount {
676                continue;
677            }
678
679            let Some((module_id, module)) = self.primary_module_for_unit(*unit) else {
680                bail!("No module to balance a partial transaction (affected unit: {unit:?}");
681            };
682
683            let (added_input_bundle, added_output_bundle) = module
684                .create_final_inputs_and_outputs(
685                    module_id,
686                    dbtx,
687                    operation_id,
688                    *unit,
689                    input_amount,
690                    output_amount,
691                )
692                .await?;
693
694            added_inputs_bundles.push(added_input_bundle);
695            added_outputs_bundles.push(added_output_bundle);
696        }
697
698        // This is the range of  outputs that will be added to the transaction
699        // in order to balance it. Notice that it may stay empty in case the transaction
700        // is already balanced.
701        let change_range = Range {
702            start: partial_transaction.outputs().count() as u64,
703            end: (partial_transaction.outputs().count() as u64
704                + added_outputs_bundles
705                    .iter()
706                    .map(|output| output.outputs().len() as u64)
707                    .sum::<u64>()),
708        };
709
710        for added_inputs in added_inputs_bundles {
711            partial_transaction = partial_transaction.with_inputs(added_inputs);
712        }
713
714        for added_outputs in added_outputs_bundles {
715            partial_transaction = partial_transaction.with_outputs(added_outputs);
716        }
717
718        let (input_amounts, output_amounts) =
719            self.transaction_builder_get_balance(&partial_transaction);
720
721        for (unit, output_amount) in output_amounts {
722            let input_amount = input_amounts.get(&unit).copied().unwrap_or_default();
723
724            assert!(input_amount >= output_amount, "Transaction is underfunded");
725        }
726
727        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
728
729        Ok((tx, states, change_range))
730    }
731
732    /// Add funding and/or change to the transaction builder as needed, finalize
733    /// the transaction and submit it to the federation.
734    ///
735    /// ## Errors
736    /// The function will return an error if the operation with given ID already
737    /// exists.
738    ///
739    /// ## Panics
740    /// The function will panic if the database transaction collides with
741    /// other and fails with others too often, this should not happen except for
742    /// excessively concurrent scenarios.
743    pub async fn finalize_and_submit_transaction<F, M>(
744        &self,
745        operation_id: OperationId,
746        operation_type: &str,
747        operation_meta_gen: F,
748        tx_builder: TransactionBuilder,
749    ) -> anyhow::Result<OutPointRange>
750    where
751        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
752        M: serde::Serialize + MaybeSend,
753    {
754        let operation_type = operation_type.to_owned();
755
756        let autocommit_res = self
757            .db
758            .autocommit(
759                |dbtx, _| {
760                    let operation_type = operation_type.clone();
761                    let tx_builder = tx_builder.clone();
762                    let operation_meta_gen = operation_meta_gen.clone();
763                    Box::pin(async move {
764                        self.finalize_and_submit_transaction_dbtx(
765                            dbtx,
766                            operation_id,
767                            &operation_type,
768                            operation_meta_gen,
769                            tx_builder,
770                        )
771                        .await
772                    })
773                },
774                Some(100), // TODO: handle what happens after 100 retries
775            )
776            .await;
777
778        match autocommit_res {
779            Ok(txid) => Ok(txid),
780            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
781            Err(AutocommitError::CommitFailed {
782                attempts,
783                last_error,
784            }) => panic!(
785                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
786            ),
787        }
788    }
789
790    /// See [`Self::finalize_and_submit_transaction`], just inside a database
791    /// transaction.
792    pub async fn finalize_and_submit_transaction_dbtx<F, M>(
793        &self,
794        dbtx: &mut DatabaseTransaction<'_>,
795        operation_id: OperationId,
796        operation_type: &str,
797        operation_meta_gen: F,
798        tx_builder: TransactionBuilder,
799    ) -> anyhow::Result<OutPointRange>
800    where
801        F: FnOnce(OutPointRange) -> M + MaybeSend,
802        M: serde::Serialize + MaybeSend,
803    {
804        if Client::operation_exists_dbtx(dbtx, operation_id).await {
805            bail!("There already exists an operation with id {operation_id:?}")
806        }
807
808        let out_point_range = self
809            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
810            .await?;
811
812        self.operation_log()
813            .add_operation_log_entry_dbtx(
814                dbtx,
815                operation_id,
816                operation_type,
817                operation_meta_gen(out_point_range),
818            )
819            .await;
820
821        Ok(out_point_range)
822    }
823
824    async fn finalize_and_submit_transaction_inner(
825        &self,
826        dbtx: &mut DatabaseTransaction<'_>,
827        operation_id: OperationId,
828        tx_builder: TransactionBuilder,
829    ) -> anyhow::Result<OutPointRange> {
830        let (transaction, mut states, change_range) = self
831            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
832            .await?;
833
834        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
835            let inputs = transaction
836                .inputs
837                .iter()
838                .map(DynInput::module_instance_id)
839                .collect::<Vec<_>>();
840            let outputs = transaction
841                .outputs
842                .iter()
843                .map(DynOutput::module_instance_id)
844                .collect::<Vec<_>>();
845            warn!(
846                target: LOG_CLIENT_NET_API,
847                size=%transaction.consensus_encode_to_vec().len(),
848                ?inputs,
849                ?outputs,
850                "Transaction too large",
851            );
852            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
853            bail!(
854                "The generated transaction would be rejected by the federation for being too large."
855            );
856        }
857
858        let txid = transaction.tx_hash();
859
860        debug!(
861            target: LOG_CLIENT_NET_API,
862            %txid,
863            operation_id = %operation_id.fmt_short(),
864            ?transaction,
865            "Finalized and submitting transaction",
866        );
867
868        let tx_submission_sm = DynState::from_typed(
869            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
870            TxSubmissionStatesSM {
871                operation_id,
872                state: TxSubmissionStates::Created(transaction),
873            },
874        );
875        states.push(tx_submission_sm);
876
877        self.executor.add_state_machines_dbtx(dbtx, states).await?;
878
879        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
880            .await;
881
882        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
883    }
884
885    async fn transaction_update_stream(
886        &self,
887        operation_id: OperationId,
888    ) -> BoxStream<'static, TxSubmissionStatesSM> {
889        self.executor
890            .notifier()
891            .module_notifier::<TxSubmissionStatesSM>(
892                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
893                self.final_client.clone(),
894            )
895            .subscribe(operation_id)
896            .await
897    }
898
899    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
900        let mut dbtx = self.db().begin_transaction_nc().await;
901
902        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
903    }
904
905    pub async fn operation_exists_dbtx(
906        dbtx: &mut DatabaseTransaction<'_>,
907        operation_id: OperationId,
908    ) -> bool {
909        let active_state_exists = dbtx
910            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
911            .await
912            .next()
913            .await
914            .is_some();
915
916        let inactive_state_exists = dbtx
917            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
918            .await
919            .next()
920            .await
921            .is_some();
922
923        active_state_exists || inactive_state_exists
924    }
925
926    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
927        self.db
928            .begin_transaction_nc()
929            .await
930            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
931            .await
932            .next()
933            .await
934            .is_some()
935    }
936
937    /// Waits for an output from the primary module to reach its final
938    /// state.
939    pub async fn await_primary_bitcoin_module_output(
940        &self,
941        operation_id: OperationId,
942        out_point: OutPoint,
943    ) -> anyhow::Result<()> {
944        self.primary_module_for_unit(AmountUnit::BITCOIN)
945            .ok_or_else(|| anyhow!("No primary module available"))?
946            .1
947            .await_primary_module_output(operation_id, out_point)
948            .await
949    }
950
951    /// Returns a reference to a typed module client instance by kind
952    pub fn get_first_module<M: ClientModule>(
953        &'_ self,
954    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
955        let module_kind = M::kind();
956        let id = self
957            .get_first_instance(&module_kind)
958            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
959        let module: &M = self
960            .try_get_module(id)
961            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
962            .as_any()
963            .downcast_ref::<M>()
964            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
965        let (db, _) = self.db().with_prefix_module_id(id);
966        Ok(ClientModuleInstance {
967            id,
968            db,
969            api: self.api().with_module(id),
970            module,
971        })
972    }
973
974    pub fn get_module_client_dyn(
975        &self,
976        instance_id: ModuleInstanceId,
977    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
978        self.try_get_module(instance_id)
979            .ok_or(anyhow!("Unknown module instance {}", instance_id))
980    }
981
982    pub fn db(&self) -> &Database {
983        &self.db
984    }
985
986    pub fn endpoints(&self) -> &ConnectorRegistry {
987        &self.connectors
988    }
989
990    /// Returns a stream of transaction updates for the given operation id that
991    /// can later be used to watch for a specific transaction being accepted.
992    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
993        TransactionUpdates {
994            update_stream: self.transaction_update_stream(operation_id).await,
995        }
996    }
997
998    /// Returns the instance id of the first module of the given kind.
999    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1000        self.modules
1001            .iter_modules()
1002            .find(|(_, kind, _module)| *kind == module_kind)
1003            .map(|(instance_id, _, _)| instance_id)
1004    }
1005
1006    /// Returns the data from which the client's root secret is derived (e.g.
1007    /// BIP39 seed phrase struct).
1008    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
1009        get_decoded_client_secret::<T>(self.db()).await
1010    }
1011
1012    /// Waits for outputs from the primary module to reach its final
1013    /// state.
1014    pub async fn await_primary_bitcoin_module_outputs(
1015        &self,
1016        operation_id: OperationId,
1017        outputs: Vec<OutPoint>,
1018    ) -> anyhow::Result<()> {
1019        for out_point in outputs {
1020            self.await_primary_bitcoin_module_output(operation_id, out_point)
1021                .await?;
1022        }
1023
1024        Ok(())
1025    }
1026
1027    /// Returns the config of the client in JSON format.
1028    ///
1029    /// Compared to the consensus module format where module configs are binary
1030    /// encoded this format cannot be cryptographically verified but is easier
1031    /// to consume and to some degree human-readable.
1032    pub async fn get_config_json(&self) -> JsonClientConfig {
1033        self.config().await.to_json()
1034    }
1035
1036    // Ideally this would not be in the API, but there's a lot of places where this
1037    // makes it easier.
1038    #[doc(hidden)]
1039    /// Like [`Self::get_balance`] but returns an error if primary module is not
1040    /// available
1041    pub async fn get_balance_for_btc(&self) -> anyhow::Result<Amount> {
1042        self.get_balance_for_unit(AmountUnit::BITCOIN).await
1043    }
1044
1045    pub async fn get_balance_for_unit(&self, unit: AmountUnit) -> anyhow::Result<Amount> {
1046        let (id, module) = self
1047            .primary_module_for_unit(unit)
1048            .ok_or_else(|| anyhow!("Primary module not available"))?;
1049        Ok(module
1050            .get_balance(id, &mut self.db().begin_transaction_nc().await, unit)
1051            .await)
1052    }
1053
1054    /// Returns a stream that yields the current client balance every time it
1055    /// changes.
1056    pub async fn subscribe_balance_changes(&self, unit: AmountUnit) -> BoxStream<'static, Amount> {
1057        let primary_module_things =
1058            if let Some((primary_module_id, primary_module)) = self.primary_module_for_unit(unit) {
1059                let balance_changes = primary_module.subscribe_balance_changes().await;
1060                let initial_balance = self
1061                    .get_balance_for_unit(unit)
1062                    .await
1063                    .expect("Primary is present");
1064
1065                Some((
1066                    primary_module_id,
1067                    primary_module.clone(),
1068                    balance_changes,
1069                    initial_balance,
1070                ))
1071            } else {
1072                None
1073            };
1074        let db = self.db().clone();
1075
1076        Box::pin(async_stream::stream! {
1077            let Some((primary_module_id, primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
1078                // If there is no primary module, there will not be one until client is
1079                // restarted
1080                pending().await
1081            };
1082
1083
1084            yield initial_balance;
1085            let mut prev_balance = initial_balance;
1086            while let Some(()) = balance_changes.next().await {
1087                let mut dbtx = db.begin_transaction_nc().await;
1088                let balance = primary_module
1089                     .get_balance(primary_module_id, &mut dbtx, unit)
1090                    .await;
1091
1092                // Deduplicate in case modules cannot always tell if the balance actually changed
1093                if balance != prev_balance {
1094                    prev_balance = balance;
1095                    yield balance;
1096                }
1097            }
1098        })
1099    }
1100
1101    /// Make a single API version request to a peer after a delay.
1102    ///
1103    /// The delay is here to unify the type of a future both for initial request
1104    /// and possible retries.
1105    async fn make_api_version_request(
1106        delay: Duration,
1107        peer_id: PeerId,
1108        api: &DynGlobalApi,
1109    ) -> (
1110        PeerId,
1111        Result<SupportedApiVersionsSummary, fedimint_connectors::error::ServerError>,
1112    ) {
1113        runtime::sleep(delay).await;
1114        (
1115            peer_id,
1116            api.request_single_peer::<SupportedApiVersionsSummary>(
1117                VERSION_ENDPOINT.to_owned(),
1118                ApiRequestErased::default(),
1119                peer_id,
1120            )
1121            .await,
1122        )
1123    }
1124
1125    /// Create a backoff strategy for API version requests.
1126    ///
1127    /// Keep trying, initially somewhat aggressively, but after a while retry
1128    /// very slowly, because chances for response are getting lower and
1129    /// lower.
1130    fn create_api_version_backoff() -> impl Iterator<Item = Duration> {
1131        custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None)
1132    }
1133
1134    /// Query the federation for API version support and then calculate
1135    /// the best API version to use (supported by most guardians).
1136    pub async fn fetch_common_api_versions_from_all_peers(
1137        num_peers: NumPeers,
1138        api: DynGlobalApi,
1139        db: Database,
1140        num_responses_sender: watch::Sender<usize>,
1141    ) {
1142        let mut backoff = Self::create_api_version_backoff();
1143
1144        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1145        // and make a single async db write operation, it should be OK.
1146        let mut requests = FuturesUnordered::new();
1147
1148        for peer_id in num_peers.peer_ids() {
1149            requests.push(Self::make_api_version_request(
1150                Duration::ZERO,
1151                peer_id,
1152                &api,
1153            ));
1154        }
1155
1156        let mut num_responses = 0;
1157
1158        while let Some((peer_id, response)) = requests.next().await {
1159            let retry = match response {
1160                Err(err) => {
1161                    let has_previous_response = db
1162                        .begin_transaction_nc()
1163                        .await
1164                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1165                        .await
1166                        .is_some();
1167                    debug!(
1168                        target: LOG_CLIENT,
1169                        %peer_id,
1170                        err = %err.fmt_compact(),
1171                        %has_previous_response,
1172                        "Failed to refresh API versions of a peer"
1173                    );
1174
1175                    !has_previous_response
1176                }
1177                Ok(o) => {
1178                    // Save the response to the database right away, just to
1179                    // not lose it
1180                    let mut dbtx = db.begin_transaction().await;
1181                    dbtx.insert_entry(
1182                        &PeerLastApiVersionsSummaryKey(peer_id),
1183                        &PeerLastApiVersionsSummary(o),
1184                    )
1185                    .await;
1186                    dbtx.commit_tx().await;
1187                    false
1188                }
1189            };
1190
1191            if retry {
1192                requests.push(Self::make_api_version_request(
1193                    backoff.next().expect("Keeps retrying"),
1194                    peer_id,
1195                    &api,
1196                ));
1197            } else {
1198                num_responses += 1;
1199                num_responses_sender.send_replace(num_responses);
1200            }
1201        }
1202    }
1203
1204    /// Fetch API versions from peers, retrying until we get threshold number of
1205    /// successful responses. Returns the successful responses collected
1206    /// from at least `num_peers.threshold()` peers.
1207    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
1208        num_peers: NumPeers,
1209        api: DynGlobalApi,
1210    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1211        let mut backoff = Self::create_api_version_backoff();
1212
1213        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
1214        // and collect responses, it should be OK.
1215        let mut requests = FuturesUnordered::new();
1216
1217        for peer_id in num_peers.peer_ids() {
1218            requests.push(Self::make_api_version_request(
1219                Duration::ZERO,
1220                peer_id,
1221                &api,
1222            ));
1223        }
1224
1225        let mut successful_responses = BTreeMap::new();
1226
1227        while successful_responses.len() < num_peers.threshold()
1228            && let Some((peer_id, response)) = requests.next().await
1229        {
1230            let retry = match response {
1231                Err(err) => {
1232                    debug!(
1233                        target: LOG_CLIENT,
1234                        %peer_id,
1235                        err = %err.fmt_compact(),
1236                        "Failed to fetch API versions from peer"
1237                    );
1238                    true
1239                }
1240                Ok(response) => {
1241                    successful_responses.insert(peer_id, response);
1242                    false
1243                }
1244            };
1245
1246            if retry {
1247                requests.push(Self::make_api_version_request(
1248                    backoff.next().expect("Keeps retrying"),
1249                    peer_id,
1250                    &api,
1251                ));
1252            }
1253        }
1254
1255        successful_responses
1256    }
1257
1258    /// Fetch API versions from peers and discover common API versions to use.
1259    pub async fn fetch_common_api_versions(
1260        config: &ClientConfig,
1261        api: &DynGlobalApi,
1262    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
1263        debug!(
1264            target: LOG_CLIENT,
1265            "Fetching common api versions"
1266        );
1267
1268        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1269
1270        let peer_api_version_sets =
1271            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
1272
1273        Ok(peer_api_version_sets)
1274    }
1275
1276    /// Write API version set to database cache.
1277    /// Used when we have a pre-calculated API version set that should be stored
1278    /// for later use.
1279    pub async fn write_api_version_cache(
1280        dbtx: &mut DatabaseTransaction<'_>,
1281        api_version_set: ApiVersionSet,
1282    ) {
1283        debug!(
1284            target: LOG_CLIENT,
1285            value = ?api_version_set,
1286            "Writing API version set to cache"
1287        );
1288
1289        dbtx.insert_entry(
1290            &CachedApiVersionSetKey,
1291            &CachedApiVersionSet(api_version_set),
1292        )
1293        .await;
1294    }
1295
1296    /// Store prefetched peer API version responses and calculate/store common
1297    /// API version set. This processes the individual peer responses by
1298    /// storing them in the database and calculating the common API version
1299    /// set for caching.
1300    pub async fn store_prefetched_api_versions(
1301        db: &Database,
1302        config: &ClientConfig,
1303        client_module_init: &ClientModuleInitRegistry,
1304        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1305    ) {
1306        debug!(
1307            target: LOG_CLIENT,
1308            "Storing {} prefetched peer API version responses and calculating common version set",
1309            peer_api_versions.len()
1310        );
1311
1312        let mut dbtx = db.begin_transaction().await;
1313        // Calculate common API version set from individual responses
1314        let client_supported_versions =
1315            Self::supported_api_versions_summary_static(config, client_module_init);
1316        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1317            &client_supported_versions,
1318            peer_api_versions,
1319        ) {
1320            Ok(common_api_versions) => {
1321                // Write the calculated common API version set to database cache
1322                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1323                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1324            }
1325            Err(err) => {
1326                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1327            }
1328        }
1329
1330        // Store individual peer responses to database
1331        for (peer_id, peer_api_versions) in peer_api_versions {
1332            dbtx.insert_entry(
1333                &PeerLastApiVersionsSummaryKey(*peer_id),
1334                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1335            )
1336            .await;
1337        }
1338        dbtx.commit_tx().await;
1339        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1340    }
1341
1342    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1343    pub fn supported_api_versions_summary_static(
1344        config: &ClientConfig,
1345        client_module_init: &ClientModuleInitRegistry,
1346    ) -> SupportedApiVersionsSummary {
1347        SupportedApiVersionsSummary {
1348            core: SupportedCoreApiVersions {
1349                core_consensus: config.global.consensus_version,
1350                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1351                    .expect("must not have conflicting versions"),
1352            },
1353            modules: config
1354                .modules
1355                .iter()
1356                .filter_map(|(&module_instance_id, module_config)| {
1357                    client_module_init
1358                        .get(module_config.kind())
1359                        .map(|module_init| {
1360                            (
1361                                module_instance_id,
1362                                SupportedModuleApiVersions {
1363                                    core_consensus: config.global.consensus_version,
1364                                    module_consensus: module_config.version,
1365                                    api: module_init.supported_api_versions(),
1366                                },
1367                            )
1368                        })
1369                })
1370                .collect(),
1371        }
1372    }
1373
1374    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1375        Self::load_and_refresh_common_api_version_static(
1376            &self.config().await,
1377            &self.module_inits,
1378            self.connectors.clone(),
1379            &self.api,
1380            &self.db,
1381            &self.task_group,
1382            &self.client_span,
1383        )
1384        .await
1385    }
1386
1387    /// Force refresh API versions from the federation, bypassing the cache.
1388    ///
1389    /// This queries all peers for their supported API versions and calculates
1390    /// the common API version set to use. The result is stored in the database
1391    /// cache for future use.
1392    pub async fn refresh_api_versions(&self) -> anyhow::Result<ApiVersionSet> {
1393        Self::refresh_common_api_version_static(
1394            &self.config().await,
1395            &self.module_inits,
1396            &self.api,
1397            &self.db,
1398            self.task_group.clone(),
1399            &self.client_span,
1400            true,
1401        )
1402        .await
1403    }
1404
1405    /// Load the common api versions to use from cache and start a background
1406    /// process to refresh them.
1407    ///
1408    /// This is a compromise, so we not have to wait for version discovery to
1409    /// complete every time a [`Client`] is being built.
1410    pub(crate) async fn load_and_refresh_common_api_version_static(
1411        config: &ClientConfig,
1412        module_init: &ClientModuleInitRegistry,
1413        connectors: ConnectorRegistry,
1414        api: &DynGlobalApi,
1415        db: &Database,
1416        task_group: &TaskGroup,
1417        client_span: &Span,
1418    ) -> anyhow::Result<ApiVersionSet> {
1419        if let Some(v) = db
1420            .begin_transaction_nc()
1421            .await
1422            .get_value(&CachedApiVersionSetKey)
1423            .await
1424        {
1425            client_span.in_scope(|| {
1426                debug!(
1427                    target: LOG_CLIENT,
1428                    "Found existing cached common api versions"
1429                );
1430            });
1431            let config = config.clone();
1432            let client_module_init = module_init.clone();
1433            let api = api.clone();
1434            let db = db.clone();
1435            let task_group = task_group.clone();
1436            let client_span_owned = client_span.clone();
1437            // Separate task group, because we actually don't want to be waiting for this to
1438            // finish, and it's just best effort.
1439            task_group.clone().spawn_cancellable_with_span(
1440                client_span.clone(),
1441                "refresh_common_api_version_static",
1442                async move {
1443                    connectors.wait_for_initialized_connections().await;
1444
1445                    if let Err(error) = Self::refresh_common_api_version_static(
1446                        &config,
1447                        &client_module_init,
1448                        &api,
1449                        &db,
1450                        task_group,
1451                        &client_span_owned,
1452                        false,
1453                    )
1454                    .await
1455                    {
1456                        warn!(
1457                            target: LOG_CLIENT,
1458                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1459                        );
1460                    }
1461                },
1462            );
1463
1464            return Ok(v.0);
1465        }
1466
1467        info!(
1468            target: LOG_CLIENT,
1469            "Fetching initial API versions "
1470        );
1471        Self::refresh_common_api_version_static(
1472            config,
1473            module_init,
1474            api,
1475            db,
1476            task_group.clone(),
1477            client_span,
1478            true,
1479        )
1480        .await
1481    }
1482
1483    async fn refresh_common_api_version_static(
1484        config: &ClientConfig,
1485        client_module_init: &ClientModuleInitRegistry,
1486        api: &DynGlobalApi,
1487        db: &Database,
1488        task_group: TaskGroup,
1489        client_span: &Span,
1490        block_until_ok: bool,
1491    ) -> anyhow::Result<ApiVersionSet> {
1492        debug!(
1493            target: LOG_CLIENT,
1494            "Refreshing common api versions"
1495        );
1496
1497        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1498        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1499
1500        task_group.spawn_cancellable_with_span(
1501            client_span.clone(),
1502            "refresh peers api versions",
1503            Client::fetch_common_api_versions_from_all_peers(
1504                num_peers,
1505                api.clone(),
1506                db.clone(),
1507                num_responses_sender,
1508            ),
1509        );
1510
1511        let common_api_versions = loop {
1512            // Wait to collect enough answers before calculating a set of common api
1513            // versions to use. Note that all peers individual responses from
1514            // previous attempts are still being used, and requests, or even
1515            // retries for response of peers are not actually cancelled, as they
1516            // are happening on a separate task. This is all just to bound the
1517            // time user can be waiting for the join operation to finish, at the
1518            // risk of picking wrong version in very rare circumstances.
1519            let _: Result<_, Elapsed> = runtime::timeout(
1520                Duration::from_secs(30),
1521                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1522            )
1523            .await;
1524
1525            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1526
1527            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1528                &Self::supported_api_versions_summary_static(config, client_module_init),
1529                &peer_api_version_sets,
1530            ) {
1531                Ok(o) => break o,
1532                Err(err) if block_until_ok => {
1533                    warn!(
1534                        target: LOG_CLIENT,
1535                        err = %err.fmt_compact_anyhow(),
1536                        "Failed to discover API version to use. Retrying..."
1537                    );
1538                    continue;
1539                }
1540                Err(e) => return Err(e),
1541            }
1542        };
1543
1544        debug!(
1545            target: LOG_CLIENT,
1546            value = ?common_api_versions,
1547            "Updating the cached common api versions"
1548        );
1549        let mut dbtx = db.begin_transaction().await;
1550        let _ = dbtx
1551            .insert_entry(
1552                &CachedApiVersionSetKey,
1553                &CachedApiVersionSet(common_api_versions.clone()),
1554            )
1555            .await;
1556
1557        dbtx.commit_tx().await;
1558
1559        Ok(common_api_versions)
1560    }
1561
1562    /// Get the client [`Metadata`]
1563    pub async fn get_metadata(&self) -> Metadata {
1564        self.db
1565            .begin_transaction_nc()
1566            .await
1567            .get_value(&ClientMetadataKey)
1568            .await
1569            .unwrap_or_else(|| {
1570                warn!(
1571                    target: LOG_CLIENT,
1572                    "Missing existing metadata. This key should have been set on Client init"
1573                );
1574                Metadata::empty()
1575            })
1576    }
1577
1578    /// Set the client [`Metadata`]
1579    pub async fn set_metadata(&self, metadata: &Metadata) {
1580        self.db
1581            .autocommit::<_, _, anyhow::Error>(
1582                |dbtx, _| {
1583                    Box::pin(async {
1584                        Self::set_metadata_dbtx(dbtx, metadata).await;
1585                        Ok(())
1586                    })
1587                },
1588                None,
1589            )
1590            .await
1591            .expect("Failed to autocommit metadata");
1592    }
1593
1594    pub fn has_pending_recoveries(&self) -> bool {
1595        !self
1596            .client_recovery_progress_receiver
1597            .borrow()
1598            .iter()
1599            .all(|(_id, progress)| progress.is_done())
1600    }
1601
1602    /// Wait for all module recoveries to finish
1603    ///
1604    /// This will block until the recovery task is done with recoveries.
1605    /// Returns success if all recovery tasks are complete (success case),
1606    /// or an error if some modules could not complete the recovery at the time.
1607    ///
1608    /// A bit of a heavy approach.
1609    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1610        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1611        recovery_receiver
1612            .wait_for(|in_progress| {
1613                in_progress
1614                    .iter()
1615                    .all(|(_id, progress)| progress.is_done())
1616            })
1617            .await
1618            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1619
1620        Ok(())
1621    }
1622
1623    /// Subscribe to recover progress for all the modules.
1624    ///
1625    /// This stream can contain duplicate progress for a module.
1626    /// Don't use this stream for detecting completion of recovery.
1627    pub fn subscribe_to_recovery_progress(
1628        &self,
1629    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1630        WatchStream::new(self.client_recovery_progress_receiver.clone())
1631            .flat_map(futures::stream::iter)
1632    }
1633
1634    pub async fn wait_for_module_kind_recovery(
1635        &self,
1636        module_kind: ModuleKind,
1637    ) -> anyhow::Result<()> {
1638        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1639        let config = self.config().await;
1640        recovery_receiver
1641            .wait_for(|in_progress| {
1642                !in_progress
1643                    .iter()
1644                    .filter(|(module_instance_id, _progress)| {
1645                        config.modules[module_instance_id].kind == module_kind
1646                    })
1647                    .any(|(_id, progress)| !progress.is_done())
1648            })
1649            .await
1650            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1651
1652        Ok(())
1653    }
1654
1655    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1656        loop {
1657            if self.executor.get_active_states().await.is_empty() {
1658                break;
1659            }
1660            sleep(Duration::from_millis(100)).await;
1661        }
1662        Ok(())
1663    }
1664
1665    /// Set the client [`Metadata`]
1666    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1667        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1668    }
1669
1670    fn spawn_module_recoveries_task(
1671        &self,
1672        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1673        module_recoveries: BTreeMap<
1674            ModuleInstanceId,
1675            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1676        >,
1677        module_recovery_progress_receivers: BTreeMap<
1678            ModuleInstanceId,
1679            watch::Receiver<RecoveryProgress>,
1680        >,
1681    ) {
1682        let db = self.db.clone();
1683        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1684        let module_kinds: BTreeMap<ModuleInstanceId, String> = self
1685            .modules
1686            .iter_modules_id_kind()
1687            .map(|(id, kind)| (id, kind.to_string()))
1688            .collect();
1689        self.spawn("module recoveries", |_task_handle| async {
1690            Self::run_module_recoveries_task(
1691                db,
1692                log_ordering_wakeup_tx,
1693                recovery_sender,
1694                module_recoveries,
1695                module_recovery_progress_receivers,
1696                module_kinds,
1697            )
1698            .await;
1699        });
1700    }
1701
1702    async fn run_module_recoveries_task(
1703        db: Database,
1704        log_ordering_wakeup_tx: watch::Sender<()>,
1705        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1706        module_recoveries: BTreeMap<
1707            ModuleInstanceId,
1708            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1709        >,
1710        module_recovery_progress_receivers: BTreeMap<
1711            ModuleInstanceId,
1712            watch::Receiver<RecoveryProgress>,
1713        >,
1714        module_kinds: BTreeMap<ModuleInstanceId, String>,
1715    ) {
1716        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1717        let mut completed_stream = Vec::new();
1718        let progress_stream = futures::stream::FuturesUnordered::new();
1719
1720        for (module_instance_id, f) in module_recoveries {
1721            completed_stream.push(futures::stream::once(Box::pin(async move {
1722                match f.await {
1723                    Ok(()) => (module_instance_id, None),
1724                    Err(err) => {
1725                        warn!(
1726                            target: LOG_CLIENT,
1727                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1728                        );
1729                        // a module recovery that failed reports and error and
1730                        // just never finishes, so we don't need a separate state
1731                        // for it
1732                        futures::future::pending::<()>().await;
1733                        unreachable!()
1734                    }
1735                }
1736            })));
1737        }
1738
1739        for (module_instance_id, rx) in module_recovery_progress_receivers {
1740            progress_stream.push(
1741                tokio_stream::wrappers::WatchStream::new(rx)
1742                    .fuse()
1743                    .map(move |progress| (module_instance_id, Some(progress))),
1744            );
1745        }
1746
1747        let mut futures = futures::stream::select(
1748            futures::stream::select_all(progress_stream),
1749            futures::stream::select_all(completed_stream),
1750        );
1751
1752        while let Some((module_instance_id, progress)) = futures.next().await {
1753            let mut dbtx = db.begin_transaction().await;
1754
1755            let prev_progress = *recovery_sender
1756                .borrow()
1757                .get(&module_instance_id)
1758                .expect("existing progress must be present");
1759
1760            let progress = if prev_progress.is_done() {
1761                // since updates might be out of order, once done, stick with it
1762                prev_progress
1763            } else if let Some(progress) = progress {
1764                progress
1765            } else {
1766                prev_progress.to_complete()
1767            };
1768
1769            if !prev_progress.is_done() && progress.is_done() {
1770                info!(
1771                    target: LOG_CLIENT,
1772                    module_instance_id,
1773                    progress = format!("{}/{}", progress.complete, progress.total),
1774                    "Recovery complete"
1775                );
1776                dbtx.log_event(
1777                    log_ordering_wakeup_tx.clone(),
1778                    None,
1779                    ModuleRecoveryCompleted {
1780                        module_id: module_instance_id,
1781                    },
1782                )
1783                .await;
1784            } else {
1785                info!(
1786                    target: LOG_CLIENT,
1787                    module_instance_id,
1788                    kind = module_kinds.get(&module_instance_id).map(String::as_str).unwrap_or("unknown"),
1789                    progress = format!("{}/{}", progress.complete, progress.total),
1790                    "Recovery progress"
1791                );
1792            }
1793
1794            dbtx.insert_entry(
1795                &ClientModuleRecovery { module_instance_id },
1796                &ClientModuleRecoveryState { progress },
1797            )
1798            .await;
1799            dbtx.commit_tx().await;
1800
1801            recovery_sender.send_modify(|v| {
1802                v.insert(module_instance_id, progress);
1803            });
1804        }
1805        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1806    }
1807
1808    async fn load_peers_last_api_versions(
1809        db: &Database,
1810        num_peers: NumPeers,
1811    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1812        let mut peer_api_version_sets = BTreeMap::new();
1813
1814        let mut dbtx = db.begin_transaction_nc().await;
1815        for peer_id in num_peers.peer_ids() {
1816            if let Some(v) = dbtx
1817                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1818                .await
1819            {
1820                peer_api_version_sets.insert(peer_id, v.0);
1821            }
1822        }
1823        drop(dbtx);
1824        peer_api_version_sets
1825    }
1826
1827    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1828    /// only the announcements and doesn't use the config as fallback.
1829    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1830        self.db()
1831            .begin_transaction_nc()
1832            .await
1833            .find_by_prefix(&ApiAnnouncementPrefix)
1834            .await
1835            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1836            .collect()
1837            .await
1838    }
1839
1840    /// Returns guardian metadata stored in the client database
1841    pub async fn get_guardian_metadata(
1842        &self,
1843    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1844        self.db()
1845            .begin_transaction_nc()
1846            .await
1847            .find_by_prefix(&crate::guardian_metadata::GuardianMetadataPrefix)
1848            .await
1849            .map(|(key, metadata)| (key.0, metadata))
1850            .collect()
1851            .await
1852    }
1853
1854    /// Returns a list of guardian API URLs
1855    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1856        get_api_urls(&self.db, &self.config().await).await
1857    }
1858
1859    /// Create an invite code with the api endpoint of the given peer which can
1860    /// be used to download this client config
1861    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1862        self.get_peer_urls()
1863            .await
1864            .into_iter()
1865            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1866            .map(|peer_url| {
1867                InviteCode::new(
1868                    peer_url.clone(),
1869                    peer,
1870                    self.federation_id(),
1871                    self.api_secret.clone(),
1872                )
1873            })
1874    }
1875
1876    /// Blocks till the client has synced the guardian public key set
1877    /// (introduced in version 0.4) and returns it. Once it has been fetched
1878    /// once this function is guaranteed to return immediately.
1879    pub async fn get_guardian_public_keys_blocking(
1880        &self,
1881    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1882        self.db
1883            .autocommit(
1884                |dbtx, _| {
1885                    Box::pin(async move {
1886                        let config = self.config().await;
1887
1888                        let guardian_pub_keys = self
1889                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1890                            .await;
1891
1892                        Result::<_, ()>::Ok(guardian_pub_keys)
1893                    })
1894                },
1895                None,
1896            )
1897            .await
1898            .expect("Will retry forever")
1899    }
1900
1901    async fn get_or_backfill_broadcast_public_keys(
1902        &self,
1903        dbtx: &mut DatabaseTransaction<'_>,
1904        config: ClientConfig,
1905    ) -> BTreeMap<PeerId, PublicKey> {
1906        match config.global.broadcast_public_keys {
1907            Some(guardian_pub_keys) => guardian_pub_keys,
1908            _ => {
1909                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1910
1911                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1912                *(self.config.write().await) = new_config;
1913                guardian_pub_keys
1914            }
1915        }
1916    }
1917
1918    async fn fetch_session_count(&self) -> FederationResult<u64> {
1919        self.api.session_count().await
1920    }
1921
1922    async fn fetch_and_update_config(
1923        &self,
1924        config: ClientConfig,
1925    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1926        let fetched_config = retry(
1927            "Fetching guardian public keys",
1928            backoff_util::background_backoff(),
1929            || async {
1930                Ok(self
1931                    .api
1932                    .request_current_consensus::<ClientConfig>(
1933                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1934                        ApiRequestErased::default(),
1935                    )
1936                    .await?)
1937            },
1938        )
1939        .await
1940        .expect("Will never return on error");
1941
1942        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1943            warn!(
1944                target: LOG_CLIENT,
1945                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1946            );
1947            pending::<()>().await;
1948            unreachable!("Pending will never return");
1949        };
1950
1951        let new_config = ClientConfig {
1952            global: GlobalClientConfig {
1953                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1954                ..config.global
1955            },
1956            modules: config.modules,
1957        };
1958        (guardian_pub_keys, new_config)
1959    }
1960
1961    pub fn handle_global_rpc(
1962        &self,
1963        method: String,
1964        params: serde_json::Value,
1965    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1966        Box::pin(try_stream! {
1967            match method.as_str() {
1968                "get_balance" => {
1969                    let balance = self.get_balance_for_btc().await.unwrap_or_default();
1970                    yield serde_json::to_value(balance)?;
1971                }
1972                "subscribe_balance_changes" => {
1973                    let req: GetBalanceChangesRequest= serde_json::from_value(params)?;
1974                    let mut stream = self.subscribe_balance_changes(req.unit).await;
1975                    while let Some(balance) = stream.next().await {
1976                        yield serde_json::to_value(balance)?;
1977                    }
1978                }
1979                "get_config" => {
1980                    let config = self.config().await;
1981                    yield serde_json::to_value(config)?;
1982                }
1983                "get_federation_id" => {
1984                    let federation_id = self.federation_id();
1985                    yield serde_json::to_value(federation_id)?;
1986                }
1987                "get_invite_code" => {
1988                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1989                    let invite_code = self.invite_code(req.peer).await;
1990                    yield serde_json::to_value(invite_code)?;
1991                }
1992                "get_operation" => {
1993                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1994                    let operation = self.operation_log().get_operation(req.operation_id).await;
1995                    yield serde_json::to_value(operation)?;
1996                }
1997                "list_operations" => {
1998                    let req: ListOperationsParams = serde_json::from_value(params)?;
1999                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
2000                        usize::MAX
2001                    } else {
2002                        req.limit.unwrap_or(usize::MAX)
2003                    };
2004                    let operations = self.operation_log()
2005                        .paginate_operations_rev(limit, req.last_seen)
2006                        .await;
2007                    yield serde_json::to_value(operations)?;
2008                }
2009                "get_event_log" => {
2010                    let req: GetEventLogRequest = serde_json::from_value(params)?;
2011                    let limit = req
2012                        .limit
2013                        .unwrap_or(DEFAULT_EVENT_LOG_PAGE_SIZE)
2014                        .min(MAX_EVENT_LOG_PAGE_SIZE);
2015                    let events = self.get_event_log(req.pos, limit).await;
2016                    yield serde_json::to_value(events)?;
2017                }
2018                "session_count" => {
2019                    let count = self.fetch_session_count().await?;
2020                    yield serde_json::to_value(count)?;
2021                }
2022                "has_pending_recoveries" => {
2023                    let has_pending = self.has_pending_recoveries();
2024                    yield serde_json::to_value(has_pending)?;
2025                }
2026                "wait_for_all_recoveries" => {
2027                    self.wait_for_all_recoveries().await?;
2028                    yield serde_json::Value::Null;
2029                }
2030                "subscribe_to_recovery_progress" => {
2031                    let mut stream = self.subscribe_to_recovery_progress();
2032                    while let Some((module_id, progress)) = stream.next().await {
2033                        yield serde_json::json!({
2034                            "module_id": module_id,
2035                            "progress": progress
2036                        });
2037                    }
2038                }
2039                #[allow(deprecated)]
2040                "backup_to_federation" => {
2041                    let metadata = if params.is_null() {
2042                        Metadata::from_json_serialized(serde_json::json!({}))
2043                    } else {
2044                        Metadata::from_json_serialized(params)
2045                    };
2046                    self.backup_to_federation(metadata).await?;
2047                    yield serde_json::Value::Null;
2048                }
2049                _ => {
2050                    Err(anyhow::format_err!("Unknown method: {}", method))?;
2051                    unreachable!()
2052                },
2053            }
2054        })
2055    }
2056
2057    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
2058    where
2059        E: Event + Send,
2060    {
2061        let mut dbtx = self.db.begin_transaction().await;
2062        self.log_event_dbtx(&mut dbtx, module_id, event).await;
2063        dbtx.commit_tx().await;
2064    }
2065
2066    pub async fn log_event_dbtx<E, Cap>(
2067        &self,
2068        dbtx: &mut DatabaseTransaction<'_, Cap>,
2069        module_id: Option<ModuleInstanceId>,
2070        event: E,
2071    ) where
2072        E: Event + Send,
2073        Cap: Send,
2074    {
2075        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
2076            .await;
2077    }
2078
2079    pub async fn log_event_raw_dbtx<Cap>(
2080        &self,
2081        dbtx: &mut DatabaseTransaction<'_, Cap>,
2082        kind: EventKind,
2083        module: Option<(ModuleKind, ModuleInstanceId)>,
2084        payload: Vec<u8>,
2085        persist: EventPersistence,
2086    ) where
2087        Cap: Send,
2088    {
2089        let module_id = module.as_ref().map(|m| m.1);
2090        let module_kind = module.map(|m| m.0);
2091        dbtx.log_event_raw(
2092            self.log_ordering_wakeup_tx.clone(),
2093            kind,
2094            module_kind,
2095            module_id,
2096            payload,
2097            persist,
2098        )
2099        .await;
2100    }
2101
2102    /// Built in event log (trimmable) tracker
2103    ///
2104    /// For the convenience of downstream applications, [`Client`] can store
2105    /// internally event log position for the main application using/driving it.
2106    ///
2107    /// Note that this position is a singleton, so this tracker should not be
2108    /// used for multiple purposes or applications, etc. at the same time.
2109    ///
2110    /// If the application has a need to follow log using multiple trackers, it
2111    /// should implement own [`DynEventLogTrimableTracker`] and store its
2112    /// persient data by itself.
2113    pub fn built_in_application_event_log_tracker(&self) -> DynEventLogTrimableTracker {
2114        struct BuiltInApplicationEventLogTracker;
2115
2116        #[apply(async_trait_maybe_send!)]
2117        impl EventLogTrimableTracker for BuiltInApplicationEventLogTracker {
2118            // Store position in the event log
2119            async fn store(
2120                &mut self,
2121                dbtx: &mut DatabaseTransaction<NonCommittable>,
2122                pos: EventLogTrimableId,
2123            ) -> anyhow::Result<()> {
2124                dbtx.insert_entry(&DefaultApplicationEventLogKey, &pos)
2125                    .await;
2126                Ok(())
2127            }
2128
2129            /// Load the last previous stored position (or None if never stored)
2130            async fn load(
2131                &mut self,
2132                dbtx: &mut DatabaseTransaction<NonCommittable>,
2133            ) -> anyhow::Result<Option<EventLogTrimableId>> {
2134                Ok(dbtx.get_value(&DefaultApplicationEventLogKey).await)
2135            }
2136        }
2137        Box::new(BuiltInApplicationEventLogTracker)
2138    }
2139
2140    /// Like [`Self::handle_events`] but for historical data.
2141    ///
2142    ///
2143    /// This function can be used to process subset of events
2144    /// that is infrequent and important enough to be persisted
2145    /// forever. Most applications should prefer to use [`Self::handle_events`]
2146    /// which emits *all* events.
2147    pub async fn handle_historical_events<F, R>(
2148        &self,
2149        tracker: fedimint_eventlog::DynEventLogTracker,
2150        handler_fn: F,
2151    ) -> anyhow::Result<()>
2152    where
2153        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2154        R: Future<Output = anyhow::Result<()>>,
2155    {
2156        fedimint_eventlog::handle_events(
2157            self.db.clone(),
2158            tracker,
2159            self.log_event_added_rx.clone(),
2160            handler_fn,
2161        )
2162        .await
2163    }
2164
2165    /// Handle events emitted by the client
2166    ///
2167    /// This is a preferred method for reactive & asynchronous
2168    /// processing of events emitted by the client.
2169    ///
2170    /// It needs a `tracker` that will persist the position in the log
2171    /// as it is being handled. You can use the
2172    /// [`Client::built_in_application_event_log_tracker`] if this call is
2173    /// used for the single main application handling this instance of the
2174    /// [`Client`]. Otherwise you should implement your own tracker.
2175    ///
2176    /// This handler will call `handle_fn` with ever event emitted by
2177    /// [`Client`], including transient ones. The caller should atomically
2178    /// handle each event it is interested in and ignore other ones.
2179    ///
2180    /// This method returns only when client is shutting down or on internal
2181    /// error, so typically should be called in a background task dedicated
2182    /// to handling events.
2183    pub async fn handle_events<F, R>(
2184        &self,
2185        tracker: fedimint_eventlog::DynEventLogTrimableTracker,
2186        handler_fn: F,
2187    ) -> anyhow::Result<()>
2188    where
2189        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
2190        R: Future<Output = anyhow::Result<()>>,
2191    {
2192        fedimint_eventlog::handle_trimable_events(
2193            self.db.clone(),
2194            tracker,
2195            self.log_event_added_rx.clone(),
2196            handler_fn,
2197        )
2198        .await
2199    }
2200
2201    pub async fn get_event_log(
2202        &self,
2203        pos: Option<EventLogId>,
2204        limit: u64,
2205    ) -> Vec<PersistedLogEntry> {
2206        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2207            .await
2208    }
2209
2210    pub async fn get_event_log_trimable(
2211        &self,
2212        pos: Option<EventLogTrimableId>,
2213        limit: u64,
2214    ) -> Vec<PersistedLogEntry> {
2215        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
2216            .await
2217    }
2218
2219    pub async fn get_event_log_dbtx<Cap>(
2220        &self,
2221        dbtx: &mut DatabaseTransaction<'_, Cap>,
2222        pos: Option<EventLogId>,
2223        limit: u64,
2224    ) -> Vec<PersistedLogEntry>
2225    where
2226        Cap: Send,
2227    {
2228        dbtx.get_event_log(pos, limit).await
2229    }
2230
2231    pub async fn get_event_log_trimable_dbtx<Cap>(
2232        &self,
2233        dbtx: &mut DatabaseTransaction<'_, Cap>,
2234        pos: Option<EventLogTrimableId>,
2235        limit: u64,
2236    ) -> Vec<PersistedLogEntry>
2237    where
2238        Cap: Send,
2239    {
2240        dbtx.get_event_log_trimable(pos, limit).await
2241    }
2242
2243    /// Register to receiver all new transient (unpersisted) events
2244    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
2245        self.log_event_added_transient_tx.subscribe()
2246    }
2247
2248    /// Get a receiver that signals when new events are added to the event log
2249    pub fn log_event_added_rx(&self) -> watch::Receiver<()> {
2250        self.log_event_added_rx.clone()
2251    }
2252
2253    pub fn iroh_enable_dht(&self) -> bool {
2254        self.iroh_enable_dht
2255    }
2256
2257    pub(crate) async fn run_core_migrations(
2258        db_no_decoders: &Database,
2259    ) -> Result<(), anyhow::Error> {
2260        let mut dbtx = db_no_decoders.begin_transaction().await;
2261        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
2262            .await?;
2263        if is_running_in_test_env() {
2264            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
2265        }
2266        dbtx.commit_tx_result().await?;
2267        Ok(())
2268    }
2269
2270    /// Iterator over primary modules for a given `unit`
2271    fn primary_modules_for_unit(
2272        &self,
2273        unit: AmountUnit,
2274    ) -> impl Iterator<Item = (ModuleInstanceId, &DynClientModule)> {
2275        self.primary_modules
2276            .iter()
2277            .flat_map(move |(_prio, candidates)| {
2278                candidates
2279                    .specific
2280                    .get(&unit)
2281                    .into_iter()
2282                    .flatten()
2283                    .copied()
2284                    // within same priority, wildcard matches come last
2285                    .chain(candidates.wildcard.iter().copied())
2286            })
2287            .map(|id| (id, self.modules.get_expect(id)))
2288    }
2289
2290    /// Primary module to use for `unit`
2291    ///
2292    /// Currently, just pick the first (highest priority) match
2293    pub fn primary_module_for_unit(
2294        &self,
2295        unit: AmountUnit,
2296    ) -> Option<(ModuleInstanceId, &DynClientModule)> {
2297        self.primary_modules_for_unit(unit).next()
2298    }
2299
2300    /// [`Self::primary_module_for_unit`] for Bitcoin
2301    pub fn primary_module_for_btc(&self) -> (ModuleInstanceId, &DynClientModule) {
2302        self.primary_module_for_unit(AmountUnit::BITCOIN)
2303            .expect("No primary module for Bitcoin")
2304    }
2305}
2306
2307#[apply(async_trait_maybe_send!)]
2308impl ClientContextIface for Client {
2309    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
2310        Client::get_module(self, instance)
2311    }
2312
2313    fn api_clone(&self) -> DynGlobalApi {
2314        Client::api_clone(self)
2315    }
2316    fn decoders(&self) -> &ModuleDecoderRegistry {
2317        Client::decoders(self)
2318    }
2319
2320    async fn finalize_and_submit_transaction(
2321        &self,
2322        operation_id: OperationId,
2323        operation_type: &str,
2324        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2325        tx_builder: TransactionBuilder,
2326    ) -> anyhow::Result<OutPointRange> {
2327        Client::finalize_and_submit_transaction(
2328            self,
2329            operation_id,
2330            operation_type,
2331            // |out_point_range| operation_meta_gen(out_point_range),
2332            &operation_meta_gen,
2333            tx_builder,
2334        )
2335        .await
2336    }
2337
2338    async fn finalize_and_submit_transaction_dbtx(
2339        &self,
2340        dbtx: &mut DatabaseTransaction<'_>,
2341        operation_id: OperationId,
2342        operation_type: &str,
2343        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
2344        tx_builder: TransactionBuilder,
2345    ) -> anyhow::Result<OutPointRange> {
2346        Client::finalize_and_submit_transaction_dbtx(
2347            self,
2348            dbtx,
2349            operation_id,
2350            operation_type,
2351            &operation_meta_gen,
2352            tx_builder,
2353        )
2354        .await
2355    }
2356
2357    async fn finalize_and_submit_transaction_inner(
2358        &self,
2359        dbtx: &mut DatabaseTransaction<'_>,
2360        operation_id: OperationId,
2361        tx_builder: TransactionBuilder,
2362    ) -> anyhow::Result<OutPointRange> {
2363        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
2364    }
2365
2366    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
2367        Client::transaction_updates(self, operation_id).await
2368    }
2369
2370    async fn await_primary_module_outputs(
2371        &self,
2372        operation_id: OperationId,
2373        // TODO: make `impl Iterator<Item = ...>`
2374        outputs: Vec<OutPoint>,
2375    ) -> anyhow::Result<()> {
2376        Client::await_primary_bitcoin_module_outputs(self, operation_id, outputs).await
2377    }
2378
2379    fn operation_log(&self) -> &dyn IOperationLog {
2380        Client::operation_log(self)
2381    }
2382
2383    async fn has_active_states(&self, operation_id: OperationId) -> bool {
2384        Client::has_active_states(self, operation_id).await
2385    }
2386
2387    async fn operation_exists(&self, operation_id: OperationId) -> bool {
2388        Client::operation_exists(self, operation_id).await
2389    }
2390
2391    async fn config(&self) -> ClientConfig {
2392        Client::config(self).await
2393    }
2394
2395    fn db(&self) -> &Database {
2396        Client::db(self)
2397    }
2398
2399    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
2400        Client::executor(self)
2401    }
2402
2403    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
2404        Client::invite_code(self, peer).await
2405    }
2406
2407    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
2408        Client::get_internal_payment_markers(self)
2409    }
2410
2411    async fn log_event_json(
2412        &self,
2413        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
2414        module_kind: Option<ModuleKind>,
2415        module_id: ModuleInstanceId,
2416        kind: EventKind,
2417        payload: serde_json::Value,
2418        persist: EventPersistence,
2419    ) {
2420        dbtx.ensure_global()
2421            .expect("Must be called with global dbtx");
2422        self.log_event_raw_dbtx(
2423            dbtx,
2424            kind,
2425            module_kind.map(|kind| (kind, module_id)),
2426            serde_json::to_vec(&payload).expect("Serialization can't fail"),
2427            persist,
2428        )
2429        .await;
2430    }
2431
2432    async fn read_operation_active_states<'dbtx>(
2433        &self,
2434        operation_id: OperationId,
2435        module_id: ModuleInstanceId,
2436        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2437    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
2438    {
2439        Box::pin(
2440            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
2441                operation_id,
2442                module_instance: module_id,
2443            })
2444            .await
2445            .map(move |(k, v)| (k.0, v)),
2446        )
2447    }
2448    async fn read_operation_inactive_states<'dbtx>(
2449        &self,
2450        operation_id: OperationId,
2451        module_id: ModuleInstanceId,
2452        dbtx: &'dbtx mut DatabaseTransaction<'_>,
2453    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
2454    {
2455        Box::pin(
2456            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
2457                operation_id,
2458                module_instance: module_id,
2459            })
2460            .await
2461            .map(move |(k, v)| (k.0, v)),
2462        )
2463    }
2464}
2465
2466// TODO: impl `Debug` for `Client` and derive here
2467impl fmt::Debug for Client {
2468    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2469        write!(f, "Client")
2470    }
2471}
2472
2473pub fn client_decoders<'a>(
2474    registry: &ModuleInitRegistry<DynClientModuleInit>,
2475    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2476) -> ModuleDecoderRegistry {
2477    let mut modules = BTreeMap::new();
2478    for (id, kind) in module_kinds {
2479        let Some(init) = registry.get(kind) else {
2480            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2481            continue;
2482        };
2483
2484        modules.insert(
2485            id,
2486            (
2487                kind.clone(),
2488                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2489            ),
2490        );
2491    }
2492    ModuleDecoderRegistry::from(modules)
2493}