fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51    SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::backoff_util::custom_backoff;
57use fedimint_core::util::{
58    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
59};
60use fedimint_core::{
61    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
62    maybe_add_send_sync, runtime,
63};
64use fedimint_derive_secret::DerivableSecret;
65use fedimint_eventlog::{
66    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
67    EventPersistence, PersistedLogEntry,
68};
69use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
70use futures::stream::FuturesUnordered;
71use futures::{Stream, StreamExt as _};
72use global_ctx::ModuleGlobalClientContext;
73use serde::{Deserialize, Serialize};
74use tokio::sync::{broadcast, watch};
75use tokio_stream::wrappers::WatchStream;
76use tracing::{debug, info, warn};
77
78use crate::ClientBuilder;
79use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
80use crate::backup::Metadata;
81use crate::db::{
82    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
83    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
84    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
85    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
86    get_decoded_client_secret, verify_client_db_integrity_dbtx,
87};
88use crate::meta::MetaService;
89use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
90use crate::oplog::OperationLog;
91use crate::sm::executor::{
92    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
93    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
94};
95
96pub(crate) mod builder;
97pub(crate) mod global_ctx;
98pub(crate) mod handle;
99
/// List of core API versions supported by this implementation.
/// Notably, the `major` version is the one being supported, and the
/// corresponding `minor` version is the one required (for the given `major`
/// version).
103const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
104    &[ApiVersion { major: 0, minor: 0 }];
105
/// Main client type
///
/// A handle and API for interacting with a single federation. End user
/// applications that want to support interacting with multiple federations at
/// the same time will need to instantiate and manage multiple instances of
/// this struct.
///
/// Under the hood it starts and manages the service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`crate::ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
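///
/// # Example
///
/// A minimal sketch, assuming an already-built `client: &Client` (see
/// [`Client::builder`] and [`crate::ClientBuilder`] for construction) and
/// using only methods defined on this type:
///
/// ```ignore
/// // Identify the federation this client is connected to.
/// let federation_id = client.federation_id();
/// // Query the spendable balance tracked by the primary module.
/// let balance = client.get_balance().await;
/// println!("federation {federation_id:?} balance: {balance:?}");
/// ```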
119pub struct Client {
120    final_client: FinalClientIface,
121    config: tokio::sync::RwLock<ClientConfig>,
122    api_secret: Option<String>,
123    decoders: ModuleDecoderRegistry,
124    db: Database,
125    federation_id: FederationId,
126    federation_config_meta: BTreeMap<String, String>,
127    primary_module_instance: ModuleInstanceId,
128    pub(crate) modules: ClientModuleRegistry,
129    module_inits: ClientModuleInitRegistry,
130    executor: Executor,
131    pub(crate) api: DynGlobalApi,
132    root_secret: DerivableSecret,
133    operation_log: OperationLog,
134    secp_ctx: Secp256k1<secp256k1::All>,
135    meta_service: Arc<MetaService>,
136    connector: Connector,
137
138    task_group: TaskGroup,
139
140    /// Updates about client recovery progress
141    client_recovery_progress_receiver:
142        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
143
    /// Internal client sender to wake up the log ordering task every time an
    /// (unordered) log event is added.
146    log_ordering_wakeup_tx: watch::Sender<()>,
    /// Receiver for events fired every time an (ordered) log event is added.
148    log_event_added_rx: watch::Receiver<()>,
149    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
150    request_hook: ApiRequestHook,
151    iroh_enable_dht: bool,
152    iroh_enable_next: bool,
153}
154
155#[derive(Debug, Serialize, Deserialize)]
156struct ListOperationsParams {
157    limit: Option<usize>,
158    last_seen: Option<ChronologicalOperationLogKey>,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct GetOperationIdRequest {
163    operation_id: OperationId,
164}
165
166impl Client {
167    /// Initialize a client builder that can be configured to create a new
168    /// client.
169    pub async fn builder() -> anyhow::Result<ClientBuilder> {
170        Ok(ClientBuilder::new())
171    }
172
173    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
174        self.api.as_ref()
175    }
176
177    pub fn api_clone(&self) -> DynGlobalApi {
178        self.api.clone()
179    }
180
181    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
182    pub fn task_group(&self) -> &TaskGroup {
183        &self.task_group
184    }
185
186    /// Useful for our CLI tooling, not meant for external use
187    #[doc(hidden)]
188    pub fn executor(&self) -> &Executor {
189        &self.executor
190    }
191
192    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
193        let mut dbtx = db.begin_transaction_nc().await;
194        dbtx.get_value(&ClientConfigKey).await
195    }
196
197    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
198        let mut dbtx = db.begin_transaction_nc().await;
199        dbtx.get_value(&PendingClientConfigKey).await
200    }
201
202    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
203        let mut dbtx = db.begin_transaction_nc().await;
204        dbtx.get_value(&ApiSecretKey).await
205    }
206
207    pub async fn store_encodable_client_secret<T: Encodable>(
208        db: &Database,
209        secret: T,
210    ) -> anyhow::Result<()> {
211        let mut dbtx = db.begin_transaction().await;
212
213        // Don't overwrite an existing secret
214        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
215            bail!("Encoded client secret already exists, cannot overwrite")
216        }
217
218        let encoded_secret = T::consensus_encode_to_vec(&secret);
219        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
220            .await;
221        dbtx.commit_tx().await;
222        Ok(())
223    }
224
225    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
226        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
227            bail!("Encoded client secret not present in DB")
228        };
229
230        Ok(secret)
231    }
232    pub async fn load_decodable_client_secret_opt<T: Decodable>(
233        db: &Database,
234    ) -> anyhow::Result<Option<T>> {
235        let mut dbtx = db.begin_transaction_nc().await;
236
237        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
238
239        Ok(match client_secret {
240            Some(client_secret) => Some(
241                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
242                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
243            ),
244            None => None,
245        })
246    }
247
248    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
249        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
250            Ok(secret) => secret,
251            _ => {
252                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
253                Self::store_encodable_client_secret(db, secret)
254                    .await
255                    .expect("Storing client secret must work");
256                secret
257            }
258        };
259        Ok(client_secret)
260    }
261
262    pub async fn is_initialized(db: &Database) -> bool {
263        let mut dbtx = db.begin_transaction_nc().await;
264        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
265            .await
            .expect("Unrecoverable error occurred while reading an entry from the database")
267            .is_some()
268    }
269
270    pub fn start_executor(self: &Arc<Self>) {
271        debug!(
272            target: LOG_CLIENT,
273            "Starting fedimint client executor",
274        );
275        self.executor.start_executor(self.context_gen());
276    }
277
278    pub fn federation_id(&self) -> FederationId {
279        self.federation_id
280    }
281
282    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
283        let client_inner = Arc::downgrade(self);
284        Arc::new(move |module_instance, operation| {
285            ModuleGlobalClientContext {
286                client: client_inner
287                    .clone()
288                    .upgrade()
289                    .expect("ModuleGlobalContextGen called after client was dropped"),
290                module_instance_id: module_instance,
291                operation,
292            }
293            .into()
294        })
295    }
296
297    pub async fn config(&self) -> ClientConfig {
298        self.config.read().await.clone()
299    }
300
301    pub fn api_secret(&self) -> &Option<String> {
302        &self.api_secret
303    }
304
305    pub fn decoders(&self) -> &ModuleDecoderRegistry {
306        &self.decoders
307    }
308
309    /// Returns a reference to the module, panics if not found
310    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
311        self.try_get_module(instance)
312            .expect("Module instance not found")
313    }
314
315    fn try_get_module(
316        &self,
317        instance: ModuleInstanceId,
318    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
319        Some(self.modules.get(instance)?.as_ref())
320    }
321
322    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
323        self.modules.get(instance).is_some()
324    }
325
326    /// Returns the input amount and output amount of a transaction
327    ///
328    /// # Panics
329    /// If any of the input or output versions in the transaction builder are
330    /// unknown by the respective module.
331    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
332        // FIXME: prevent overflows, currently not suitable for untrusted input
333        let mut in_amount = Amount::ZERO;
334        let mut out_amount = Amount::ZERO;
335        let mut fee_amount = Amount::ZERO;
336
337        for input in builder.inputs() {
338            let module = self.get_module(input.input.module_instance_id());
339
340            let item_fee = module.input_fee(input.amount, &input.input).expect(
341                "We only build transactions with input versions that are supported by the module",
342            );
343
344            in_amount += input.amount;
345            fee_amount += item_fee;
346        }
347
348        for output in builder.outputs() {
349            let module = self.get_module(output.output.module_instance_id());
350
351            let item_fee = module.output_fee(output.amount, &output.output).expect(
352                "We only build transactions with output versions that are supported by the module",
353            );
354
355            out_amount += output.amount;
356            fee_amount += item_fee;
357        }
358
359        (in_amount, out_amount + fee_amount)
360    }
361
362    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
363        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
364    }
365
366    /// Get metadata value from the federation config itself
367    pub fn get_config_meta(&self, key: &str) -> Option<String> {
368        self.federation_config_meta.get(key).cloned()
369    }
370
371    pub(crate) fn root_secret(&self) -> DerivableSecret {
372        self.root_secret.clone()
373    }
374
375    pub async fn add_state_machines(
376        &self,
377        dbtx: &mut DatabaseTransaction<'_>,
378        states: Vec<DynState>,
379    ) -> AddStateMachinesResult {
380        self.executor.add_state_machines_dbtx(dbtx, states).await
381    }
382
383    // TODO: implement as part of [`OperationLog`]
384    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
385        let active_states = self.executor.get_active_states().await;
386        let mut active_operations = HashSet::with_capacity(active_states.len());
387        let mut dbtx = self.db().begin_transaction_nc().await;
388        for (state, _) in active_states {
389            let operation_id = state.operation_id();
390            if dbtx
391                .get_value(&OperationLogKey { operation_id })
392                .await
393                .is_some()
394            {
395                active_operations.insert(operation_id);
396            }
397        }
398        active_operations
399    }
400
401    pub fn operation_log(&self) -> &OperationLog {
402        &self.operation_log
403    }
404
405    /// Get the meta manager to read meta fields.
406    pub fn meta_service(&self) -> &Arc<MetaService> {
407        &self.meta_service
408    }
409
    /// Get the federation expiry timestamp from the federation's meta fields,
    /// if one is set.
411    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
412        let meta_service = self.meta_service();
413        let ts = meta_service
414            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
415            .await
416            .and_then(|v| v.value)?;
417        Some(UNIX_EPOCH + Duration::from_secs(ts))
418    }
419
420    /// Adds funding to a transaction or removes over-funding via change.
421    async fn finalize_transaction(
422        &self,
423        dbtx: &mut DatabaseTransaction<'_>,
424        operation_id: OperationId,
425        mut partial_transaction: TransactionBuilder,
426    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
427        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
428
429        let (added_input_bundle, change_outputs) = self
430            .primary_module()
431            .create_final_inputs_and_outputs(
432                self.primary_module_instance,
433                dbtx,
434                operation_id,
435                input_amount,
436                output_amount,
437            )
438            .await?;
439
        // This is the range of outputs that will be added to the transaction
        // in order to balance it. Note that it may be empty if the transaction
        // is already balanced.
443        let change_range = Range {
444            start: partial_transaction.outputs().count() as u64,
445            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
446        };
447
448        partial_transaction = partial_transaction
449            .with_inputs(added_input_bundle)
450            .with_outputs(change_outputs);
451
452        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
453
454        assert!(input_amount >= output_amount, "Transaction is underfunded");
455
456        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
457
458        Ok((tx, states, change_range))
459    }
460
    /// Add funding and/or change to the transaction builder as needed, finalize
    /// the transaction and submit it to the federation.
    ///
    /// ## Errors
    /// The function will return an error if an operation with the given ID
    /// already exists.
    ///
    /// ## Panics
    /// The function will panic if the database transaction repeatedly collides
    /// with others and keeps failing; this should not happen except in
    /// excessively concurrent scenarios.
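    ///
    /// # Example
    ///
    /// A hedged sketch, assuming `client: &Client`, a pre-generated
    /// `operation_id: OperationId` and a `tx_builder: TransactionBuilder`
    /// produced by module-specific code (both hypothetical here):
    ///
    /// ```ignore
    /// let out_point_range = client
    ///     .finalize_and_submit_transaction(
    ///         operation_id,
    ///         "example-operation",
    ///         // Operation metadata generator; any serializable value works.
    ///         |range| serde_json::json!({ "change_outputs": format!("{range:?}") }),
    ///         tx_builder,
    ///     )
    ///     .await?;
    /// // The returned range identifies the change outputs added by the client.
    /// ```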
472    pub async fn finalize_and_submit_transaction<F, M>(
473        &self,
474        operation_id: OperationId,
475        operation_type: &str,
476        operation_meta_gen: F,
477        tx_builder: TransactionBuilder,
478    ) -> anyhow::Result<OutPointRange>
479    where
480        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
481        M: serde::Serialize + MaybeSend,
482    {
483        let operation_type = operation_type.to_owned();
484
485        let autocommit_res = self
486            .db
487            .autocommit(
488                |dbtx, _| {
489                    let operation_type = operation_type.clone();
490                    let tx_builder = tx_builder.clone();
491                    let operation_meta_gen = operation_meta_gen.clone();
492                    Box::pin(async move {
493                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
494                            bail!("There already exists an operation with id {operation_id:?}")
495                        }
496
497                        let out_point_range = self
498                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
499                            .await?;
500
501                        self.operation_log()
502                            .add_operation_log_entry_dbtx(
503                                dbtx,
504                                operation_id,
505                                &operation_type,
506                                operation_meta_gen(out_point_range),
507                            )
508                            .await;
509
510                        Ok(out_point_range)
511                    })
512                },
513                Some(100), // TODO: handle what happens after 100 retries
514            )
515            .await;
516
517        match autocommit_res {
518            Ok(txid) => Ok(txid),
519            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
520            Err(AutocommitError::CommitFailed {
521                attempts,
522                last_error,
523            }) => panic!(
524                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
525            ),
526        }
527    }
528
529    async fn finalize_and_submit_transaction_inner(
530        &self,
531        dbtx: &mut DatabaseTransaction<'_>,
532        operation_id: OperationId,
533        tx_builder: TransactionBuilder,
534    ) -> anyhow::Result<OutPointRange> {
535        let (transaction, mut states, change_range) = self
536            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
537            .await?;
538
539        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
540            let inputs = transaction
541                .inputs
542                .iter()
543                .map(DynInput::module_instance_id)
544                .collect::<Vec<_>>();
545            let outputs = transaction
546                .outputs
547                .iter()
548                .map(DynOutput::module_instance_id)
549                .collect::<Vec<_>>();
550            warn!(
551                target: LOG_CLIENT_NET_API,
552                size=%transaction.consensus_encode_to_vec().len(),
553                ?inputs,
554                ?outputs,
555                "Transaction too large",
556            );
557            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
558            bail!(
559                "The generated transaction would be rejected by the federation for being too large."
560            );
561        }
562
563        let txid = transaction.tx_hash();
564
        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
566
567        let tx_submission_sm = DynState::from_typed(
568            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
569            TxSubmissionStatesSM {
570                operation_id,
571                state: TxSubmissionStates::Created(transaction),
572            },
573        );
574        states.push(tx_submission_sm);
575
576        self.executor.add_state_machines_dbtx(dbtx, states).await?;
577
578        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
579            .await;
580
581        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
582    }
583
584    async fn transaction_update_stream(
585        &self,
586        operation_id: OperationId,
587    ) -> BoxStream<'static, TxSubmissionStatesSM> {
588        self.executor
589            .notifier()
590            .module_notifier::<TxSubmissionStatesSM>(
591                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
592                self.final_client.clone(),
593            )
594            .subscribe(operation_id)
595            .await
596    }
597
598    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
599        let mut dbtx = self.db().begin_transaction_nc().await;
600
601        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
602    }
603
604    pub async fn operation_exists_dbtx(
605        dbtx: &mut DatabaseTransaction<'_>,
606        operation_id: OperationId,
607    ) -> bool {
608        let active_state_exists = dbtx
609            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
610            .await
611            .next()
612            .await
613            .is_some();
614
615        let inactive_state_exists = dbtx
616            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
617            .await
618            .next()
619            .await
620            .is_some();
621
622        active_state_exists || inactive_state_exists
623    }
624
625    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
626        self.db
627            .begin_transaction_nc()
628            .await
629            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
630            .await
631            .next()
632            .await
633            .is_some()
634    }
635
636    /// Waits for an output from the primary module to reach its final
637    /// state.
638    pub async fn await_primary_module_output(
639        &self,
640        operation_id: OperationId,
641        out_point: OutPoint,
642    ) -> anyhow::Result<()> {
643        self.primary_module()
644            .await_primary_module_output(operation_id, out_point)
645            .await
646    }
647
    /// Returns a reference to a typed module client instance by kind.
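    ///
    /// # Example
    ///
    /// A sketch, assuming a hypothetical `DummyClientModule` type implementing
    /// [`ClientModule`] and registered with this client:
    ///
    /// ```ignore
    /// let dummy = client.get_first_module::<DummyClientModule>()?;
    /// // `dummy` bundles the instance id, a module-scoped database, a
    /// // module-scoped API client and a typed reference to the module.
    /// ```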
649    pub fn get_first_module<M: ClientModule>(
650        &'_ self,
651    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
652        let module_kind = M::kind();
653        let id = self
654            .get_first_instance(&module_kind)
655            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
656        let module: &M = self
657            .try_get_module(id)
658            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
659            .as_any()
660            .downcast_ref::<M>()
661            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
662        let (db, _) = self.db().with_prefix_module_id(id);
663        Ok(ClientModuleInstance {
664            id,
665            db,
666            api: self.api().with_module(id),
667            module,
668        })
669    }
670
671    pub fn get_module_client_dyn(
672        &self,
673        instance_id: ModuleInstanceId,
674    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
675        self.try_get_module(instance_id)
676            .ok_or(anyhow!("Unknown module instance {}", instance_id))
677    }
678
679    pub fn db(&self) -> &Database {
680        &self.db
681    }
682
    /// Returns a stream of transaction updates for the given operation id that
    /// can later be used to watch for a specific transaction being accepted.
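    ///
    /// # Example
    ///
    /// A sketch, assuming `client: &Client`, the `operation_id` and `txid` of a
    /// previously submitted transaction, and the `await_tx_accepted` helper
    /// that [`TransactionUpdates`] provides (defined outside this file):
    ///
    /// ```ignore
    /// let tx_updates = client.transaction_updates(operation_id).await;
    /// if tx_updates.await_tx_accepted(txid).await.is_ok() {
    ///     println!("transaction {txid} accepted");
    /// }
    /// ```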
685    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
686        TransactionUpdates {
687            update_stream: self.transaction_update_stream(operation_id).await,
688        }
689    }
690
691    /// Returns the instance id of the first module of the given kind. The
692    /// primary module will always be returned before any other modules (which
693    /// themselves are ordered by their instance ID).
694    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
695        if self
696            .modules
697            .get_with_kind(self.primary_module_instance)
698            .is_some_and(|(kind, _)| kind == module_kind)
699        {
700            return Some(self.primary_module_instance);
701        }
702
703        self.modules
704            .iter_modules()
705            .find(|(_, kind, _module)| *kind == module_kind)
706            .map(|(instance_id, _, _)| instance_id)
707    }
708
709    /// Returns the data from which the client's root secret is derived (e.g.
710    /// BIP39 seed phrase struct).
711    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
712        get_decoded_client_secret::<T>(self.db()).await
713    }
714
    /// Waits for outputs from the primary module to reach their final
    /// state.
717    pub async fn await_primary_module_outputs(
718        &self,
719        operation_id: OperationId,
720        outputs: Vec<OutPoint>,
721    ) -> anyhow::Result<()> {
722        for out_point in outputs {
723            self.await_primary_module_output(operation_id, out_point)
724                .await?;
725        }
726
727        Ok(())
728    }
729
    /// Returns the config of the client in JSON format.
    ///
    /// Compared to the consensus module format, where module configs are
    /// binary-encoded, this format cannot be cryptographically verified but is
    /// easier to consume and to some degree human-readable.
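    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `client: &Client` and `serde_json` available:
    ///
    /// ```ignore
    /// let config = client.get_config_json().await;
    /// println!("{}", serde_json::to_string_pretty(&config)?);
    /// ```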
735    pub async fn get_config_json(&self) -> JsonClientConfig {
736        self.config().await.to_json()
737    }
738
739    /// Get the primary module
740    pub fn primary_module(&self) -> &DynClientModule {
741        self.modules
742            .get(self.primary_module_instance)
743            .expect("primary module must be present")
744    }
745
746    /// Balance available to the client for spending
747    pub async fn get_balance(&self) -> Amount {
748        self.primary_module()
749            .get_balance(
750                self.primary_module_instance,
751                &mut self.db().begin_transaction_nc().await,
752            )
753            .await
754    }
755
    /// Returns a stream that yields the current client balance every time it
    /// changes.
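    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `client: &Client` and `futures::StreamExt`
    /// in scope:
    ///
    /// ```ignore
    /// let mut balances = client.subscribe_balance_changes().await;
    /// // The current balance is yielded first, then every change.
    /// while let Some(balance) = balances.next().await {
    ///     println!("balance: {balance:?}");
    /// }
    /// ```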
758    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
759        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
760        let initial_balance = self.get_balance().await;
761        let db = self.db().clone();
762        let primary_module = self.primary_module().clone();
763        let primary_module_instance = self.primary_module_instance;
764
765        Box::pin(async_stream::stream! {
766            yield initial_balance;
767            let mut prev_balance = initial_balance;
768            while let Some(()) = balance_changes.next().await {
769                let mut dbtx = db.begin_transaction_nc().await;
770                let balance = primary_module
771                    .get_balance(primary_module_instance, &mut dbtx)
772                    .await;
773
774                // Deduplicate in case modules cannot always tell if the balance actually changed
775                if balance != prev_balance {
776                    prev_balance = balance;
777                    yield balance;
778                }
779            }
780        })
781    }
782
    /// Query the federation for API version support and then calculate
    /// the best API version to use (supported by most guardians).
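    ///
    /// # Example
    ///
    /// A sketch mirroring how this function is used further down in this file:
    /// spawn it on a task group and watch the response counter (all bindings
    /// here are assumed to exist in the caller).
    ///
    /// ```ignore
    /// let (num_responses_tx, mut num_responses_rx) = tokio::sync::watch::channel(0);
    /// task_group.spawn_cancellable(
    ///     "refresh peers api versions",
    ///     Client::refresh_peers_api_versions(num_peers, api.clone(), db.clone(), num_responses_tx),
    /// );
    /// // Wait until a threshold of peers has responded.
    /// num_responses_rx.wait_for(|num| num_peers.threshold() <= *num).await?;
    /// ```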
785    pub async fn refresh_peers_api_versions(
786        num_peers: NumPeers,
787        api: DynGlobalApi,
788        db: Database,
789        num_responses_sender: watch::Sender<usize>,
790    ) {
        // Keep trying, initially somewhat aggressively, but after a while retry very
        // slowly, because the chances of getting a response get lower and lower.
793        let mut backoff =
794            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
795
        // Make a single request to a peer after a delay.
        //
        // The delay is here to unify the future type for both the initial
        // request and possible retries.
800        async fn make_request(
801            delay: Duration,
802            peer_id: PeerId,
803            api: &DynGlobalApi,
804        ) -> (
805            PeerId,
806            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
807        ) {
808            runtime::sleep(delay).await;
809            (
810                peer_id,
811                api.request_single_peer::<SupportedApiVersionsSummary>(
812                    VERSION_ENDPOINT.to_owned(),
813                    ApiRequestErased::default(),
814                    peer_id,
815                )
816                .await,
817            )
818        }
819
820        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
821        // and make a single async db write operation, it should be OK.
822        let mut requests = FuturesUnordered::new();
823
824        for peer_id in num_peers.peer_ids() {
825            requests.push(make_request(Duration::ZERO, peer_id, &api));
826        }
827
828        let mut num_responses = 0;
829
830        while let Some((peer_id, response)) = requests.next().await {
831            let retry = match response {
832                Err(err) => {
833                    let has_previous_response = db
834                        .begin_transaction_nc()
835                        .await
836                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
837                        .await
838                        .is_some();
839                    debug!(
840                        target: LOG_CLIENT,
841                        %peer_id,
842                        err = %err.fmt_compact(),
843                        %has_previous_response,
844                        "Failed to refresh API versions of a peer"
845                    );
846
847                    !has_previous_response
848                }
849                Ok(o) => {
850                    // Save the response to the database right away, just to
851                    // not lose it
852                    let mut dbtx = db.begin_transaction().await;
853                    dbtx.insert_entry(
854                        &PeerLastApiVersionsSummaryKey(peer_id),
855                        &PeerLastApiVersionsSummary(o),
856                    )
857                    .await;
858                    dbtx.commit_tx().await;
859                    false
860                }
861            };
862
863            if retry {
864                requests.push(make_request(
865                    backoff.next().expect("Keeps retrying"),
866                    peer_id,
867                    &api,
868                ));
869            } else {
870                num_responses += 1;
871                num_responses_sender.send_replace(num_responses);
872            }
873        }
874    }
875
876    /// [`SupportedApiVersionsSummary`] that the client and its modules support
877    pub fn supported_api_versions_summary_static(
878        config: &ClientConfig,
879        client_module_init: &ClientModuleInitRegistry,
880    ) -> SupportedApiVersionsSummary {
881        SupportedApiVersionsSummary {
882            core: SupportedCoreApiVersions {
883                core_consensus: config.global.consensus_version,
884                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
885                    .expect("must not have conflicting versions"),
886            },
887            modules: config
888                .modules
889                .iter()
890                .filter_map(|(&module_instance_id, module_config)| {
891                    client_module_init
892                        .get(module_config.kind())
893                        .map(|module_init| {
894                            (
895                                module_instance_id,
896                                SupportedModuleApiVersions {
897                                    core_consensus: config.global.consensus_version,
898                                    module_consensus: module_config.version,
899                                    api: module_init.supported_api_versions(),
900                                },
901                            )
902                        })
903                })
904                .collect(),
905        }
906    }
907
908    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
909        Self::load_and_refresh_common_api_version_static(
910            &self.config().await,
911            &self.module_inits,
912            &self.api,
913            &self.db,
914            &self.task_group,
915        )
916        .await
917    }
918
    /// Load the common api versions to use from cache and start a background
    /// process to refresh them.
    ///
    /// This is a compromise, so we do not have to wait for version discovery to
    /// complete every time a [`Client`] is being built.
924    async fn load_and_refresh_common_api_version_static(
925        config: &ClientConfig,
926        module_init: &ClientModuleInitRegistry,
927        api: &DynGlobalApi,
928        db: &Database,
929        task_group: &TaskGroup,
930    ) -> anyhow::Result<ApiVersionSet> {
931        if let Some(v) = db
932            .begin_transaction_nc()
933            .await
934            .get_value(&CachedApiVersionSetKey)
935            .await
936        {
937            debug!(
938                target: LOG_CLIENT,
939                "Found existing cached common api versions"
940            );
941            let config = config.clone();
942            let client_module_init = module_init.clone();
943            let api = api.clone();
944            let db = db.clone();
945            let task_group = task_group.clone();
946            // Separate task group, because we actually don't want to be waiting for this to
947            // finish, and it's just best effort.
948            task_group
949                .clone()
950                .spawn_cancellable("refresh_common_api_version_static", async move {
951                    if let Err(error) = Self::refresh_common_api_version_static(
952                        &config,
953                        &client_module_init,
954                        &api,
955                        &db,
956                        task_group,
957                        false,
958                    )
959                    .await
960                    {
961                        warn!(
962                            target: LOG_CLIENT,
963                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
964                        );
965                    }
966                });
967
968            return Ok(v.0);
969        }
970
971        info!(
972            target: LOG_CLIENT,
            "Fetching initial API versions"
974        );
975        Self::refresh_common_api_version_static(
976            config,
977            module_init,
978            api,
979            db,
980            task_group.clone(),
981            true,
982        )
983        .await
984    }
985
986    async fn refresh_common_api_version_static(
987        config: &ClientConfig,
988        client_module_init: &ClientModuleInitRegistry,
989        api: &DynGlobalApi,
990        db: &Database,
991        task_group: TaskGroup,
992        block_until_ok: bool,
993    ) -> anyhow::Result<ApiVersionSet> {
994        debug!(
995            target: LOG_CLIENT,
996            "Refreshing common api versions"
997        );
998
999        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1000        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1001
1002        task_group.spawn_cancellable("refresh peers api versions", {
1003            Client::refresh_peers_api_versions(
1004                num_peers,
1005                api.clone(),
1006                db.clone(),
1007                num_responses_sender,
1008            )
1009        });
1010
1011        let common_api_versions = loop {
            // Wait to collect enough answers before calculating the set of common api
            // versions to use. Note that all peers' individual responses from
            // previous attempts are still used, and requests (or even retries)
            // are not actually cancelled, as they happen on a separate task.
            // This is all just to bound the time the user can be waiting for
            // the join operation to finish, at the risk of picking the wrong
            // version in very rare circumstances.
1019            let _: Result<_, Elapsed> = runtime::timeout(
1020                Duration::from_secs(30),
1021                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1022            )
1023            .await;
1024
1025            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1026
1027            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1028                &Self::supported_api_versions_summary_static(config, client_module_init),
1029                &peer_api_version_sets,
1030            ) {
1031                Ok(o) => break o,
1032                Err(err) if block_until_ok => {
1033                    warn!(
1034                        target: LOG_CLIENT,
1035                        err = %err.fmt_compact_anyhow(),
1036                        "Failed to discover API version to use. Retrying..."
1037                    );
1038                    continue;
1039                }
1040                Err(e) => return Err(e),
1041            }
1042        };
1043
1044        debug!(
1045            target: LOG_CLIENT,
1046            value = ?common_api_versions,
1047            "Updating the cached common api versions"
1048        );
1049        let mut dbtx = db.begin_transaction().await;
1050        let _ = dbtx
1051            .insert_entry(
1052                &CachedApiVersionSetKey,
1053                &CachedApiVersionSet(common_api_versions.clone()),
1054            )
1055            .await;
1056
1057        dbtx.commit_tx().await;
1058
1059        Ok(common_api_versions)
1060    }
1061
1062    /// Get the client [`Metadata`]
1063    pub async fn get_metadata(&self) -> Metadata {
1064        self.db
1065            .begin_transaction_nc()
1066            .await
1067            .get_value(&ClientMetadataKey)
1068            .await
1069            .unwrap_or_else(|| {
1070                warn!(
1071                    target: LOG_CLIENT,
1072                    "Missing existing metadata. This key should have been set on Client init"
1073                );
1074                Metadata::empty()
1075            })
1076    }
1077
1078    /// Set the client [`Metadata`]
1079    pub async fn set_metadata(&self, metadata: &Metadata) {
1080        self.db
1081            .autocommit::<_, _, anyhow::Error>(
1082                |dbtx, _| {
1083                    Box::pin(async {
1084                        Self::set_metadata_dbtx(dbtx, metadata).await;
1085                        Ok(())
1086                    })
1087                },
1088                None,
1089            )
1090            .await
1091            .expect("Failed to autocommit metadata");
1092    }
1093
1094    pub fn has_pending_recoveries(&self) -> bool {
1095        !self
1096            .client_recovery_progress_receiver
1097            .borrow()
1098            .iter()
1099            .all(|(_id, progress)| progress.is_done())
1100    }
1101
    /// Wait for all module recoveries to finish
    ///
    /// This will block until the recovery task is done with all recoveries.
    /// Returns success if all recovery tasks completed, or an error if some
    /// modules could not complete their recovery at the time.
    ///
    /// A bit of a heavy approach.
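    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `client: &Client`:
    ///
    /// ```ignore
    /// if client.has_pending_recoveries() {
    ///     // Blocks until every module recovery reports completion.
    ///     client.wait_for_all_recoveries().await?;
    /// }
    /// ```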
1109    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1110        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1111        recovery_receiver
1112            .wait_for(|in_progress| {
1113                in_progress
1114                    .iter()
1115                    .all(|(_id, progress)| progress.is_done())
1116            })
1117            .await
1118            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1119
1120        Ok(())
1121    }
1122
    /// Subscribe to recovery progress for all the modules.
    ///
    /// This stream can contain duplicate progress for a module.
    /// Don't use this stream for detecting completion of recovery.
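    ///
    /// # Example
    ///
    /// A sketch, assuming `client: &Client` and `futures::StreamExt` in scope:
    ///
    /// ```ignore
    /// let mut progress = client.subscribe_to_recovery_progress();
    /// while let Some((module_instance_id, p)) = progress.next().await {
    ///     println!("module {module_instance_id}: {}/{}", p.complete, p.total);
    /// }
    /// ```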
1127    pub fn subscribe_to_recovery_progress(
1128        &self,
1129    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1130        WatchStream::new(self.client_recovery_progress_receiver.clone())
1131            .flat_map(futures::stream::iter)
1132    }
1133
1134    pub async fn wait_for_module_kind_recovery(
1135        &self,
1136        module_kind: ModuleKind,
1137    ) -> anyhow::Result<()> {
1138        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1139        let config = self.config().await;
1140        recovery_receiver
1141            .wait_for(|in_progress| {
1142                !in_progress
1143                    .iter()
1144                    .filter(|(module_instance_id, _progress)| {
1145                        config.modules[module_instance_id].kind == module_kind
1146                    })
1147                    .any(|(_id, progress)| !progress.is_done())
1148            })
1149            .await
1150            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1151
1152        Ok(())
1153    }
1154
1155    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1156        loop {
1157            if self.executor.get_active_states().await.is_empty() {
1158                break;
1159            }
1160            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1161        }
1162        Ok(())
1163    }
1164
1165    /// Set the client [`Metadata`]
1166    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1167        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1168    }
1169
1170    fn spawn_module_recoveries_task(
1171        &self,
1172        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1173        module_recoveries: BTreeMap<
1174            ModuleInstanceId,
1175            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1176        >,
1177        module_recovery_progress_receivers: BTreeMap<
1178            ModuleInstanceId,
1179            watch::Receiver<RecoveryProgress>,
1180        >,
1181    ) {
1182        let db = self.db.clone();
1183        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1184        self.task_group
1185            .spawn("module recoveries", |_task_handle| async {
1186                Self::run_module_recoveries_task(
1187                    db,
1188                    log_ordering_wakeup_tx,
1189                    recovery_sender,
1190                    module_recoveries,
1191                    module_recovery_progress_receivers,
1192                )
1193                .await;
1194            });
1195    }
1196
1197    async fn run_module_recoveries_task(
1198        db: Database,
1199        log_ordering_wakeup_tx: watch::Sender<()>,
1200        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1201        module_recoveries: BTreeMap<
1202            ModuleInstanceId,
1203            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1204        >,
1205        module_recovery_progress_receivers: BTreeMap<
1206            ModuleInstanceId,
1207            watch::Receiver<RecoveryProgress>,
1208        >,
1209    ) {
        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
1211        let mut completed_stream = Vec::new();
1212        let progress_stream = futures::stream::FuturesUnordered::new();
1213
1214        for (module_instance_id, f) in module_recoveries {
1215            completed_stream.push(futures::stream::once(Box::pin(async move {
1216                match f.await {
1217                    Ok(()) => (module_instance_id, None),
1218                    Err(err) => {
1219                        warn!(
1220                            target: LOG_CLIENT,
1221                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1222                        );
                        // a module recovery that failed reports an error and
                        // just never finishes, so we don't need a separate state
                        // for it
1226                        futures::future::pending::<()>().await;
1227                        unreachable!()
1228                    }
1229                }
1230            })));
1231        }
1232
1233        for (module_instance_id, rx) in module_recovery_progress_receivers {
1234            progress_stream.push(
1235                tokio_stream::wrappers::WatchStream::new(rx)
1236                    .fuse()
1237                    .map(move |progress| (module_instance_id, Some(progress))),
1238            );
1239        }
1240
1241        let mut futures = futures::stream::select(
1242            futures::stream::select_all(progress_stream),
1243            futures::stream::select_all(completed_stream),
1244        );
1245
1246        while let Some((module_instance_id, progress)) = futures.next().await {
1247            let mut dbtx = db.begin_transaction().await;
1248
1249            let prev_progress = *recovery_sender
1250                .borrow()
1251                .get(&module_instance_id)
1252                .expect("existing progress must be present");
1253
1254            let progress = if prev_progress.is_done() {
1255                // since updates might be out of order, once done, stick with it
1256                prev_progress
1257            } else if let Some(progress) = progress {
1258                progress
1259            } else {
1260                prev_progress.to_complete()
1261            };
1262
1263            if !prev_progress.is_done() && progress.is_done() {
1264                info!(
1265                    target: LOG_CLIENT,
1266                    module_instance_id,
1267                    progress = format!("{}/{}", progress.complete, progress.total),
1268                    "Recovery complete"
1269                );
1270                dbtx.log_event(
1271                    log_ordering_wakeup_tx.clone(),
1272                    None,
1273                    ModuleRecoveryCompleted {
1274                        module_id: module_instance_id,
1275                    },
1276                )
1277                .await;
1278            } else {
1279                info!(
1280                    target: LOG_CLIENT,
1281                    module_instance_id,
1282                    progress = format!("{}/{}", progress.complete, progress.total),
1283                    "Recovery progress"
1284                );
1285            }
1286
1287            dbtx.insert_entry(
1288                &ClientModuleRecovery { module_instance_id },
1289                &ClientModuleRecoveryState { progress },
1290            )
1291            .await;
1292            dbtx.commit_tx().await;
1293
1294            recovery_sender.send_modify(|v| {
1295                v.insert(module_instance_id, progress);
1296            });
1297        }
1298        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1299    }
1300
1301    async fn load_peers_last_api_versions(
1302        db: &Database,
1303        num_peers: NumPeers,
1304    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1305        let mut peer_api_version_sets = BTreeMap::new();
1306
1307        let mut dbtx = db.begin_transaction_nc().await;
1308        for peer_id in num_peers.peer_ids() {
1309            if let Some(v) = dbtx
1310                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1311                .await
1312            {
1313                peer_api_version_sets.insert(peer_id, v.0);
1314            }
1315        }
1316        drop(dbtx);
1317        peer_api_version_sets
1318    }
1319
    /// You likely want to use [`Client::get_peer_urls`]. This function returns
    /// only the announcements and doesn't use the config as a fallback.
1322    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1323        self.db()
1324            .begin_transaction_nc()
1325            .await
1326            .find_by_prefix(&ApiAnnouncementPrefix)
1327            .await
1328            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1329            .collect()
1330            .await
1331    }
1332
1333    /// Returns a list of guardian API URLs
1334    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1335        get_api_urls(&self.db, &self.config().await).await
1336    }
1337
    /// Create an invite code with the api endpoint of the given peer, which
    /// can be used to download this client's config.
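    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `client: &Client` and a guardian
    /// `peer_id: PeerId`:
    ///
    /// ```ignore
    /// if let Some(invite) = client.invite_code(peer_id).await {
    ///     println!("{invite}");
    /// }
    /// ```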
1340    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1341        self.get_peer_urls()
1342            .await
1343            .into_iter()
1344            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1345            .map(|peer_url| {
1346                InviteCode::new(
1347                    peer_url.clone(),
1348                    peer,
1349                    self.federation_id(),
1350                    self.api_secret.clone(),
1351                )
1352            })
1353    }
1354
    /// Blocks until the client has synced the guardian public key set
    /// (introduced in version 0.4) and returns it. Once it has been fetched
    /// once, this function is guaranteed to return immediately.
1358    pub async fn get_guardian_public_keys_blocking(
1359        &self,
1360    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1361        self.db
1362            .autocommit(
1363                |dbtx, _| {
1364                    Box::pin(async move {
1365                        let config = self.config().await;
1366
1367                        let guardian_pub_keys = self
1368                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1369                            .await;
1370
1371                        Result::<_, ()>::Ok(guardian_pub_keys)
1372                    })
1373                },
1374                None,
1375            )
1376            .await
1377            .expect("Will retry forever")
1378    }
1379
1380    async fn get_or_backfill_broadcast_public_keys(
1381        &self,
1382        dbtx: &mut DatabaseTransaction<'_>,
1383        config: ClientConfig,
1384    ) -> BTreeMap<PeerId, PublicKey> {
1385        match config.global.broadcast_public_keys {
1386            Some(guardian_pub_keys) => guardian_pub_keys,
1387            _ => {
1388                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1389
1390                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1391                *(self.config.write().await) = new_config;
1392                guardian_pub_keys
1393            }
1394        }
1395    }
1396
1397    async fn fetch_session_count(&self) -> FederationResult<u64> {
1398        self.api.session_count().await
1399    }
1400
1401    async fn fetch_and_update_config(
1402        &self,
1403        config: ClientConfig,
1404    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1405        let fetched_config = retry(
1406            "Fetching guardian public keys",
1407            backoff_util::background_backoff(),
1408            || async {
1409                Ok(self
1410                    .api
1411                    .request_current_consensus::<ClientConfig>(
1412                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1413                        ApiRequestErased::default(),
1414                    )
1415                    .await?)
1416            },
1417        )
1418        .await
1419        .expect("Will never return on error");
1420
1421        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1422            warn!(
1423                target: LOG_CLIENT,
1424                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1425            );
            pending::<()>().await;
            unreachable!("Pending will never return");
        };

        let new_config = ClientConfig {
            global: GlobalClientConfig {
                broadcast_public_keys: Some(guardian_pub_keys.clone()),
                ..config.global
            },
            modules: config.modules,
        };
        (guardian_pub_keys, new_config)
    }

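    /// Dispatch a client-global RPC by method name with JSON-encoded
    /// parameters, returning the results as a stream of JSON values.
    /// Subscription methods (e.g. `subscribe_balance_changes`) yield multiple
    /// items; all other methods yield a single item and then end.
    ///
    /// A minimal usage sketch (hypothetical `client` handle, not compiled
    /// here):
    ///
    /// ```ignore
    /// let mut responses =
    ///     client.handle_global_rpc("get_balance".to_owned(), serde_json::Value::Null);
    /// while let Some(value) = responses.next().await {
    ///     println!("{value:?}");
    /// }
    /// ```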
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance().await;
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let mut stream = self.subscribe_balance_changes().await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
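                    // Default to an unbounded page when the caller supplied no
                    // explicit limit.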
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                "backup_to_federation" => {
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
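                    // Propagating the error with `?` makes the stream yield it
                    // and end; the `unreachable!` only satisfies the type
                    // checker.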
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }

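    /// Write a single event to the client event log in its own database
    /// transaction.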
    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }

    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }

    pub async fn log_event_raw_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        kind: EventKind,
        module: Option<(ModuleKind, ModuleInstanceId)>,
        payload: Vec<u8>,
        persist: EventPersistence,
    ) where
        Cap: Send,
    {
        let module_id = module.as_ref().map(|m| m.1);
        let module_kind = module.map(|m| m.0);
        dbtx.log_event_raw(
            self.log_ordering_wakeup_tx.clone(),
            kind,
            module_kind,
            module_id,
            payload,
            persist,
        )
        .await;
    }

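    /// Process each new event log entry with `call_fn`, persisting the last
    /// handled position under `pos_key` so processing resumes where it left
    /// off after a restart.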
    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K: DatabaseRecord<Value = EventLogId>,
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            pos_key,
            self.log_event_added_rx.clone(),
            call_fn,
        )
        .await
    }

    pub async fn handle_trimable_events<F, R, K>(
        &self,
        pos_key: &K,
        call_fn: F,
    ) -> anyhow::Result<()>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K: DatabaseRecord<Value = EventLogTrimableId>,
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            pos_key,
            self.log_event_added_rx.clone(),
            call_fn,
        )
        .await
    }

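    /// Read up to `limit` persisted event log entries, starting at `pos` (or
    /// from the beginning of the log if `None`).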
    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }

    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }

    /// Register a receiver for all new transient (unpersisted) events.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }

    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }

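    /// Apply the core client database migrations and, when running in a test
    /// environment, verify client database integrity. Operates on a database
    /// handle without module decoders.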
    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
}

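// Exposes the concrete `Client` to client modules through the object-safe
// `ClientContextIface`; every method delegates to the inherent implementation
// on `Client`.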
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            // |out_point_range| operation_meta_gen(out_point_range),
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }

    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}

// TODO: impl `Debug` for `Client` and derive here
impl fmt::Debug for Client {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Client")
    }
}

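/// Build a `ModuleDecoderRegistry` for the given module instances, skipping
/// any instance whose module kind has no registered client module init.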
pub fn client_decoders<'a>(
    registry: &ModuleInitRegistry<DynClientModuleInit>,
    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
) -> ModuleDecoderRegistry {
    let mut modules = BTreeMap::new();
    for (id, kind) in module_kinds {
        let Some(init) = registry.get(kind) else {
            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
            continue;
        };

        modules.insert(
            id,
            (
                kind.clone(),
                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
            ),
        );
    }
    ModuleDecoderRegistry::from(modules)
}