fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51    SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61    fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
66    EventPersistence, PersistedLogEntry,
67};
68use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
69use futures::stream::FuturesUnordered;
70use futures::{Stream, StreamExt as _};
71use global_ctx::ModuleGlobalClientContext;
72use serde::{Deserialize, Serialize};
73use tokio::sync::{broadcast, watch};
74use tokio_stream::wrappers::WatchStream;
75use tracing::{debug, info, warn};
76
77use crate::ClientBuilder;
78use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
79use crate::backup::Metadata;
80use crate::db::{
81    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
82    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
83    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
84    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
85    get_decoded_client_secret, verify_client_db_integrity_dbtx,
86};
87use crate::meta::MetaService;
88use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
89use crate::oplog::OperationLog;
90use crate::sm::executor::{
91    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
92    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
93};
94
95pub(crate) mod builder;
96pub(crate) mod global_ctx;
97pub(crate) mod handle;
98
/// List of core api versions supported by the implementation.
/// Notably `major` version is the one being supported, and corresponding
/// `minor` version is the one required (for given `major` version).
///
/// At present the list contains a single entry: major `0`, minor `0`.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
104
/// Main client type
///
/// A handle and API to interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time, will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it is starting and managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`crate::ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    final_client: FinalClientIface,
    /// Client configuration; kept behind an async `RwLock` and handed out as
    /// a clone by [`Client::config`].
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional API secret, exposed via [`Client::api_secret`].
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    db: Database,
    federation_id: FederationId,
    /// Metadata key/value pairs taken from the federation config, read by
    /// [`Client::get_config_meta`].
    federation_config_meta: BTreeMap<String, String>,
    /// Instance id of the primary module (see [`Client::primary_module`]).
    primary_module_instance: ModuleInstanceId,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    /// State machine executor; started by [`Client::start_executor`].
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    /// Secp256k1 context reused when building transactions and deriving keys.
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: Connector,

    /// Task group tied to the client's lifetime (see [`Client::task_group`]).
    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Internal client sender to wake up log ordering task every time a
    /// (unordered) log event is added.
    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time (ordered) log event is added.
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
}
151
/// Serializable parameters for listing operations.
///
/// NOTE(review): not used in the visible part of this file; presumably
/// `limit` caps the number of returned entries and `last_seen` is a
/// pagination cursor — confirm against the call site.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    limit: Option<usize>,
    last_seen: Option<ChronologicalOperationLogKey>,
}
157
/// Serializable request carrying a single [`OperationId`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
162
163impl Client {
164    /// Initialize a client builder that can be configured to create a new
165    /// client.
166    pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
167        let mut dbtx = db.begin_transaction().await;
168        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
169            .await?;
170        if is_running_in_test_env() {
171            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
172        }
173        dbtx.commit_tx_result().await?;
174
175        Ok(ClientBuilder::new(db))
176    }
177
    /// Borrow the federation API as a trait object.
    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }
181
    /// Return an owned clone of the federation API handle.
    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }
185
    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }
190
    /// Borrow the client's state machine [`Executor`].
    ///
    /// Useful for our CLI tooling, not meant for external use
    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }
196
197    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
198        let mut dbtx = db.begin_transaction_nc().await;
199        dbtx.get_value(&ClientConfigKey).await
200    }
201
202    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
203        let mut dbtx = db.begin_transaction_nc().await;
204        dbtx.get_value(&PendingClientConfigKey).await
205    }
206
207    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
208        let mut dbtx = db.begin_transaction_nc().await;
209        dbtx.get_value(&ApiSecretKey).await
210    }
211
212    pub async fn store_encodable_client_secret<T: Encodable>(
213        db: &Database,
214        secret: T,
215    ) -> anyhow::Result<()> {
216        let mut dbtx = db.begin_transaction().await;
217
218        // Don't overwrite an existing secret
219        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
220            bail!("Encoded client secret already exists, cannot overwrite")
221        }
222
223        let encoded_secret = T::consensus_encode_to_vec(&secret);
224        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
225            .await;
226        dbtx.commit_tx().await;
227        Ok(())
228    }
229
230    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
231        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
232            bail!("Encoded client secret not present in DB")
233        };
234
235        Ok(secret)
236    }
237    pub async fn load_decodable_client_secret_opt<T: Decodable>(
238        db: &Database,
239    ) -> anyhow::Result<Option<T>> {
240        let mut dbtx = db.begin_transaction_nc().await;
241
242        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
243
244        Ok(match client_secret {
245            Some(client_secret) => Some(
246                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
247                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
248            ),
249            None => None,
250        })
251    }
252
253    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
254        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
255            Ok(secret) => secret,
256            _ => {
257                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
258                Self::store_encodable_client_secret(db, secret)
259                    .await
260                    .expect("Storing client secret must work");
261                secret
262            }
263        };
264        Ok(client_secret)
265    }
266
    /// Returns `true` if a client config is stored in `db`.
    pub async fn is_initialized(db: &Database) -> bool {
        Self::get_config_from_db(db).await.is_some()
    }
270
    /// Start the state machine executor.
    ///
    /// Logs the build's code version at debug level, then starts the executor
    /// with a context generator produced by `context_gen`.
    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor (version: {})",
            fedimint_build_code_version_env!()
        );
        self.executor.start_executor(self.context_gen());
    }
279
    /// Returns the id of the federation this client is connected to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }
283
284    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
285        let client_inner = Arc::downgrade(self);
286        Arc::new(move |module_instance, operation| {
287            ModuleGlobalClientContext {
288                client: client_inner
289                    .clone()
290                    .upgrade()
291                    .expect("ModuleGlobalContextGen called after client was dropped"),
292                module_instance_id: module_instance,
293                operation,
294            }
295            .into()
296        })
297    }
298
    /// Returns a clone of the current client config.
    ///
    /// Takes the config read lock only for the duration of the clone.
    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }
302
    /// Returns the API secret the client was set up with, if any.
    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }
306
    /// Returns the client's module decoder registry.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
310
    /// Returns a reference to the module, panics if not found
    ///
    /// # Panics
    /// If no module is registered under `instance`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }
316
317    fn try_get_module(
318        &self,
319        instance: ModuleInstanceId,
320    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
321        Some(self.modules.get(instance)?.as_ref())
322    }
323
    /// Returns `true` if a module is registered under `instance`.
    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }
327
328    /// Returns the input amount and output amount of a transaction
329    ///
330    /// # Panics
331    /// If any of the input or output versions in the transaction builder are
332    /// unknown by the respective module.
333    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
334        // FIXME: prevent overflows, currently not suitable for untrusted input
335        let mut in_amount = Amount::ZERO;
336        let mut out_amount = Amount::ZERO;
337        let mut fee_amount = Amount::ZERO;
338
339        for input in builder.inputs() {
340            let module = self.get_module(input.input.module_instance_id());
341
342            let item_fee = module.input_fee(input.amount, &input.input).expect(
343                "We only build transactions with input versions that are supported by the module",
344            );
345
346            in_amount += input.amount;
347            fee_amount += item_fee;
348        }
349
350        for output in builder.outputs() {
351            let module = self.get_module(output.output.module_instance_id());
352
353            let item_fee = module.output_fee(output.amount, &output.output).expect(
354                "We only build transactions with output versions that are supported by the module",
355            );
356
357            out_amount += output.amount;
358            fee_amount += item_fee;
359        }
360
361        (in_amount, out_amount + fee_amount)
362    }
363
364    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
365        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
366    }
367
    /// Get metadata value from the federation config itself
    ///
    /// Returns a clone of the value stored under `key`, or `None` if the key
    /// is absent.
    pub fn get_config_meta(&self, key: &str) -> Option<String> {
        self.federation_config_meta.get(key).cloned()
    }
372
    /// Returns a clone of the client's root secret.
    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }
376
    /// Add `states` to the executor as part of the database transaction
    /// `dbtx`.
    ///
    /// Thin wrapper that forwards to the executor's
    /// `add_state_machines_dbtx`.
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }
384
385    // TODO: implement as part of [`OperationLog`]
386    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
387        let active_states = self.executor.get_active_states().await;
388        let mut active_operations = HashSet::with_capacity(active_states.len());
389        let mut dbtx = self.db().begin_transaction_nc().await;
390        for (state, _) in active_states {
391            let operation_id = state.operation_id();
392            if dbtx
393                .get_value(&OperationLogKey { operation_id })
394                .await
395                .is_some()
396            {
397                active_operations.insert(operation_id);
398            }
399        }
400        active_operations
401    }
402
    /// Returns the client's operation log.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }
406
    /// Get the meta manager to read meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }
411
412    /// Get the meta manager to read meta fields.
413    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
414        let meta_service = self.meta_service();
415        let ts = meta_service
416            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
417            .await
418            .and_then(|v| v.value)?;
419        Some(UNIX_EPOCH + Duration::from_secs(ts))
420    }
421
422    /// Adds funding to a transaction or removes over-funding via change.
423    async fn finalize_transaction(
424        &self,
425        dbtx: &mut DatabaseTransaction<'_>,
426        operation_id: OperationId,
427        mut partial_transaction: TransactionBuilder,
428    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
429        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
430
431        let (added_input_bundle, change_outputs) = self
432            .primary_module()
433            .create_final_inputs_and_outputs(
434                self.primary_module_instance,
435                dbtx,
436                operation_id,
437                input_amount,
438                output_amount,
439            )
440            .await?;
441
442        // This is the range of  outputs that will be added to the transaction
443        // in order to balance it. Notice that it may stay empty in case the transaction
444        // is already balanced.
445        let change_range = Range {
446            start: partial_transaction.outputs().count() as u64,
447            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
448        };
449
450        partial_transaction = partial_transaction
451            .with_inputs(added_input_bundle)
452            .with_outputs(change_outputs);
453
454        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
455
456        assert!(input_amount >= output_amount, "Transaction is underfunded");
457
458        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
459
460        Ok((tx, states, change_range))
461    }
462
    /// Add funding and/or change to the transaction builder as needed, finalize
    /// the transaction and submit it to the federation.
    ///
    /// The work is done inside an auto-committing database transaction that
    /// is retried on commit failure. `operation_meta_gen` is called with the
    /// resulting [`OutPointRange`] to produce the metadata stored in the
    /// operation log entry. Because the closure may run more than once,
    /// `tx_builder`, `operation_type` and `operation_meta_gen` are cloned for
    /// every attempt.
    ///
    /// ## Errors
    /// The function will return an error if the operation with given ID already
    /// exists, or if finalizing the transaction fails.
    ///
    /// ## Panics
    /// The function will panic if committing the database transaction still
    /// fails after the retry limit (100 attempts). This should not happen
    /// except for excessively concurrent scenarios.
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    // Fresh copies for each attempt; the async block below
                    // takes ownership of them.
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100), // TODO: handle what happens after 100 retries
            )
            .await;

        match autocommit_res {
            // Despite the binding's name this is the `OutPointRange`
            // returned by the closure above.
            Ok(txid) => Ok(txid),
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }
530
531    async fn finalize_and_submit_transaction_inner(
532        &self,
533        dbtx: &mut DatabaseTransaction<'_>,
534        operation_id: OperationId,
535        tx_builder: TransactionBuilder,
536    ) -> anyhow::Result<OutPointRange> {
537        let (transaction, mut states, change_range) = self
538            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
539            .await?;
540
541        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
542            let inputs = transaction
543                .inputs
544                .iter()
545                .map(DynInput::module_instance_id)
546                .collect::<Vec<_>>();
547            let outputs = transaction
548                .outputs
549                .iter()
550                .map(DynOutput::module_instance_id)
551                .collect::<Vec<_>>();
552            warn!(
553                target: LOG_CLIENT_NET_API,
554                size=%transaction.consensus_encode_to_vec().len(),
555                ?inputs,
556                ?outputs,
557                "Transaction too large",
558            );
559            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
560            bail!(
561                "The generated transaction would be rejected by the federation for being too large."
562            );
563        }
564
565        let txid = transaction.tx_hash();
566
567        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
568
569        let tx_submission_sm = DynState::from_typed(
570            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
571            TxSubmissionStatesSM {
572                operation_id,
573                state: TxSubmissionStates::Created(transaction),
574            },
575        );
576        states.push(tx_submission_sm);
577
578        self.executor.add_state_machines_dbtx(dbtx, states).await?;
579
580        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
581            .await;
582
583        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
584    }
585
    /// Subscribe to transaction submission state machine updates for
    /// `operation_id`.
    ///
    /// Obtains a module notifier for [`TxSubmissionStatesSM`] on
    /// `TRANSACTION_SUBMISSION_MODULE_INSTANCE` from the executor and
    /// subscribes to the given operation.
    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(
                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
                self.final_client.clone(),
            )
            .subscribe(operation_id)
            .await
    }
599
    /// Returns `true` if any state machine (active or inactive) exists for
    /// `operation_id`.
    ///
    /// Opens a non-committable transaction and delegates to
    /// [`Client::operation_exists_dbtx`].
    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
        let mut dbtx = self.db().begin_transaction_nc().await;

        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
    }
605
606    pub async fn operation_exists_dbtx(
607        dbtx: &mut DatabaseTransaction<'_>,
608        operation_id: OperationId,
609    ) -> bool {
610        let active_state_exists = dbtx
611            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
612            .await
613            .next()
614            .await
615            .is_some();
616
617        let inactive_state_exists = dbtx
618            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
619            .await
620            .next()
621            .await
622            .is_some();
623
624        active_state_exists || inactive_state_exists
625    }
626
627    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
628        self.db
629            .begin_transaction_nc()
630            .await
631            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
632            .await
633            .next()
634            .await
635            .is_some()
636    }
637
    /// Waits for an output from the primary module to reach its final
    /// state.
    ///
    /// Delegates to the primary module; any error it reports is returned
    /// unchanged.
    pub async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module()
            .await_primary_module_output(operation_id, out_point)
            .await
    }
649
650    /// Returns a reference to a typed module client instance by kind
651    pub fn get_first_module<M: ClientModule>(
652        &'_ self,
653    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
654        let module_kind = M::kind();
655        let id = self
656            .get_first_instance(&module_kind)
657            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
658        let module: &M = self
659            .try_get_module(id)
660            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
661            .as_any()
662            .downcast_ref::<M>()
663            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
664        let (db, _) = self.db().with_prefix_module_id(id);
665        Ok(ClientModuleInstance {
666            id,
667            db,
668            api: self.api().with_module(id),
669            module,
670        })
671    }
672
673    pub fn get_module_client_dyn(
674        &self,
675        instance_id: ModuleInstanceId,
676    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
677        self.try_get_module(instance_id)
678            .ok_or(anyhow!("Unknown module instance {}", instance_id))
679    }
680
    /// Returns the client's database handle.
    pub fn db(&self) -> &Database {
        &self.db
    }
684
685    /// Returns a stream of transaction updates for the given operation id that
686    /// can later be used to watch for a specific transaction being accepted.
687    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
688        TransactionUpdates {
689            update_stream: self.transaction_update_stream(operation_id).await,
690        }
691    }
692
693    /// Returns the instance id of the first module of the given kind. The
694    /// primary module will always be returned before any other modules (which
695    /// themselves are ordered by their instance ID).
696    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
697        if self
698            .modules
699            .get_with_kind(self.primary_module_instance)
700            .is_some_and(|(kind, _)| kind == module_kind)
701        {
702            return Some(self.primary_module_instance);
703        }
704
705        self.modules
706            .iter_modules()
707            .find(|(_, kind, _module)| *kind == module_kind)
708            .map(|(instance_id, _, _)| instance_id)
709    }
710
    /// Returns the data from which the client's root secret is derived (e.g.
    /// BIP39 seed phrase struct).
    ///
    /// The value is read from the client database and decoded as `T`.
    ///
    /// # Errors
    /// Propagates any error returned by `get_decoded_client_secret`.
    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }
716
    /// Waits for outputs from the primary module to reach its final
    /// state.
    ///
    /// The outputs are awaited one after another, in the order given.
    ///
    /// # Errors
    /// Returns the first error encountered; remaining outputs are not
    /// awaited.
    pub async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }
731
    /// Returns the config of the client in JSON format.
    ///
    /// Compared to the consensus module format where module configs are binary
    /// encoded this format cannot be cryptographically verified but is easier
    /// to consume and to some degree human-readable.
    ///
    /// The JSON is produced from a clone of the current config.
    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }
740
    /// Get the primary module
    ///
    /// # Panics
    /// If no module is registered under the primary module instance id.
    pub fn primary_module(&self) -> &DynClientModule {
        self.modules
            .get(self.primary_module_instance)
            .expect("primary module must be present")
    }
747
748    /// Balance available to the client for spending
749    pub async fn get_balance(&self) -> Amount {
750        self.primary_module()
751            .get_balance(
752                self.primary_module_instance,
753                &mut self.db().begin_transaction_nc().await,
754            )
755            .await
756    }
757
758    /// Returns a stream that yields the current client balance every time it
759    /// changes.
760    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
761        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
762        let initial_balance = self.get_balance().await;
763        let db = self.db().clone();
764        let primary_module = self.primary_module().clone();
765        let primary_module_instance = self.primary_module_instance;
766
767        Box::pin(async_stream::stream! {
768            yield initial_balance;
769            let mut prev_balance = initial_balance;
770            while let Some(()) = balance_changes.next().await {
771                let mut dbtx = db.begin_transaction_nc().await;
772                let balance = primary_module
773                    .get_balance(primary_module_instance, &mut dbtx)
774                    .await;
775
776                // Deduplicate in case modules cannot always tell if the balance actually changed
777                if balance != prev_balance {
778                    prev_balance = balance;
779                    yield balance;
780                }
781            }
782        })
783    }
784
785    /// Query the federation for API version support and then calculate
786    /// the best API version to use (supported by most guardians).
787    pub async fn refresh_peers_api_versions(
788        num_peers: NumPeers,
789        api: DynGlobalApi,
790        db: Database,
791        num_responses_sender: watch::Sender<usize>,
792    ) {
793        // Make a single request to a peer after a delay
794        //
795        // The delay is here to unify the type of a future both for initial request and
796        // possible retries.
797        async fn make_request(
798            delay: Duration,
799            peer_id: PeerId,
800            api: &DynGlobalApi,
801        ) -> (
802            PeerId,
803            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
804        ) {
805            runtime::sleep(delay).await;
806            (
807                peer_id,
808                api.request_single_peer::<SupportedApiVersionsSummary>(
809                    VERSION_ENDPOINT.to_owned(),
810                    ApiRequestErased::default(),
811                    peer_id,
812                )
813                .await,
814            )
815        }
816
817        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
818        // and make a single async db write operation, it should be OK.
819        let mut requests = FuturesUnordered::new();
820
821        for peer_id in num_peers.peer_ids() {
822            requests.push(make_request(Duration::ZERO, peer_id, &api));
823        }
824
825        let mut num_responses = 0;
826
827        while let Some((peer_id, response)) = requests.next().await {
828            match response {
829                Err(err) => {
830                    if db
831                        .begin_transaction_nc()
832                        .await
833                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
834                        .await
835                        .is_some()
836                    {
837                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
838                    } else {
839                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
840                        requests.push(make_request(Duration::from_secs(15), peer_id, &api));
841                    }
842                }
843                Ok(o) => {
844                    // Save the response to the database right away, just to
845                    // not lose it
846                    let mut dbtx = db.begin_transaction().await;
847                    dbtx.insert_entry(
848                        &PeerLastApiVersionsSummaryKey(peer_id),
849                        &PeerLastApiVersionsSummary(o),
850                    )
851                    .await;
852                    dbtx.commit_tx().await;
853                    num_responses += 1;
854                    // ignore errors: we don't care if anyone is still listening
855                    num_responses_sender.send_replace(num_responses);
856                }
857            }
858        }
859    }
860
861    /// [`SupportedApiVersionsSummary`] that the client and its modules support
862    pub fn supported_api_versions_summary_static(
863        config: &ClientConfig,
864        client_module_init: &ClientModuleInitRegistry,
865    ) -> SupportedApiVersionsSummary {
866        SupportedApiVersionsSummary {
867            core: SupportedCoreApiVersions {
868                core_consensus: config.global.consensus_version,
869                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
870                    .expect("must not have conflicting versions"),
871            },
872            modules: config
873                .modules
874                .iter()
875                .filter_map(|(&module_instance_id, module_config)| {
876                    client_module_init
877                        .get(module_config.kind())
878                        .map(|module_init| {
879                            (
880                                module_instance_id,
881                                SupportedModuleApiVersions {
882                                    core_consensus: config.global.consensus_version,
883                                    module_consensus: module_config.version,
884                                    api: module_init.supported_api_versions(),
885                                },
886                            )
887                        })
888                })
889                .collect(),
890        }
891    }
892
893    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
894        Self::load_and_refresh_common_api_version_static(
895            &self.config().await,
896            &self.module_inits,
897            &self.api,
898            &self.db,
899            &self.task_group,
900        )
901        .await
902    }
903
904    /// Load the common api versions to use from cache and start a background
905    /// process to refresh them.
906    ///
907    /// This is a compromise, so we not have to wait for version discovery to
908    /// complete every time a [`Client`] is being built.
909    async fn load_and_refresh_common_api_version_static(
910        config: &ClientConfig,
911        module_init: &ClientModuleInitRegistry,
912        api: &DynGlobalApi,
913        db: &Database,
914        task_group: &TaskGroup,
915    ) -> anyhow::Result<ApiVersionSet> {
916        if let Some(v) = db
917            .begin_transaction_nc()
918            .await
919            .get_value(&CachedApiVersionSetKey)
920            .await
921        {
922            debug!(
923                target: LOG_CLIENT,
924                "Found existing cached common api versions"
925            );
926            let config = config.clone();
927            let client_module_init = module_init.clone();
928            let api = api.clone();
929            let db = db.clone();
930            let task_group = task_group.clone();
931            // Separate task group, because we actually don't want to be waiting for this to
932            // finish, and it's just best effort.
933            task_group
934                .clone()
935                .spawn_cancellable("refresh_common_api_version_static", async move {
936                    if let Err(error) = Self::refresh_common_api_version_static(
937                        &config,
938                        &client_module_init,
939                        &api,
940                        &db,
941                        task_group,
942                    )
943                    .await
944                    {
945                        warn!(
946                            target: LOG_CLIENT,
947                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
948                        );
949                    }
950                });
951
952            return Ok(v.0);
953        }
954
955        debug!(
956            target: LOG_CLIENT,
957            "No existing cached common api versions found, waiting for initial discovery"
958        );
959        Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
960            .await
961    }
962
963    async fn refresh_common_api_version_static(
964        config: &ClientConfig,
965        client_module_init: &ClientModuleInitRegistry,
966        api: &DynGlobalApi,
967        db: &Database,
968        task_group: TaskGroup,
969    ) -> anyhow::Result<ApiVersionSet> {
970        debug!(
971            target: LOG_CLIENT,
972            "Refreshing common api versions"
973        );
974
975        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
976        let num_peers = NumPeers::from(config.global.api_endpoints.len());
977
978        task_group.spawn_cancellable("refresh peers api versions", {
979            Client::refresh_peers_api_versions(
980                num_peers,
981                api.clone(),
982                db.clone(),
983                num_responses_sender,
984            )
985        });
986
987        // Wait at most 15 seconds before calculating a set of common api versions to
988        // use. Note that all peers individual responses from previous attempts
989        // are still being used, and requests, or even retries for response of
990        // peers are not actually cancelled, as they are happening on a separate
991        // task. This is all just to bound the time user can be waiting
992        // for the join operation to finish, at the risk of picking wrong version in
993        // very rare circumstances.
994        let _: Result<_, Elapsed> = runtime::timeout(
995            Duration::from_secs(15),
996            num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
997        )
998        .await;
999
1000        let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1001
1002        let common_api_versions =
1003            fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1004                &Self::supported_api_versions_summary_static(config, client_module_init),
1005                &peer_api_version_sets,
1006            )?;
1007
1008        debug!(
1009            target: LOG_CLIENT,
1010            value = ?common_api_versions,
1011            "Updating the cached common api versions"
1012        );
1013        let mut dbtx = db.begin_transaction().await;
1014        let _ = dbtx
1015            .insert_entry(
1016                &CachedApiVersionSetKey,
1017                &CachedApiVersionSet(common_api_versions.clone()),
1018            )
1019            .await;
1020
1021        dbtx.commit_tx().await;
1022
1023        Ok(common_api_versions)
1024    }
1025
1026    /// Get the client [`Metadata`]
1027    pub async fn get_metadata(&self) -> Metadata {
1028        self.db
1029            .begin_transaction_nc()
1030            .await
1031            .get_value(&ClientMetadataKey)
1032            .await
1033            .unwrap_or_else(|| {
1034                warn!(
1035                    target: LOG_CLIENT,
1036                    "Missing existing metadata. This key should have been set on Client init"
1037                );
1038                Metadata::empty()
1039            })
1040    }
1041
1042    /// Set the client [`Metadata`]
1043    pub async fn set_metadata(&self, metadata: &Metadata) {
1044        self.db
1045            .autocommit::<_, _, anyhow::Error>(
1046                |dbtx, _| {
1047                    Box::pin(async {
1048                        Self::set_metadata_dbtx(dbtx, metadata).await;
1049                        Ok(())
1050                    })
1051                },
1052                None,
1053            )
1054            .await
1055            .expect("Failed to autocommit metadata");
1056    }
1057
1058    pub fn has_pending_recoveries(&self) -> bool {
1059        !self
1060            .client_recovery_progress_receiver
1061            .borrow()
1062            .iter()
1063            .all(|(_id, progress)| progress.is_done())
1064    }
1065
1066    /// Wait for all module recoveries to finish
1067    ///
1068    /// This will block until the recovery task is done with recoveries.
1069    /// Returns success if all recovery tasks are complete (success case),
1070    /// or an error if some modules could not complete the recovery at the time.
1071    ///
1072    /// A bit of a heavy approach.
1073    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1074        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1075        recovery_receiver
1076            .wait_for(|in_progress| {
1077                in_progress
1078                    .iter()
1079                    .all(|(_id, progress)| progress.is_done())
1080            })
1081            .await
1082            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1083
1084        Ok(())
1085    }
1086
1087    /// Subscribe to recover progress for all the modules.
1088    ///
1089    /// This stream can contain duplicate progress for a module.
1090    /// Don't use this stream for detecting completion of recovery.
1091    pub fn subscribe_to_recovery_progress(
1092        &self,
1093    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1094        WatchStream::new(self.client_recovery_progress_receiver.clone())
1095            .flat_map(futures::stream::iter)
1096    }
1097
1098    pub async fn wait_for_module_kind_recovery(
1099        &self,
1100        module_kind: ModuleKind,
1101    ) -> anyhow::Result<()> {
1102        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1103        let config = self.config().await;
1104        recovery_receiver
1105            .wait_for(|in_progress| {
1106                !in_progress
1107                    .iter()
1108                    .filter(|(module_instance_id, _progress)| {
1109                        config.modules[module_instance_id].kind == module_kind
1110                    })
1111                    .any(|(_id, progress)| !progress.is_done())
1112            })
1113            .await
1114            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1115
1116        Ok(())
1117    }
1118
1119    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1120        loop {
1121            if self.executor.get_active_states().await.is_empty() {
1122                break;
1123            }
1124            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1125        }
1126        Ok(())
1127    }
1128
    /// Write the client [`Metadata`] inside the caller-provided transaction.
    ///
    /// The entry is stored under [`ClientMetadataKey`] with
    /// `insert_new_entry`; committing is left to the caller.
    // NOTE(review): `insert_new_entry` presumably expects that no entry
    // exists yet under this key — confirm how it behaves when overwriting.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }
1133
1134    fn spawn_module_recoveries_task(
1135        &self,
1136        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1137        module_recoveries: BTreeMap<
1138            ModuleInstanceId,
1139            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1140        >,
1141        module_recovery_progress_receivers: BTreeMap<
1142            ModuleInstanceId,
1143            watch::Receiver<RecoveryProgress>,
1144        >,
1145    ) {
1146        let db = self.db.clone();
1147        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1148        self.task_group
1149            .spawn("module recoveries", |_task_handle| async {
1150                Self::run_module_recoveries_task(
1151                    db,
1152                    log_ordering_wakeup_tx,
1153                    recovery_sender,
1154                    module_recoveries,
1155                    module_recovery_progress_receivers,
1156                )
1157                .await;
1158            });
1159    }
1160
1161    async fn run_module_recoveries_task(
1162        db: Database,
1163        log_ordering_wakeup_tx: watch::Sender<()>,
1164        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1165        module_recoveries: BTreeMap<
1166            ModuleInstanceId,
1167            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1168        >,
1169        module_recovery_progress_receivers: BTreeMap<
1170            ModuleInstanceId,
1171            watch::Receiver<RecoveryProgress>,
1172        >,
1173    ) {
1174        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1175        let mut completed_stream = Vec::new();
1176        let progress_stream = futures::stream::FuturesUnordered::new();
1177
1178        for (module_instance_id, f) in module_recoveries {
1179            completed_stream.push(futures::stream::once(Box::pin(async move {
1180                match f.await {
1181                    Ok(()) => (module_instance_id, None),
1182                    Err(err) => {
1183                        warn!(
1184                            target: LOG_CLIENT,
1185                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1186                        );
1187                        // a module recovery that failed reports and error and
1188                        // just never finishes, so we don't need a separate state
1189                        // for it
1190                        futures::future::pending::<()>().await;
1191                        unreachable!()
1192                    }
1193                }
1194            })));
1195        }
1196
1197        for (module_instance_id, rx) in module_recovery_progress_receivers {
1198            progress_stream.push(
1199                tokio_stream::wrappers::WatchStream::new(rx)
1200                    .fuse()
1201                    .map(move |progress| (module_instance_id, Some(progress))),
1202            );
1203        }
1204
1205        let mut futures = futures::stream::select(
1206            futures::stream::select_all(progress_stream),
1207            futures::stream::select_all(completed_stream),
1208        );
1209
1210        while let Some((module_instance_id, progress)) = futures.next().await {
1211            let mut dbtx = db.begin_transaction().await;
1212
1213            let prev_progress = *recovery_sender
1214                .borrow()
1215                .get(&module_instance_id)
1216                .expect("existing progress must be present");
1217
1218            let progress = if prev_progress.is_done() {
1219                // since updates might be out of order, once done, stick with it
1220                prev_progress
1221            } else if let Some(progress) = progress {
1222                progress
1223            } else {
1224                prev_progress.to_complete()
1225            };
1226
1227            if !prev_progress.is_done() && progress.is_done() {
1228                info!(
1229                    target: LOG_CLIENT,
1230                    module_instance_id,
1231                    progress = format!("{}/{}", progress.complete, progress.total),
1232                    "Recovery complete"
1233                );
1234                dbtx.log_event(
1235                    log_ordering_wakeup_tx.clone(),
1236                    None,
1237                    ModuleRecoveryCompleted {
1238                        module_id: module_instance_id,
1239                    },
1240                )
1241                .await;
1242            } else {
1243                info!(
1244                    target: LOG_CLIENT,
1245                    module_instance_id,
1246                    progress = format!("{}/{}", progress.complete, progress.total),
1247                    "Recovery progress"
1248                );
1249            }
1250
1251            dbtx.insert_entry(
1252                &ClientModuleRecovery { module_instance_id },
1253                &ClientModuleRecoveryState { progress },
1254            )
1255            .await;
1256            dbtx.commit_tx().await;
1257
1258            recovery_sender.send_modify(|v| {
1259                v.insert(module_instance_id, progress);
1260            });
1261        }
1262        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1263    }
1264
1265    async fn load_peers_last_api_versions(
1266        db: &Database,
1267        num_peers: NumPeers,
1268    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1269        let mut peer_api_version_sets = BTreeMap::new();
1270
1271        let mut dbtx = db.begin_transaction_nc().await;
1272        for peer_id in num_peers.peer_ids() {
1273            if let Some(v) = dbtx
1274                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1275                .await
1276            {
1277                peer_api_version_sets.insert(peer_id, v.0);
1278            }
1279        }
1280        drop(dbtx);
1281        peer_api_version_sets
1282    }
1283
1284    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1285    /// only the announcements and doesn't use the config as fallback.
1286    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1287        self.db()
1288            .begin_transaction_nc()
1289            .await
1290            .find_by_prefix(&ApiAnnouncementPrefix)
1291            .await
1292            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1293            .collect()
1294            .await
1295    }
1296
1297    /// Returns a list of guardian API URLs
1298    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1299        get_api_urls(&self.db, &self.config().await).await
1300    }
1301
1302    /// Create an invite code with the api endpoint of the given peer which can
1303    /// be used to download this client config
1304    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1305        self.get_peer_urls()
1306            .await
1307            .into_iter()
1308            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1309            .map(|peer_url| {
1310                InviteCode::new(
1311                    peer_url.clone(),
1312                    peer,
1313                    self.federation_id(),
1314                    self.api_secret.clone(),
1315                )
1316            })
1317    }
1318
1319    /// Blocks till the client has synced the guardian public key set
1320    /// (introduced in version 0.4) and returns it. Once it has been fetched
1321    /// once this function is guaranteed to return immediately.
1322    pub async fn get_guardian_public_keys_blocking(
1323        &self,
1324    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1325        self.db
1326            .autocommit(
1327                |dbtx, _| {
1328                    Box::pin(async move {
1329                        let config = self.config().await;
1330
1331                        let guardian_pub_keys = self
1332                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1333                            .await;
1334
1335                        Result::<_, ()>::Ok(guardian_pub_keys)
1336                    })
1337                },
1338                None,
1339            )
1340            .await
1341            .expect("Will retry forever")
1342    }
1343
1344    async fn get_or_backfill_broadcast_public_keys(
1345        &self,
1346        dbtx: &mut DatabaseTransaction<'_>,
1347        config: ClientConfig,
1348    ) -> BTreeMap<PeerId, PublicKey> {
1349        match config.global.broadcast_public_keys {
1350            Some(guardian_pub_keys) => guardian_pub_keys,
1351            _ => {
1352                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1353
1354                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1355                *(self.config.write().await) = new_config;
1356                guardian_pub_keys
1357            }
1358        }
1359    }
1360
    /// Ask the federation API for its session count.
    ///
    /// Thin wrapper over `self.api.session_count()`; any API error is passed
    /// through unchanged.
    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }
1364
1365    async fn fetch_and_update_config(
1366        &self,
1367        config: ClientConfig,
1368    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1369        let fetched_config = retry(
1370            "Fetching guardian public keys",
1371            backoff_util::background_backoff(),
1372            || async {
1373                Ok(self
1374                    .api
1375                    .request_current_consensus::<ClientConfig>(
1376                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1377                        ApiRequestErased::default(),
1378                    )
1379                    .await?)
1380            },
1381        )
1382        .await
1383        .expect("Will never return on error");
1384
1385        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1386            warn!(
1387                target: LOG_CLIENT,
1388                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1389            );
1390            pending::<()>().await;
1391            unreachable!("Pending will never return");
1392        };
1393
1394        let new_config = ClientConfig {
1395            global: GlobalClientConfig {
1396                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1397                ..config.global
1398            },
1399            modules: config.modules,
1400        };
1401        (guardian_pub_keys, new_config)
1402    }
1403
1404    pub fn handle_global_rpc(
1405        &self,
1406        method: String,
1407        params: serde_json::Value,
1408    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1409        Box::pin(try_stream! {
1410            match method.as_str() {
1411                "get_balance" => {
1412                    let balance = self.get_balance().await;
1413                    yield serde_json::to_value(balance)?;
1414                }
1415                "subscribe_balance_changes" => {
1416                    let mut stream = self.subscribe_balance_changes().await;
1417                    while let Some(balance) = stream.next().await {
1418                        yield serde_json::to_value(balance)?;
1419                    }
1420                }
1421                "get_config" => {
1422                    let config = self.config().await;
1423                    yield serde_json::to_value(config)?;
1424                }
1425                "get_federation_id" => {
1426                    let federation_id = self.federation_id();
1427                    yield serde_json::to_value(federation_id)?;
1428                }
1429                "get_invite_code" => {
1430                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1431                    let invite_code = self.invite_code(req.peer).await;
1432                    yield serde_json::to_value(invite_code)?;
1433                }
1434                "get_operation" => {
1435                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1436                    let operation = self.operation_log().get_operation(req.operation_id).await;
1437                    yield serde_json::to_value(operation)?;
1438                }
1439                "list_operations" => {
1440                    let req: ListOperationsParams = serde_json::from_value(params)?;
1441                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1442                        usize::MAX
1443                    } else {
1444                        req.limit.unwrap_or(usize::MAX)
1445                    };
1446                    let operations = self.operation_log()
1447                        .paginate_operations_rev(limit, req.last_seen)
1448                        .await;
1449                    yield serde_json::to_value(operations)?;
1450                }
1451                "session_count" => {
1452                    let count = self.fetch_session_count().await?;
1453                    yield serde_json::to_value(count)?;
1454                }
1455                "has_pending_recoveries" => {
1456                    let has_pending = self.has_pending_recoveries();
1457                    yield serde_json::to_value(has_pending)?;
1458                }
1459                "wait_for_all_recoveries" => {
1460                    self.wait_for_all_recoveries().await?;
1461                    yield serde_json::Value::Null;
1462                }
1463                "subscribe_to_recovery_progress" => {
1464                    let mut stream = self.subscribe_to_recovery_progress();
1465                    while let Some((module_id, progress)) = stream.next().await {
1466                        yield serde_json::json!({
1467                            "module_id": module_id,
1468                            "progress": progress
1469                        });
1470                    }
1471                }
1472                "backup_to_federation" => {
1473                    let metadata = if params.is_null() {
1474                        Metadata::from_json_serialized(serde_json::json!({}))
1475                    } else {
1476                        Metadata::from_json_serialized(params)
1477                    };
1478                    self.backup_to_federation(metadata).await?;
1479                    yield serde_json::Value::Null;
1480                }
1481                _ => {
1482                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1483                    unreachable!()
1484                },
1485            }
1486        })
1487    }
1488
1489    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1490    where
1491        E: Event + Send,
1492    {
1493        let mut dbtx = self.db.begin_transaction().await;
1494        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1495        dbtx.commit_tx().await;
1496    }
1497
1498    pub async fn log_event_dbtx<E, Cap>(
1499        &self,
1500        dbtx: &mut DatabaseTransaction<'_, Cap>,
1501        module_id: Option<ModuleInstanceId>,
1502        event: E,
1503    ) where
1504        E: Event + Send,
1505        Cap: Send,
1506    {
1507        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1508            .await;
1509    }
1510
1511    pub async fn log_event_raw_dbtx<Cap>(
1512        &self,
1513        dbtx: &mut DatabaseTransaction<'_, Cap>,
1514        kind: EventKind,
1515        module: Option<(ModuleKind, ModuleInstanceId)>,
1516        payload: Vec<u8>,
1517        persist: EventPersistence,
1518    ) where
1519        Cap: Send,
1520    {
1521        let module_id = module.as_ref().map(|m| m.1);
1522        let module_kind = module.map(|m| m.0);
1523        dbtx.log_event_raw(
1524            self.log_ordering_wakeup_tx.clone(),
1525            kind,
1526            module_kind,
1527            module_id,
1528            payload,
1529            persist,
1530        )
1531        .await;
1532    }
1533
1534    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1535    where
1536        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1537        K: DatabaseRecord<Value = EventLogId>,
1538        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1539        R: Future<Output = anyhow::Result<()>>,
1540    {
1541        fedimint_eventlog::handle_events(
1542            self.db.clone(),
1543            pos_key,
1544            self.log_event_added_rx.clone(),
1545            call_fn,
1546        )
1547        .await
1548    }
1549
1550    pub async fn handle_trimable_events<F, R, K>(
1551        &self,
1552        pos_key: &K,
1553        call_fn: F,
1554    ) -> anyhow::Result<()>
1555    where
1556        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1557        K: DatabaseRecord<Value = EventLogTrimableId>,
1558        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1559        R: Future<Output = anyhow::Result<()>>,
1560    {
1561        fedimint_eventlog::handle_trimable_events(
1562            self.db.clone(),
1563            pos_key,
1564            self.log_event_added_rx.clone(),
1565            call_fn,
1566        )
1567        .await
1568    }
1569
1570    pub async fn get_event_log(
1571        &self,
1572        pos: Option<EventLogId>,
1573        limit: u64,
1574    ) -> Vec<PersistedLogEntry> {
1575        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1576            .await
1577    }
1578
1579    pub async fn get_event_log_trimable(
1580        &self,
1581        pos: Option<EventLogTrimableId>,
1582        limit: u64,
1583    ) -> Vec<PersistedLogEntry> {
1584        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1585            .await
1586    }
1587
    /// Read entries from the event log through the caller-provided
    /// transaction.
    ///
    /// `pos` and `limit` are passed straight to `dbtx.get_event_log`.
    // NOTE(review): presumably `pos` is the starting position and `limit` the
    // maximum number of entries returned — confirm against `get_event_log`.
    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }
1599
    /// Read entries from the trimable event log through the caller-provided
    /// transaction.
    ///
    /// `pos` and `limit` are passed straight to `dbtx.get_event_log_trimable`.
    // NOTE(review): presumably `pos` is the starting position and `limit` the
    // maximum number of entries returned — confirm against
    // `get_event_log_trimable`.
    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }
1611
1612    /// Register to receiver all new transient (unpersisted) events
1613    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1614        self.log_event_added_transient_tx.subscribe()
1615    }
1616}
1617
1618#[apply(async_trait_maybe_send!)]
1619impl ClientContextIface for Client {
1620    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1621        Client::get_module(self, instance)
1622    }
1623
1624    fn api_clone(&self) -> DynGlobalApi {
1625        Client::api_clone(self)
1626    }
1627    fn decoders(&self) -> &ModuleDecoderRegistry {
1628        Client::decoders(self)
1629    }
1630
1631    async fn finalize_and_submit_transaction(
1632        &self,
1633        operation_id: OperationId,
1634        operation_type: &str,
1635        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1636        tx_builder: TransactionBuilder,
1637    ) -> anyhow::Result<OutPointRange> {
1638        Client::finalize_and_submit_transaction(
1639            self,
1640            operation_id,
1641            operation_type,
1642            // |out_point_range| operation_meta_gen(out_point_range),
1643            &operation_meta_gen,
1644            tx_builder,
1645        )
1646        .await
1647    }
1648
1649    async fn finalize_and_submit_transaction_inner(
1650        &self,
1651        dbtx: &mut DatabaseTransaction<'_>,
1652        operation_id: OperationId,
1653        tx_builder: TransactionBuilder,
1654    ) -> anyhow::Result<OutPointRange> {
1655        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1656    }
1657
1658    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1659        Client::transaction_updates(self, operation_id).await
1660    }
1661
1662    async fn await_primary_module_outputs(
1663        &self,
1664        operation_id: OperationId,
1665        // TODO: make `impl Iterator<Item = ...>`
1666        outputs: Vec<OutPoint>,
1667    ) -> anyhow::Result<()> {
1668        Client::await_primary_module_outputs(self, operation_id, outputs).await
1669    }
1670
1671    fn operation_log(&self) -> &dyn IOperationLog {
1672        Client::operation_log(self)
1673    }
1674
1675    async fn has_active_states(&self, operation_id: OperationId) -> bool {
1676        Client::has_active_states(self, operation_id).await
1677    }
1678
1679    async fn operation_exists(&self, operation_id: OperationId) -> bool {
1680        Client::operation_exists(self, operation_id).await
1681    }
1682
1683    async fn config(&self) -> ClientConfig {
1684        Client::config(self).await
1685    }
1686
1687    fn db(&self) -> &Database {
1688        Client::db(self)
1689    }
1690
1691    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1692        Client::executor(self)
1693    }
1694
1695    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1696        Client::invite_code(self, peer).await
1697    }
1698
1699    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1700        Client::get_internal_payment_markers(self)
1701    }
1702
1703    async fn log_event_json(
1704        &self,
1705        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1706        module_kind: Option<ModuleKind>,
1707        module_id: ModuleInstanceId,
1708        kind: EventKind,
1709        payload: serde_json::Value,
1710        persist: EventPersistence,
1711    ) {
1712        dbtx.ensure_global()
1713            .expect("Must be called with global dbtx");
1714        self.log_event_raw_dbtx(
1715            dbtx,
1716            kind,
1717            module_kind.map(|kind| (kind, module_id)),
1718            serde_json::to_vec(&payload).expect("Serialization can't fail"),
1719            persist,
1720        )
1721        .await;
1722    }
1723
1724    async fn read_operation_active_states<'dbtx>(
1725        &self,
1726        operation_id: OperationId,
1727        module_id: ModuleInstanceId,
1728        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1729    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1730    {
1731        Box::pin(
1732            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1733                operation_id,
1734                module_instance: module_id,
1735            })
1736            .await
1737            .map(move |(k, v)| (k.0, v)),
1738        )
1739    }
1740    async fn read_operation_inactive_states<'dbtx>(
1741        &self,
1742        operation_id: OperationId,
1743        module_id: ModuleInstanceId,
1744        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1745    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1746    {
1747        Box::pin(
1748            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1749                operation_id,
1750                module_instance: module_id,
1751            })
1752            .await
1753            .map(move |(k, v)| (k.0, v)),
1754        )
1755    }
1756}
1757
1758// TODO: impl `Debug` for `Client` and derive here
1759impl fmt::Debug for Client {
1760    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1761        write!(f, "Client")
1762    }
1763}
1764
1765pub fn client_decoders<'a>(
1766    registry: &ModuleInitRegistry<DynClientModuleInit>,
1767    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1768) -> ModuleDecoderRegistry {
1769    let mut modules = BTreeMap::new();
1770    for (id, kind) in module_kinds {
1771        let Some(init) = registry.get(kind) else {
1772            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1773            continue;
1774        };
1775
1776        modules.insert(
1777            id,
1778            (
1779                kind.clone(),
1780                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1781            ),
1782        );
1783    }
1784    ModuleDecoderRegistry::from(modules)
1785}