fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51    SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61    fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, PersistedLogEntry,
66};
67use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
68use futures::stream::FuturesUnordered;
69use futures::{Stream, StreamExt as _};
70use global_ctx::ModuleGlobalClientContext;
71use serde::{Deserialize, Serialize};
72use tokio::sync::{broadcast, watch};
73use tokio_stream::wrappers::WatchStream;
74use tracing::{debug, info, warn};
75
76use crate::ClientBuilder;
77use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
78use crate::backup::Metadata;
79use crate::db::{
80    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
81    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
82    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
83    PeerLastApiVersionsSummaryKey, apply_migrations_core_client_dbtx, get_decoded_client_secret,
84    verify_client_db_integrity_dbtx,
85};
86use crate::meta::MetaService;
87use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
88use crate::oplog::OperationLog;
89use crate::sm::executor::{
90    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
91    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
92};
93
94pub(crate) mod builder;
95pub(crate) mod global_ctx;
96pub(crate) mod handle;
97
98/// List of core api versions supported by the implementation.
99/// Notably `major` version is the one being supported, and corresponding
100/// `minor` version is the one required (for given `major` version).
101const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
102    &[ApiVersion { major: 0, minor: 0 }];
103
104/// Main client type
105///
106/// A handle and API to interacting with a single federation. End user
107/// applications that want to support interacting with multiple federations at
108/// the same time, will need to instantiate and manage multiple instances of
109/// this struct.
110///
111/// Under the hood it is starting and managing service tasks, state machines,
112/// database and other resources required.
113///
114/// This type is shared externally and internally, and
115/// [`crate::ClientHandle`] is responsible for external lifecycle management
116/// and resource freeing of the [`Client`].
117pub struct Client {
118    final_client: FinalClientIface,
119    config: tokio::sync::RwLock<ClientConfig>,
120    api_secret: Option<String>,
121    decoders: ModuleDecoderRegistry,
122    db: Database,
123    federation_id: FederationId,
124    federation_config_meta: BTreeMap<String, String>,
125    primary_module_instance: ModuleInstanceId,
126    pub(crate) modules: ClientModuleRegistry,
127    module_inits: ClientModuleInitRegistry,
128    executor: Executor,
129    pub(crate) api: DynGlobalApi,
130    root_secret: DerivableSecret,
131    operation_log: OperationLog,
132    secp_ctx: Secp256k1<secp256k1::All>,
133    meta_service: Arc<MetaService>,
134    connector: Connector,
135
136    task_group: TaskGroup,
137
138    /// Updates about client recovery progress
139    client_recovery_progress_receiver:
140        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
141
142    /// Internal client sender to wake up log ordering task every time a
143    /// (unuordered) log event is added.
144    log_ordering_wakeup_tx: watch::Sender<()>,
145    /// Receiver for events fired every time (ordered) log event is added.
146    log_event_added_rx: watch::Receiver<()>,
147    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
148    request_hook: ApiRequestHook,
149}
150
151#[derive(Debug, Serialize, Deserialize)]
152struct ListOperationsParams {
153    limit: Option<usize>,
154    last_seen: Option<ChronologicalOperationLogKey>,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct GetOperationIdRequest {
159    operation_id: OperationId,
160}
161
162impl Client {
163    /// Initialize a client builder that can be configured to create a new
164    /// client.
165    pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
166        let mut dbtx = db.begin_transaction().await;
167        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
168            .await?;
169        if is_running_in_test_env() {
170            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
171        }
172        dbtx.commit_tx_result().await?;
173
174        Ok(ClientBuilder::new(db))
175    }
176
177    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
178        self.api.as_ref()
179    }
180
181    pub fn api_clone(&self) -> DynGlobalApi {
182        self.api.clone()
183    }
184
185    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
186    pub fn task_group(&self) -> &TaskGroup {
187        &self.task_group
188    }
189
190    /// Useful for our CLI tooling, not meant for external use
191    #[doc(hidden)]
192    pub fn executor(&self) -> &Executor {
193        &self.executor
194    }
195
196    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
197        let mut dbtx = db.begin_transaction_nc().await;
198        dbtx.get_value(&ClientConfigKey).await
199    }
200
201    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
202        let mut dbtx = db.begin_transaction_nc().await;
203        dbtx.get_value(&ApiSecretKey).await
204    }
205
206    pub async fn store_encodable_client_secret<T: Encodable>(
207        db: &Database,
208        secret: T,
209    ) -> anyhow::Result<()> {
210        let mut dbtx = db.begin_transaction().await;
211
212        // Don't overwrite an existing secret
213        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
214            bail!("Encoded client secret already exists, cannot overwrite")
215        }
216
217        let encoded_secret = T::consensus_encode_to_vec(&secret);
218        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
219            .await;
220        dbtx.commit_tx().await;
221        Ok(())
222    }
223
224    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
225        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
226            bail!("Encoded client secret not present in DB")
227        };
228
229        Ok(secret)
230    }
231    pub async fn load_decodable_client_secret_opt<T: Decodable>(
232        db: &Database,
233    ) -> anyhow::Result<Option<T>> {
234        let mut dbtx = db.begin_transaction_nc().await;
235
236        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
237
238        Ok(match client_secret {
239            Some(client_secret) => Some(
240                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
241                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
242            ),
243            None => None,
244        })
245    }
246
247    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
248        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
249            Ok(secret) => secret,
250            _ => {
251                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
252                Self::store_encodable_client_secret(db, secret)
253                    .await
254                    .expect("Storing client secret must work");
255                secret
256            }
257        };
258        Ok(client_secret)
259    }
260
261    pub async fn is_initialized(db: &Database) -> bool {
262        Self::get_config_from_db(db).await.is_some()
263    }
264
265    pub fn start_executor(self: &Arc<Self>) {
266        debug!(
267            target: LOG_CLIENT,
268            "Starting fedimint client executor (version: {})",
269            fedimint_build_code_version_env!()
270        );
271        self.executor.start_executor(self.context_gen());
272    }
273
274    pub fn federation_id(&self) -> FederationId {
275        self.federation_id
276    }
277
278    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
279        let client_inner = Arc::downgrade(self);
280        Arc::new(move |module_instance, operation| {
281            ModuleGlobalClientContext {
282                client: client_inner
283                    .clone()
284                    .upgrade()
285                    .expect("ModuleGlobalContextGen called after client was dropped"),
286                module_instance_id: module_instance,
287                operation,
288            }
289            .into()
290        })
291    }
292
293    pub async fn config(&self) -> ClientConfig {
294        self.config.read().await.clone()
295    }
296
297    pub fn api_secret(&self) -> &Option<String> {
298        &self.api_secret
299    }
300
301    pub fn decoders(&self) -> &ModuleDecoderRegistry {
302        &self.decoders
303    }
304
305    /// Returns a reference to the module, panics if not found
306    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
307        self.try_get_module(instance)
308            .expect("Module instance not found")
309    }
310
311    fn try_get_module(
312        &self,
313        instance: ModuleInstanceId,
314    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
315        Some(self.modules.get(instance)?.as_ref())
316    }
317
318    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
319        self.modules.get(instance).is_some()
320    }
321
322    /// Returns the input amount and output amount of a transaction
323    ///
324    /// # Panics
325    /// If any of the input or output versions in the transaction builder are
326    /// unknown by the respective module.
327    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
328        // FIXME: prevent overflows, currently not suitable for untrusted input
329        let mut in_amount = Amount::ZERO;
330        let mut out_amount = Amount::ZERO;
331        let mut fee_amount = Amount::ZERO;
332
333        for input in builder.inputs() {
334            let module = self.get_module(input.input.module_instance_id());
335
336            let item_fee = module.input_fee(input.amount, &input.input).expect(
337                "We only build transactions with input versions that are supported by the module",
338            );
339
340            in_amount += input.amount;
341            fee_amount += item_fee;
342        }
343
344        for output in builder.outputs() {
345            let module = self.get_module(output.output.module_instance_id());
346
347            let item_fee = module.output_fee(output.amount, &output.output).expect(
348                "We only build transactions with output versions that are supported by the module",
349            );
350
351            out_amount += output.amount;
352            fee_amount += item_fee;
353        }
354
355        (in_amount, out_amount + fee_amount)
356    }
357
358    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
359        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
360    }
361
362    /// Get metadata value from the federation config itself
363    pub fn get_config_meta(&self, key: &str) -> Option<String> {
364        self.federation_config_meta.get(key).cloned()
365    }
366
367    pub(crate) fn root_secret(&self) -> DerivableSecret {
368        self.root_secret.clone()
369    }
370
371    pub async fn add_state_machines(
372        &self,
373        dbtx: &mut DatabaseTransaction<'_>,
374        states: Vec<DynState>,
375    ) -> AddStateMachinesResult {
376        self.executor.add_state_machines_dbtx(dbtx, states).await
377    }
378
379    // TODO: implement as part of [`OperationLog`]
380    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
381        let active_states = self.executor.get_active_states().await;
382        let mut active_operations = HashSet::with_capacity(active_states.len());
383        let mut dbtx = self.db().begin_transaction_nc().await;
384        for (state, _) in active_states {
385            let operation_id = state.operation_id();
386            if dbtx
387                .get_value(&OperationLogKey { operation_id })
388                .await
389                .is_some()
390            {
391                active_operations.insert(operation_id);
392            }
393        }
394        active_operations
395    }
396
397    pub fn operation_log(&self) -> &OperationLog {
398        &self.operation_log
399    }
400
401    /// Get the meta manager to read meta fields.
402    pub fn meta_service(&self) -> &Arc<MetaService> {
403        &self.meta_service
404    }
405
406    /// Get the meta manager to read meta fields.
407    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
408        let meta_service = self.meta_service();
409        let ts = meta_service
410            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
411            .await
412            .and_then(|v| v.value)?;
413        Some(UNIX_EPOCH + Duration::from_secs(ts))
414    }
415
416    /// Adds funding to a transaction or removes over-funding via change.
417    async fn finalize_transaction(
418        &self,
419        dbtx: &mut DatabaseTransaction<'_>,
420        operation_id: OperationId,
421        mut partial_transaction: TransactionBuilder,
422    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
423        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
424
425        let (added_input_bundle, change_outputs) = self
426            .primary_module()
427            .create_final_inputs_and_outputs(
428                self.primary_module_instance,
429                dbtx,
430                operation_id,
431                input_amount,
432                output_amount,
433            )
434            .await?;
435
436        // This is the range of  outputs that will be added to the transaction
437        // in order to balance it. Notice that it may stay empty in case the transaction
438        // is already balanced.
439        let change_range = Range {
440            start: partial_transaction.outputs().count() as u64,
441            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
442        };
443
444        partial_transaction = partial_transaction
445            .with_inputs(added_input_bundle)
446            .with_outputs(change_outputs);
447
448        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
449
450        assert!(input_amount >= output_amount, "Transaction is underfunded");
451
452        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
453
454        Ok((tx, states, change_range))
455    }
456
457    /// Add funding and/or change to the transaction builder as needed, finalize
458    /// the transaction and submit it to the federation.
459    ///
460    /// ## Errors
461    /// The function will return an error if the operation with given ID already
462    /// exists.
463    ///
464    /// ## Panics
465    /// The function will panic if the database transaction collides with
466    /// other and fails with others too often, this should not happen except for
467    /// excessively concurrent scenarios.
468    pub async fn finalize_and_submit_transaction<F, M>(
469        &self,
470        operation_id: OperationId,
471        operation_type: &str,
472        operation_meta_gen: F,
473        tx_builder: TransactionBuilder,
474    ) -> anyhow::Result<OutPointRange>
475    where
476        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
477        M: serde::Serialize + MaybeSend,
478    {
479        let operation_type = operation_type.to_owned();
480
481        let autocommit_res = self
482            .db
483            .autocommit(
484                |dbtx, _| {
485                    let operation_type = operation_type.clone();
486                    let tx_builder = tx_builder.clone();
487                    let operation_meta_gen = operation_meta_gen.clone();
488                    Box::pin(async move {
489                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
490                            bail!("There already exists an operation with id {operation_id:?}")
491                        }
492
493                        let out_point_range = self
494                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
495                            .await?;
496
497                        self.operation_log()
498                            .add_operation_log_entry_dbtx(
499                                dbtx,
500                                operation_id,
501                                &operation_type,
502                                operation_meta_gen(out_point_range),
503                            )
504                            .await;
505
506                        Ok(out_point_range)
507                    })
508                },
509                Some(100), // TODO: handle what happens after 100 retries
510            )
511            .await;
512
513        match autocommit_res {
514            Ok(txid) => Ok(txid),
515            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
516            Err(AutocommitError::CommitFailed {
517                attempts,
518                last_error,
519            }) => panic!(
520                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
521            ),
522        }
523    }
524
525    async fn finalize_and_submit_transaction_inner(
526        &self,
527        dbtx: &mut DatabaseTransaction<'_>,
528        operation_id: OperationId,
529        tx_builder: TransactionBuilder,
530    ) -> anyhow::Result<OutPointRange> {
531        let (transaction, mut states, change_range) = self
532            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
533            .await?;
534
535        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
536            let inputs = transaction
537                .inputs
538                .iter()
539                .map(DynInput::module_instance_id)
540                .collect::<Vec<_>>();
541            let outputs = transaction
542                .outputs
543                .iter()
544                .map(DynOutput::module_instance_id)
545                .collect::<Vec<_>>();
546            warn!(
547                target: LOG_CLIENT_NET_API,
548                size=%transaction.consensus_encode_to_vec().len(),
549                ?inputs,
550                ?outputs,
551                "Transaction too large",
552            );
553            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
554            bail!(
555                "The generated transaction would be rejected by the federation for being too large."
556            );
557        }
558
559        let txid = transaction.tx_hash();
560
561        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
562
563        let tx_submission_sm = DynState::from_typed(
564            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
565            TxSubmissionStatesSM {
566                operation_id,
567                state: TxSubmissionStates::Created(transaction),
568            },
569        );
570        states.push(tx_submission_sm);
571
572        self.executor.add_state_machines_dbtx(dbtx, states).await?;
573
574        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
575            .await;
576
577        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
578    }
579
580    async fn transaction_update_stream(
581        &self,
582        operation_id: OperationId,
583    ) -> BoxStream<'static, TxSubmissionStatesSM> {
584        self.executor
585            .notifier()
586            .module_notifier::<TxSubmissionStatesSM>(
587                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
588                self.final_client.clone(),
589            )
590            .subscribe(operation_id)
591            .await
592    }
593
594    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
595        let mut dbtx = self.db().begin_transaction_nc().await;
596
597        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
598    }
599
600    pub async fn operation_exists_dbtx(
601        dbtx: &mut DatabaseTransaction<'_>,
602        operation_id: OperationId,
603    ) -> bool {
604        let active_state_exists = dbtx
605            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
606            .await
607            .next()
608            .await
609            .is_some();
610
611        let inactive_state_exists = dbtx
612            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
613            .await
614            .next()
615            .await
616            .is_some();
617
618        active_state_exists || inactive_state_exists
619    }
620
621    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
622        self.db
623            .begin_transaction_nc()
624            .await
625            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
626            .await
627            .next()
628            .await
629            .is_some()
630    }
631
632    /// Waits for an output from the primary module to reach its final
633    /// state.
634    pub async fn await_primary_module_output(
635        &self,
636        operation_id: OperationId,
637        out_point: OutPoint,
638    ) -> anyhow::Result<()> {
639        self.primary_module()
640            .await_primary_module_output(operation_id, out_point)
641            .await
642    }
643
644    /// Returns a reference to a typed module client instance by kind
645    pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
646        let module_kind = M::kind();
647        let id = self
648            .get_first_instance(&module_kind)
649            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
650        let module: &M = self
651            .try_get_module(id)
652            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
653            .as_any()
654            .downcast_ref::<M>()
655            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
656        let (db, _) = self.db().with_prefix_module_id(id);
657        Ok(ClientModuleInstance {
658            id,
659            db,
660            api: self.api().with_module(id),
661            module,
662        })
663    }
664
665    pub fn get_module_client_dyn(
666        &self,
667        instance_id: ModuleInstanceId,
668    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
669        self.try_get_module(instance_id)
670            .ok_or(anyhow!("Unknown module instance {}", instance_id))
671    }
672
673    pub fn db(&self) -> &Database {
674        &self.db
675    }
676
677    /// Returns a stream of transaction updates for the given operation id that
678    /// can later be used to watch for a specific transaction being accepted.
679    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
680        TransactionUpdates {
681            update_stream: self.transaction_update_stream(operation_id).await,
682        }
683    }
684
685    /// Returns the instance id of the first module of the given kind. The
686    /// primary module will always be returned before any other modules (which
687    /// themselves are ordered by their instance ID).
688    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
689        if self
690            .modules
691            .get_with_kind(self.primary_module_instance)
692            .is_some_and(|(kind, _)| kind == module_kind)
693        {
694            return Some(self.primary_module_instance);
695        }
696
697        self.modules
698            .iter_modules()
699            .find(|(_, kind, _module)| *kind == module_kind)
700            .map(|(instance_id, _, _)| instance_id)
701    }
702
703    /// Returns the data from which the client's root secret is derived (e.g.
704    /// BIP39 seed phrase struct).
705    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
706        get_decoded_client_secret::<T>(self.db()).await
707    }
708
709    /// Waits for outputs from the primary module to reach its final
710    /// state.
711    pub async fn await_primary_module_outputs(
712        &self,
713        operation_id: OperationId,
714        outputs: Vec<OutPoint>,
715    ) -> anyhow::Result<()> {
716        for out_point in outputs {
717            self.await_primary_module_output(operation_id, out_point)
718                .await?;
719        }
720
721        Ok(())
722    }
723
724    /// Returns the config of the client in JSON format.
725    ///
726    /// Compared to the consensus module format where module configs are binary
727    /// encoded this format cannot be cryptographically verified but is easier
728    /// to consume and to some degree human-readable.
729    pub async fn get_config_json(&self) -> JsonClientConfig {
730        self.config().await.to_json()
731    }
732
733    /// Get the primary module
734    pub fn primary_module(&self) -> &DynClientModule {
735        self.modules
736            .get(self.primary_module_instance)
737            .expect("primary module must be present")
738    }
739
740    /// Balance available to the client for spending
741    pub async fn get_balance(&self) -> Amount {
742        self.primary_module()
743            .get_balance(
744                self.primary_module_instance,
745                &mut self.db().begin_transaction_nc().await,
746            )
747            .await
748    }
749
750    /// Returns a stream that yields the current client balance every time it
751    /// changes.
752    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
753        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
754        let initial_balance = self.get_balance().await;
755        let db = self.db().clone();
756        let primary_module = self.primary_module().clone();
757        let primary_module_instance = self.primary_module_instance;
758
759        Box::pin(async_stream::stream! {
760            yield initial_balance;
761            let mut prev_balance = initial_balance;
762            while let Some(()) = balance_changes.next().await {
763                let mut dbtx = db.begin_transaction_nc().await;
764                let balance = primary_module
765                    .get_balance(primary_module_instance, &mut dbtx)
766                    .await;
767
768                // Deduplicate in case modules cannot always tell if the balance actually changed
769                if balance != prev_balance {
770                    prev_balance = balance;
771                    yield balance;
772                }
773            }
774        })
775    }
776
777    /// Query the federation for API version support and then calculate
778    /// the best API version to use (supported by most guardians).
779    pub async fn refresh_peers_api_versions(
780        num_peers: NumPeers,
781        api: DynGlobalApi,
782        db: Database,
783        num_responses_sender: watch::Sender<usize>,
784    ) {
785        // Make a single request to a peer after a delay
786        //
787        // The delay is here to unify the type of a future both for initial request and
788        // possible retries.
789        async fn make_request(
790            delay: Duration,
791            peer_id: PeerId,
792            api: &DynGlobalApi,
793        ) -> (
794            PeerId,
795            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
796        ) {
797            runtime::sleep(delay).await;
798            (
799                peer_id,
800                api.request_single_peer::<SupportedApiVersionsSummary>(
801                    VERSION_ENDPOINT.to_owned(),
802                    ApiRequestErased::default(),
803                    peer_id,
804                )
805                .await,
806            )
807        }
808
809        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
810        // and make a single async db write operation, it should be OK.
811        let mut requests = FuturesUnordered::new();
812
813        for peer_id in num_peers.peer_ids() {
814            requests.push(make_request(Duration::ZERO, peer_id, &api));
815        }
816
817        let mut num_responses = 0;
818
819        while let Some((peer_id, response)) = requests.next().await {
820            match response {
821                Err(err) => {
822                    if db
823                        .begin_transaction_nc()
824                        .await
825                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
826                        .await
827                        .is_some()
828                    {
829                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
830                    } else {
831                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
832                        requests.push(make_request(Duration::from_secs(15), peer_id, &api));
833                    }
834                }
835                Ok(o) => {
836                    // Save the response to the database right away, just to
837                    // not lose it
838                    let mut dbtx = db.begin_transaction().await;
839                    dbtx.insert_entry(
840                        &PeerLastApiVersionsSummaryKey(peer_id),
841                        &PeerLastApiVersionsSummary(o),
842                    )
843                    .await;
844                    dbtx.commit_tx().await;
845                    num_responses += 1;
846                    // ignore errors: we don't care if anyone is still listening
847                    num_responses_sender.send_replace(num_responses);
848                }
849            }
850        }
851    }
852
853    /// [`SupportedApiVersionsSummary`] that the client and its modules support
854    pub fn supported_api_versions_summary_static(
855        config: &ClientConfig,
856        client_module_init: &ClientModuleInitRegistry,
857    ) -> SupportedApiVersionsSummary {
858        SupportedApiVersionsSummary {
859            core: SupportedCoreApiVersions {
860                core_consensus: config.global.consensus_version,
861                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
862                    .expect("must not have conflicting versions"),
863            },
864            modules: config
865                .modules
866                .iter()
867                .filter_map(|(&module_instance_id, module_config)| {
868                    client_module_init
869                        .get(module_config.kind())
870                        .map(|module_init| {
871                            (
872                                module_instance_id,
873                                SupportedModuleApiVersions {
874                                    core_consensus: config.global.consensus_version,
875                                    module_consensus: module_config.version,
876                                    api: module_init.supported_api_versions(),
877                                },
878                            )
879                        })
880                })
881                .collect(),
882        }
883    }
884
885    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
886        Self::load_and_refresh_common_api_version_static(
887            &self.config().await,
888            &self.module_inits,
889            &self.api,
890            &self.db,
891            &self.task_group,
892        )
893        .await
894    }
895
896    /// Load the common api versions to use from cache and start a background
897    /// process to refresh them.
898    ///
899    /// This is a compromise, so we not have to wait for version discovery to
900    /// complete every time a [`Client`] is being built.
901    async fn load_and_refresh_common_api_version_static(
902        config: &ClientConfig,
903        module_init: &ClientModuleInitRegistry,
904        api: &DynGlobalApi,
905        db: &Database,
906        task_group: &TaskGroup,
907    ) -> anyhow::Result<ApiVersionSet> {
908        if let Some(v) = db
909            .begin_transaction_nc()
910            .await
911            .get_value(&CachedApiVersionSetKey)
912            .await
913        {
914            debug!(
915                target: LOG_CLIENT,
916                "Found existing cached common api versions"
917            );
918            let config = config.clone();
919            let client_module_init = module_init.clone();
920            let api = api.clone();
921            let db = db.clone();
922            let task_group = task_group.clone();
923            // Separate task group, because we actually don't want to be waiting for this to
924            // finish, and it's just best effort.
925            task_group
926                .clone()
927                .spawn_cancellable("refresh_common_api_version_static", async move {
928                    if let Err(error) = Self::refresh_common_api_version_static(
929                        &config,
930                        &client_module_init,
931                        &api,
932                        &db,
933                        task_group,
934                    )
935                    .await
936                    {
937                        warn!(
938                            target: LOG_CLIENT,
939                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
940                        );
941                    }
942                });
943
944            return Ok(v.0);
945        }
946
947        debug!(
948            target: LOG_CLIENT,
949            "No existing cached common api versions found, waiting for initial discovery"
950        );
951        Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
952            .await
953    }
954
955    async fn refresh_common_api_version_static(
956        config: &ClientConfig,
957        client_module_init: &ClientModuleInitRegistry,
958        api: &DynGlobalApi,
959        db: &Database,
960        task_group: TaskGroup,
961    ) -> anyhow::Result<ApiVersionSet> {
962        debug!(
963            target: LOG_CLIENT,
964            "Refreshing common api versions"
965        );
966
967        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
968        let num_peers = NumPeers::from(config.global.api_endpoints.len());
969
970        task_group.spawn_cancellable("refresh peers api versions", {
971            Client::refresh_peers_api_versions(
972                num_peers,
973                api.clone(),
974                db.clone(),
975                num_responses_sender,
976            )
977        });
978
979        // Wait at most 15 seconds before calculating a set of common api versions to
980        // use. Note that all peers individual responses from previous attempts
981        // are still being used, and requests, or even retries for response of
982        // peers are not actually cancelled, as they are happening on a separate
983        // task. This is all just to bound the time user can be waiting
984        // for the join operation to finish, at the risk of picking wrong version in
985        // very rare circumstances.
986        let _: Result<_, Elapsed> = runtime::timeout(
987            Duration::from_secs(15),
988            num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
989        )
990        .await;
991
992        let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
993
994        let common_api_versions =
995            fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
996                &Self::supported_api_versions_summary_static(config, client_module_init),
997                &peer_api_version_sets,
998            )?;
999
1000        debug!(
1001            target: LOG_CLIENT,
1002            value = ?common_api_versions,
1003            "Updating the cached common api versions"
1004        );
1005        let mut dbtx = db.begin_transaction().await;
1006        let _ = dbtx
1007            .insert_entry(
1008                &CachedApiVersionSetKey,
1009                &CachedApiVersionSet(common_api_versions.clone()),
1010            )
1011            .await;
1012
1013        dbtx.commit_tx().await;
1014
1015        Ok(common_api_versions)
1016    }
1017
1018    /// Get the client [`Metadata`]
1019    pub async fn get_metadata(&self) -> Metadata {
1020        self.db
1021            .begin_transaction_nc()
1022            .await
1023            .get_value(&ClientMetadataKey)
1024            .await
1025            .unwrap_or_else(|| {
1026                warn!(
1027                    target: LOG_CLIENT,
1028                    "Missing existing metadata. This key should have been set on Client init"
1029                );
1030                Metadata::empty()
1031            })
1032    }
1033
1034    /// Set the client [`Metadata`]
1035    pub async fn set_metadata(&self, metadata: &Metadata) {
1036        self.db
1037            .autocommit::<_, _, anyhow::Error>(
1038                |dbtx, _| {
1039                    Box::pin(async {
1040                        Self::set_metadata_dbtx(dbtx, metadata).await;
1041                        Ok(())
1042                    })
1043                },
1044                None,
1045            )
1046            .await
1047            .expect("Failed to autocommit metadata");
1048    }
1049
1050    pub fn has_pending_recoveries(&self) -> bool {
1051        !self
1052            .client_recovery_progress_receiver
1053            .borrow()
1054            .iter()
1055            .all(|(_id, progress)| progress.is_done())
1056    }
1057
1058    /// Wait for all module recoveries to finish
1059    ///
1060    /// This will block until the recovery task is done with recoveries.
1061    /// Returns success if all recovery tasks are complete (success case),
1062    /// or an error if some modules could not complete the recovery at the time.
1063    ///
1064    /// A bit of a heavy approach.
1065    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1066        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1067        recovery_receiver
1068            .wait_for(|in_progress| {
1069                in_progress
1070                    .iter()
1071                    .all(|(_id, progress)| progress.is_done())
1072            })
1073            .await
1074            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1075
1076        Ok(())
1077    }
1078
1079    /// Subscribe to recover progress for all the modules.
1080    ///
1081    /// This stream can contain duplicate progress for a module.
1082    /// Don't use this stream for detecting completion of recovery.
1083    pub fn subscribe_to_recovery_progress(
1084        &self,
1085    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1086        WatchStream::new(self.client_recovery_progress_receiver.clone())
1087            .flat_map(futures::stream::iter)
1088    }
1089
1090    pub async fn wait_for_module_kind_recovery(
1091        &self,
1092        module_kind: ModuleKind,
1093    ) -> anyhow::Result<()> {
1094        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1095        let config = self.config().await;
1096        recovery_receiver
1097            .wait_for(|in_progress| {
1098                !in_progress
1099                    .iter()
1100                    .filter(|(module_instance_id, _progress)| {
1101                        config.modules[module_instance_id].kind == module_kind
1102                    })
1103                    .any(|(_id, progress)| !progress.is_done())
1104            })
1105            .await
1106            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1107
1108        Ok(())
1109    }
1110
1111    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1112        loop {
1113            if self.executor.get_active_states().await.is_empty() {
1114                break;
1115            }
1116            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1117        }
1118        Ok(())
1119    }
1120
1121    /// Set the client [`Metadata`]
1122    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1123        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1124    }
1125
1126    fn spawn_module_recoveries_task(
1127        &self,
1128        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1129        module_recoveries: BTreeMap<
1130            ModuleInstanceId,
1131            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1132        >,
1133        module_recovery_progress_receivers: BTreeMap<
1134            ModuleInstanceId,
1135            watch::Receiver<RecoveryProgress>,
1136        >,
1137    ) {
1138        let db = self.db.clone();
1139        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1140        self.task_group
1141            .spawn("module recoveries", |_task_handle| async {
1142                Self::run_module_recoveries_task(
1143                    db,
1144                    log_ordering_wakeup_tx,
1145                    recovery_sender,
1146                    module_recoveries,
1147                    module_recovery_progress_receivers,
1148                )
1149                .await;
1150            });
1151    }
1152
1153    async fn run_module_recoveries_task(
1154        db: Database,
1155        log_ordering_wakeup_tx: watch::Sender<()>,
1156        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1157        module_recoveries: BTreeMap<
1158            ModuleInstanceId,
1159            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1160        >,
1161        module_recovery_progress_receivers: BTreeMap<
1162            ModuleInstanceId,
1163            watch::Receiver<RecoveryProgress>,
1164        >,
1165    ) {
1166        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1167        let mut completed_stream = Vec::new();
1168        let progress_stream = futures::stream::FuturesUnordered::new();
1169
1170        for (module_instance_id, f) in module_recoveries {
1171            completed_stream.push(futures::stream::once(Box::pin(async move {
1172                match f.await {
1173                    Ok(()) => (module_instance_id, None),
1174                    Err(err) => {
1175                        warn!(
1176                            target: LOG_CLIENT,
1177                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1178                        );
1179                        // a module recovery that failed reports and error and
1180                        // just never finishes, so we don't need a separate state
1181                        // for it
1182                        futures::future::pending::<()>().await;
1183                        unreachable!()
1184                    }
1185                }
1186            })));
1187        }
1188
1189        for (module_instance_id, rx) in module_recovery_progress_receivers {
1190            progress_stream.push(
1191                tokio_stream::wrappers::WatchStream::new(rx)
1192                    .fuse()
1193                    .map(move |progress| (module_instance_id, Some(progress))),
1194            );
1195        }
1196
1197        let mut futures = futures::stream::select(
1198            futures::stream::select_all(progress_stream),
1199            futures::stream::select_all(completed_stream),
1200        );
1201
1202        while let Some((module_instance_id, progress)) = futures.next().await {
1203            let mut dbtx = db.begin_transaction().await;
1204
1205            let prev_progress = *recovery_sender
1206                .borrow()
1207                .get(&module_instance_id)
1208                .expect("existing progress must be present");
1209
1210            let progress = if prev_progress.is_done() {
1211                // since updates might be out of order, once done, stick with it
1212                prev_progress
1213            } else if let Some(progress) = progress {
1214                progress
1215            } else {
1216                prev_progress.to_complete()
1217            };
1218
1219            if !prev_progress.is_done() && progress.is_done() {
1220                info!(
1221                    target: LOG_CLIENT,
1222                    module_instance_id,
1223                    progress = format!("{}/{}", progress.complete, progress.total),
1224                    "Recovery complete"
1225                );
1226                dbtx.log_event(
1227                    log_ordering_wakeup_tx.clone(),
1228                    None,
1229                    ModuleRecoveryCompleted {
1230                        module_id: module_instance_id,
1231                    },
1232                )
1233                .await;
1234            } else {
1235                info!(
1236                    target: LOG_CLIENT,
1237                    module_instance_id,
1238                    progress = format!("{}/{}", progress.complete, progress.total),
1239                    "Recovery progress"
1240                );
1241            }
1242
1243            dbtx.insert_entry(
1244                &ClientModuleRecovery { module_instance_id },
1245                &ClientModuleRecoveryState { progress },
1246            )
1247            .await;
1248            dbtx.commit_tx().await;
1249
1250            recovery_sender.send_modify(|v| {
1251                v.insert(module_instance_id, progress);
1252            });
1253        }
1254        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1255    }
1256
1257    async fn load_peers_last_api_versions(
1258        db: &Database,
1259        num_peers: NumPeers,
1260    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1261        let mut peer_api_version_sets = BTreeMap::new();
1262
1263        let mut dbtx = db.begin_transaction_nc().await;
1264        for peer_id in num_peers.peer_ids() {
1265            if let Some(v) = dbtx
1266                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1267                .await
1268            {
1269                peer_api_version_sets.insert(peer_id, v.0);
1270            }
1271        }
1272        drop(dbtx);
1273        peer_api_version_sets
1274    }
1275
1276    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1277    /// only the announcements and doesn't use the config as fallback.
1278    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1279        self.db()
1280            .begin_transaction_nc()
1281            .await
1282            .find_by_prefix(&ApiAnnouncementPrefix)
1283            .await
1284            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1285            .collect()
1286            .await
1287    }
1288
1289    /// Returns a list of guardian API URLs
1290    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1291        get_api_urls(&self.db, &self.config().await).await
1292    }
1293
1294    /// Create an invite code with the api endpoint of the given peer which can
1295    /// be used to download this client config
1296    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1297        self.get_peer_urls()
1298            .await
1299            .into_iter()
1300            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1301            .map(|peer_url| {
1302                InviteCode::new(
1303                    peer_url.clone(),
1304                    peer,
1305                    self.federation_id(),
1306                    self.api_secret.clone(),
1307                )
1308            })
1309    }
1310
1311    /// Blocks till the client has synced the guardian public key set
1312    /// (introduced in version 0.4) and returns it. Once it has been fetched
1313    /// once this function is guaranteed to return immediately.
1314    pub async fn get_guardian_public_keys_blocking(
1315        &self,
1316    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1317        self.db
1318            .autocommit(
1319                |dbtx, _| {
1320                    Box::pin(async move {
1321                        let config = self.config().await;
1322
1323                        let guardian_pub_keys = self
1324                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1325                            .await;
1326
1327                        Result::<_, ()>::Ok(guardian_pub_keys)
1328                    })
1329                },
1330                None,
1331            )
1332            .await
1333            .expect("Will retry forever")
1334    }
1335
1336    async fn get_or_backfill_broadcast_public_keys(
1337        &self,
1338        dbtx: &mut DatabaseTransaction<'_>,
1339        config: ClientConfig,
1340    ) -> BTreeMap<PeerId, PublicKey> {
1341        match config.global.broadcast_public_keys {
1342            Some(guardian_pub_keys) => guardian_pub_keys,
1343            _ => {
1344                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1345
1346                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1347                *(self.config.write().await) = new_config;
1348                guardian_pub_keys
1349            }
1350        }
1351    }
1352
1353    async fn fetch_and_update_config(
1354        &self,
1355        config: ClientConfig,
1356    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1357        let fetched_config = retry(
1358            "Fetching guardian public keys",
1359            backoff_util::background_backoff(),
1360            || async {
1361                Ok(self
1362                    .api
1363                    .request_current_consensus::<ClientConfig>(
1364                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1365                        ApiRequestErased::default(),
1366                    )
1367                    .await?)
1368            },
1369        )
1370        .await
1371        .expect("Will never return on error");
1372
1373        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1374            warn!(
1375                target: LOG_CLIENT,
1376                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1377            );
1378            pending::<()>().await;
1379            unreachable!("Pending will never return");
1380        };
1381
1382        let new_config = ClientConfig {
1383            global: GlobalClientConfig {
1384                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1385                ..config.global
1386            },
1387            modules: config.modules,
1388        };
1389        (guardian_pub_keys, new_config)
1390    }
1391
1392    pub fn handle_global_rpc(
1393        &self,
1394        method: String,
1395        params: serde_json::Value,
1396    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1397        Box::pin(try_stream! {
1398            match method.as_str() {
1399                "get_balance" => {
1400                    let balance = self.get_balance().await;
1401                    yield serde_json::to_value(balance)?;
1402                }
1403                "subscribe_balance_changes" => {
1404                    let mut stream = self.subscribe_balance_changes().await;
1405                    while let Some(balance) = stream.next().await {
1406                        yield serde_json::to_value(balance)?;
1407                    }
1408                }
1409                "get_config" => {
1410                    let config = self.config().await;
1411                    yield serde_json::to_value(config)?;
1412                }
1413                "get_federation_id" => {
1414                    let federation_id = self.federation_id();
1415                    yield serde_json::to_value(federation_id)?;
1416                }
1417                "get_invite_code" => {
1418                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1419                    let invite_code = self.invite_code(req.peer).await;
1420                    yield serde_json::to_value(invite_code)?;
1421                }
1422                "get_operation" => {
1423                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1424                    let operation = self.operation_log().get_operation(req.operation_id).await;
1425                    yield serde_json::to_value(operation)?;
1426                }
1427                "list_operations" => {
1428                    let req: ListOperationsParams = serde_json::from_value(params)?;
1429                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1430                        usize::MAX
1431                    } else {
1432                        req.limit.unwrap_or(usize::MAX)
1433                    };
1434                    let operations = self.operation_log()
1435                        .paginate_operations_rev(limit, req.last_seen)
1436                        .await;
1437                    yield serde_json::to_value(operations)?;
1438                }
1439                "has_pending_recoveries" => {
1440                    let has_pending = self.has_pending_recoveries();
1441                    yield serde_json::to_value(has_pending)?;
1442                }
1443                "wait_for_all_recoveries" => {
1444                    self.wait_for_all_recoveries().await?;
1445                    yield serde_json::Value::Null;
1446                }
1447                "subscribe_to_recovery_progress" => {
1448                    let mut stream = self.subscribe_to_recovery_progress();
1449                    while let Some((module_id, progress)) = stream.next().await {
1450                        yield serde_json::json!({
1451                            "module_id": module_id,
1452                            "progress": progress
1453                        });
1454                    }
1455                }
1456                _ => {
1457                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1458                    unreachable!()
1459                },
1460            }
1461        })
1462    }
1463
1464    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1465    where
1466        E: Event + Send,
1467    {
1468        let mut dbtx = self.db.begin_transaction().await;
1469        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1470        dbtx.commit_tx().await;
1471    }
1472
1473    pub async fn log_event_dbtx<E, Cap>(
1474        &self,
1475        dbtx: &mut DatabaseTransaction<'_, Cap>,
1476        module_id: Option<ModuleInstanceId>,
1477        event: E,
1478    ) where
1479        E: Event + Send,
1480        Cap: Send,
1481    {
1482        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1483            .await;
1484    }
1485
1486    pub async fn log_event_raw_dbtx<Cap>(
1487        &self,
1488        dbtx: &mut DatabaseTransaction<'_, Cap>,
1489        kind: EventKind,
1490        module: Option<(ModuleKind, ModuleInstanceId)>,
1491        payload: Vec<u8>,
1492        persist: bool,
1493    ) where
1494        Cap: Send,
1495    {
1496        let module_id = module.as_ref().map(|m| m.1);
1497        let module_kind = module.map(|m| m.0);
1498        dbtx.log_event_raw(
1499            self.log_ordering_wakeup_tx.clone(),
1500            kind,
1501            module_kind,
1502            module_id,
1503            payload,
1504            persist,
1505        )
1506        .await;
1507    }
1508
1509    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1510    where
1511        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1512        K: DatabaseRecord<Value = EventLogId>,
1513        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1514        R: Future<Output = anyhow::Result<()>>,
1515    {
1516        fedimint_eventlog::handle_events(
1517            self.db.clone(),
1518            pos_key,
1519            self.log_event_added_rx.clone(),
1520            call_fn,
1521        )
1522        .await
1523    }
1524
1525    pub async fn get_event_log(
1526        &self,
1527        pos: Option<EventLogId>,
1528        limit: u64,
1529    ) -> Vec<PersistedLogEntry> {
1530        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1531            .await
1532    }
1533
1534    pub async fn get_event_log_dbtx<Cap>(
1535        &self,
1536        dbtx: &mut DatabaseTransaction<'_, Cap>,
1537        pos: Option<EventLogId>,
1538        limit: u64,
1539    ) -> Vec<PersistedLogEntry>
1540    where
1541        Cap: Send,
1542    {
1543        dbtx.get_event_log(pos, limit).await
1544    }
1545
1546    /// Register to receiver all new transient (unpersisted) events
1547    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1548        self.log_event_added_transient_tx.subscribe()
1549    }
1550}
1551
1552#[apply(async_trait_maybe_send!)]
1553impl ClientContextIface for Client {
1554    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1555        Client::get_module(self, instance)
1556    }
1557
1558    fn api_clone(&self) -> DynGlobalApi {
1559        Client::api_clone(self)
1560    }
1561    fn decoders(&self) -> &ModuleDecoderRegistry {
1562        Client::decoders(self)
1563    }
1564
1565    async fn finalize_and_submit_transaction(
1566        &self,
1567        operation_id: OperationId,
1568        operation_type: &str,
1569        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1570        tx_builder: TransactionBuilder,
1571    ) -> anyhow::Result<OutPointRange> {
1572        Client::finalize_and_submit_transaction(
1573            self,
1574            operation_id,
1575            operation_type,
1576            // |out_point_range| operation_meta_gen(out_point_range),
1577            &operation_meta_gen,
1578            tx_builder,
1579        )
1580        .await
1581    }
1582
1583    async fn finalize_and_submit_transaction_inner(
1584        &self,
1585        dbtx: &mut DatabaseTransaction<'_>,
1586        operation_id: OperationId,
1587        tx_builder: TransactionBuilder,
1588    ) -> anyhow::Result<OutPointRange> {
1589        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1590    }
1591
1592    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1593        Client::transaction_updates(self, operation_id).await
1594    }
1595
1596    async fn await_primary_module_outputs(
1597        &self,
1598        operation_id: OperationId,
1599        // TODO: make `impl Iterator<Item = ...>`
1600        outputs: Vec<OutPoint>,
1601    ) -> anyhow::Result<()> {
1602        Client::await_primary_module_outputs(self, operation_id, outputs).await
1603    }
1604
1605    fn operation_log(&self) -> &dyn IOperationLog {
1606        Client::operation_log(self)
1607    }
1608
1609    async fn has_active_states(&self, operation_id: OperationId) -> bool {
1610        Client::has_active_states(self, operation_id).await
1611    }
1612
1613    async fn operation_exists(&self, operation_id: OperationId) -> bool {
1614        Client::operation_exists(self, operation_id).await
1615    }
1616
1617    async fn config(&self) -> ClientConfig {
1618        Client::config(self).await
1619    }
1620
1621    fn db(&self) -> &Database {
1622        Client::db(self)
1623    }
1624
1625    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1626        Client::executor(self)
1627    }
1628
1629    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1630        Client::invite_code(self, peer).await
1631    }
1632
1633    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1634        Client::get_internal_payment_markers(self)
1635    }
1636
1637    async fn log_event_json(
1638        &self,
1639        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1640        module_kind: Option<ModuleKind>,
1641        module_id: ModuleInstanceId,
1642        kind: EventKind,
1643        payload: serde_json::Value,
1644        persist: bool,
1645    ) {
1646        dbtx.ensure_global()
1647            .expect("Must be called with global dbtx");
1648        self.log_event_raw_dbtx(
1649            dbtx,
1650            kind,
1651            module_kind.map(|kind| (kind, module_id)),
1652            serde_json::to_vec(&payload).expect("Serialization can't fail"),
1653            persist,
1654        )
1655        .await;
1656    }
1657
1658    async fn read_operation_active_states<'dbtx>(
1659        &self,
1660        operation_id: OperationId,
1661        module_id: ModuleInstanceId,
1662        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1663    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1664    {
1665        Box::pin(
1666            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1667                operation_id,
1668                module_instance: module_id,
1669            })
1670            .await
1671            .map(move |(k, v)| (k.0, v)),
1672        )
1673    }
1674    async fn read_operation_inactive_states<'dbtx>(
1675        &self,
1676        operation_id: OperationId,
1677        module_id: ModuleInstanceId,
1678        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1679    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1680    {
1681        Box::pin(
1682            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1683                operation_id,
1684                module_instance: module_id,
1685            })
1686            .await
1687            .map(move |(k, v)| (k.0, v)),
1688        )
1689    }
1690}
1691
1692// TODO: impl `Debug` for `Client` and derive here
1693impl fmt::Debug for Client {
1694    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1695        write!(f, "Client")
1696    }
1697}
1698
1699pub fn client_decoders<'a>(
1700    registry: &ModuleInitRegistry<DynClientModuleInit>,
1701    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1702) -> ModuleDecoderRegistry {
1703    let mut modules = BTreeMap::new();
1704    for (id, kind) in module_kinds {
1705        let Some(init) = registry.get(kind) else {
1706            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1707            continue;
1708        };
1709
1710        modules.insert(
1711            id,
1712            (
1713                kind.clone(),
1714                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1715            ),
1716        );
1717    }
1718    ModuleDecoderRegistry::from(modules)
1719}