fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51    SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61    fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, PersistedLogEntry,
66};
67use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
68use futures::stream::FuturesUnordered;
69use futures::{Stream, StreamExt as _};
70use global_ctx::ModuleGlobalClientContext;
71use tokio::sync::{broadcast, watch};
72use tokio_stream::wrappers::WatchStream;
73use tracing::{debug, info, warn};
74
75use crate::ClientBuilder;
76use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
77use crate::backup::Metadata;
78use crate::db::{
79    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientMetadataKey,
80    ClientModuleRecovery, ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey,
81    PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey, apply_migrations_core_client_dbtx,
82    get_decoded_client_secret, verify_client_db_integrity_dbtx,
83};
84use crate::meta::MetaService;
85use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
86use crate::oplog::OperationLog;
87use crate::sm::executor::{
88    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
89    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
90};
91
92pub(crate) mod builder;
93pub(crate) mod global_ctx;
94pub(crate) mod handle;
95
96/// List of core api versions supported by the implementation.
97/// Notably `major` version is the one being supported, and corresponding
98/// `minor` version is the one required (for given `major` version).
99const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
100    &[ApiVersion { major: 0, minor: 0 }];
101
102/// Main client type
103///
104/// A handle and API to interacting with a single federation. End user
105/// applications that want to support interacting with multiple federations at
106/// the same time, will need to instantiate and manage multiple instances of
107/// this struct.
108///
109/// Under the hood it is starting and managing service tasks, state machines,
110/// database and other resources required.
111///
112/// This type is shared externally and internally, and
113/// [`crate::ClientHandle`] is responsible for external lifecycle management
114/// and resource freeing of the [`Client`].
115pub struct Client {
116    final_client: FinalClientIface,
117    config: tokio::sync::RwLock<ClientConfig>,
118    api_secret: Option<String>,
119    decoders: ModuleDecoderRegistry,
120    db: Database,
121    federation_id: FederationId,
122    federation_config_meta: BTreeMap<String, String>,
123    primary_module_instance: ModuleInstanceId,
124    pub(crate) modules: ClientModuleRegistry,
125    module_inits: ClientModuleInitRegistry,
126    executor: Executor,
127    pub(crate) api: DynGlobalApi,
128    root_secret: DerivableSecret,
129    operation_log: OperationLog,
130    secp_ctx: Secp256k1<secp256k1::All>,
131    meta_service: Arc<MetaService>,
132    connector: Connector,
133
134    task_group: TaskGroup,
135
136    /// Updates about client recovery progress
137    client_recovery_progress_receiver:
138        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
139
140    /// Internal client sender to wake up log ordering task every time a
141    /// (unuordered) log event is added.
142    log_ordering_wakeup_tx: watch::Sender<()>,
143    /// Receiver for events fired every time (ordered) log event is added.
144    log_event_added_rx: watch::Receiver<()>,
145    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
146    request_hook: ApiRequestHook,
147}
148
149impl Client {
150    /// Initialize a client builder that can be configured to create a new
151    /// client.
152    pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
153        let mut dbtx = db.begin_transaction().await;
154        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
155            .await?;
156        if is_running_in_test_env() {
157            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
158        }
159        dbtx.commit_tx_result().await?;
160
161        Ok(ClientBuilder::new(db))
162    }
163
164    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
165        self.api.as_ref()
166    }
167
168    pub fn api_clone(&self) -> DynGlobalApi {
169        self.api.clone()
170    }
171
172    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
173    pub fn task_group(&self) -> &TaskGroup {
174        &self.task_group
175    }
176
177    /// Useful for our CLI tooling, not meant for external use
178    #[doc(hidden)]
179    pub fn executor(&self) -> &Executor {
180        &self.executor
181    }
182
183    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
184        let mut dbtx = db.begin_transaction_nc().await;
185        dbtx.get_value(&ClientConfigKey).await
186    }
187
188    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
189        let mut dbtx = db.begin_transaction_nc().await;
190        dbtx.get_value(&ApiSecretKey).await
191    }
192
193    pub async fn store_encodable_client_secret<T: Encodable>(
194        db: &Database,
195        secret: T,
196    ) -> anyhow::Result<()> {
197        let mut dbtx = db.begin_transaction().await;
198
199        // Don't overwrite an existing secret
200        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
201            bail!("Encoded client secret already exists, cannot overwrite")
202        }
203
204        let encoded_secret = T::consensus_encode_to_vec(&secret);
205        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
206            .await;
207        dbtx.commit_tx().await;
208        Ok(())
209    }
210
211    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
212        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
213            bail!("Encoded client secret not present in DB")
214        };
215
216        Ok(secret)
217    }
218    pub async fn load_decodable_client_secret_opt<T: Decodable>(
219        db: &Database,
220    ) -> anyhow::Result<Option<T>> {
221        let mut dbtx = db.begin_transaction_nc().await;
222
223        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
224
225        Ok(match client_secret {
226            Some(client_secret) => Some(
227                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
228                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
229            ),
230            None => None,
231        })
232    }
233
234    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
235        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
236            Ok(secret) => secret,
237            _ => {
238                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
239                Self::store_encodable_client_secret(db, secret)
240                    .await
241                    .expect("Storing client secret must work");
242                secret
243            }
244        };
245        Ok(client_secret)
246    }
247
248    pub async fn is_initialized(db: &Database) -> bool {
249        Self::get_config_from_db(db).await.is_some()
250    }
251
252    pub fn start_executor(self: &Arc<Self>) {
253        debug!(
254            target: LOG_CLIENT,
255            "Starting fedimint client executor (version: {})",
256            fedimint_build_code_version_env!()
257        );
258        self.executor.start_executor(self.context_gen());
259    }
260
261    pub fn federation_id(&self) -> FederationId {
262        self.federation_id
263    }
264
265    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
266        let client_inner = Arc::downgrade(self);
267        Arc::new(move |module_instance, operation| {
268            ModuleGlobalClientContext {
269                client: client_inner
270                    .clone()
271                    .upgrade()
272                    .expect("ModuleGlobalContextGen called after client was dropped"),
273                module_instance_id: module_instance,
274                operation,
275            }
276            .into()
277        })
278    }
279
280    pub async fn config(&self) -> ClientConfig {
281        self.config.read().await.clone()
282    }
283
284    pub fn api_secret(&self) -> &Option<String> {
285        &self.api_secret
286    }
287
288    pub fn decoders(&self) -> &ModuleDecoderRegistry {
289        &self.decoders
290    }
291
292    /// Returns a reference to the module, panics if not found
293    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
294        self.try_get_module(instance)
295            .expect("Module instance not found")
296    }
297
298    fn try_get_module(
299        &self,
300        instance: ModuleInstanceId,
301    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
302        Some(self.modules.get(instance)?.as_ref())
303    }
304
305    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
306        self.modules.get(instance).is_some()
307    }
308
309    /// Returns the input amount and output amount of a transaction
310    ///
311    /// # Panics
312    /// If any of the input or output versions in the transaction builder are
313    /// unknown by the respective module.
314    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
315        // FIXME: prevent overflows, currently not suitable for untrusted input
316        let mut in_amount = Amount::ZERO;
317        let mut out_amount = Amount::ZERO;
318        let mut fee_amount = Amount::ZERO;
319
320        for input in builder.inputs() {
321            let module = self.get_module(input.input.module_instance_id());
322
323            let item_fee = module.input_fee(input.amount, &input.input).expect(
324                "We only build transactions with input versions that are supported by the module",
325            );
326
327            in_amount += input.amount;
328            fee_amount += item_fee;
329        }
330
331        for output in builder.outputs() {
332            let module = self.get_module(output.output.module_instance_id());
333
334            let item_fee = module.output_fee(output.amount, &output.output).expect(
335                "We only build transactions with output versions that are supported by the module",
336            );
337
338            out_amount += output.amount;
339            fee_amount += item_fee;
340        }
341
342        (in_amount, out_amount + fee_amount)
343    }
344
345    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
346        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
347    }
348
349    /// Get metadata value from the federation config itself
350    pub fn get_config_meta(&self, key: &str) -> Option<String> {
351        self.federation_config_meta.get(key).cloned()
352    }
353
354    pub(crate) fn root_secret(&self) -> DerivableSecret {
355        self.root_secret.clone()
356    }
357
358    pub async fn add_state_machines(
359        &self,
360        dbtx: &mut DatabaseTransaction<'_>,
361        states: Vec<DynState>,
362    ) -> AddStateMachinesResult {
363        self.executor.add_state_machines_dbtx(dbtx, states).await
364    }
365
366    // TODO: implement as part of [`OperationLog`]
367    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
368        let active_states = self.executor.get_active_states().await;
369        let mut active_operations = HashSet::with_capacity(active_states.len());
370        let mut dbtx = self.db().begin_transaction_nc().await;
371        for (state, _) in active_states {
372            let operation_id = state.operation_id();
373            if dbtx
374                .get_value(&OperationLogKey { operation_id })
375                .await
376                .is_some()
377            {
378                active_operations.insert(operation_id);
379            }
380        }
381        active_operations
382    }
383
384    pub fn operation_log(&self) -> &OperationLog {
385        &self.operation_log
386    }
387
388    /// Get the meta manager to read meta fields.
389    pub fn meta_service(&self) -> &Arc<MetaService> {
390        &self.meta_service
391    }
392
393    /// Get the meta manager to read meta fields.
394    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
395        let meta_service = self.meta_service();
396        let ts = meta_service
397            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
398            .await
399            .and_then(|v| v.value)?;
400        Some(UNIX_EPOCH + Duration::from_secs(ts))
401    }
402
403    /// Adds funding to a transaction or removes over-funding via change.
404    async fn finalize_transaction(
405        &self,
406        dbtx: &mut DatabaseTransaction<'_>,
407        operation_id: OperationId,
408        mut partial_transaction: TransactionBuilder,
409    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
410        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
411
412        let (added_input_bundle, change_outputs) = self
413            .primary_module()
414            .create_final_inputs_and_outputs(
415                self.primary_module_instance,
416                dbtx,
417                operation_id,
418                input_amount,
419                output_amount,
420            )
421            .await?;
422
423        // This is the range of  outputs that will be added to the transaction
424        // in order to balance it. Notice that it may stay empty in case the transaction
425        // is already balanced.
426        let change_range = Range {
427            start: partial_transaction.outputs().count() as u64,
428            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
429        };
430
431        partial_transaction = partial_transaction
432            .with_inputs(added_input_bundle)
433            .with_outputs(change_outputs);
434
435        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
436
437        assert!(input_amount >= output_amount, "Transaction is underfunded");
438
439        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
440
441        Ok((tx, states, change_range))
442    }
443
444    /// Add funding and/or change to the transaction builder as needed, finalize
445    /// the transaction and submit it to the federation.
446    ///
447    /// ## Errors
448    /// The function will return an error if the operation with given ID already
449    /// exists.
450    ///
451    /// ## Panics
452    /// The function will panic if the database transaction collides with
453    /// other and fails with others too often, this should not happen except for
454    /// excessively concurrent scenarios.
455    pub async fn finalize_and_submit_transaction<F, M>(
456        &self,
457        operation_id: OperationId,
458        operation_type: &str,
459        operation_meta_gen: F,
460        tx_builder: TransactionBuilder,
461    ) -> anyhow::Result<OutPointRange>
462    where
463        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
464        M: serde::Serialize + MaybeSend,
465    {
466        let operation_type = operation_type.to_owned();
467
468        let autocommit_res = self
469            .db
470            .autocommit(
471                |dbtx, _| {
472                    let operation_type = operation_type.clone();
473                    let tx_builder = tx_builder.clone();
474                    let operation_meta_gen = operation_meta_gen.clone();
475                    Box::pin(async move {
476                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
477                            bail!("There already exists an operation with id {operation_id:?}")
478                        }
479
480                        let out_point_range = self
481                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
482                            .await?;
483
484                        self.operation_log()
485                            .add_operation_log_entry_dbtx(
486                                dbtx,
487                                operation_id,
488                                &operation_type,
489                                operation_meta_gen(out_point_range),
490                            )
491                            .await;
492
493                        Ok(out_point_range)
494                    })
495                },
496                Some(100), // TODO: handle what happens after 100 retries
497            )
498            .await;
499
500        match autocommit_res {
501            Ok(txid) => Ok(txid),
502            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
503            Err(AutocommitError::CommitFailed {
504                attempts,
505                last_error,
506            }) => panic!(
507                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
508            ),
509        }
510    }
511
512    async fn finalize_and_submit_transaction_inner(
513        &self,
514        dbtx: &mut DatabaseTransaction<'_>,
515        operation_id: OperationId,
516        tx_builder: TransactionBuilder,
517    ) -> anyhow::Result<OutPointRange> {
518        let (transaction, mut states, change_range) = self
519            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
520            .await?;
521
522        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
523            let inputs = transaction
524                .inputs
525                .iter()
526                .map(DynInput::module_instance_id)
527                .collect::<Vec<_>>();
528            let outputs = transaction
529                .outputs
530                .iter()
531                .map(DynOutput::module_instance_id)
532                .collect::<Vec<_>>();
533            warn!(
534                target: LOG_CLIENT_NET_API,
535                size=%transaction.consensus_encode_to_vec().len(),
536                ?inputs,
537                ?outputs,
538                "Transaction too large",
539            );
540            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
541            bail!(
542                "The generated transaction would be rejected by the federation for being too large."
543            );
544        }
545
546        let txid = transaction.tx_hash();
547
548        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
549
550        let tx_submission_sm = DynState::from_typed(
551            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
552            TxSubmissionStatesSM {
553                operation_id,
554                state: TxSubmissionStates::Created(transaction),
555            },
556        );
557        states.push(tx_submission_sm);
558
559        self.executor.add_state_machines_dbtx(dbtx, states).await?;
560
561        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
562            .await;
563
564        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
565    }
566
567    async fn transaction_update_stream(
568        &self,
569        operation_id: OperationId,
570    ) -> BoxStream<'static, TxSubmissionStatesSM> {
571        self.executor
572            .notifier()
573            .module_notifier::<TxSubmissionStatesSM>(
574                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
575                self.final_client.clone(),
576            )
577            .subscribe(operation_id)
578            .await
579    }
580
581    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
582        let mut dbtx = self.db().begin_transaction_nc().await;
583
584        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
585    }
586
587    pub async fn operation_exists_dbtx(
588        dbtx: &mut DatabaseTransaction<'_>,
589        operation_id: OperationId,
590    ) -> bool {
591        let active_state_exists = dbtx
592            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
593            .await
594            .next()
595            .await
596            .is_some();
597
598        let inactive_state_exists = dbtx
599            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
600            .await
601            .next()
602            .await
603            .is_some();
604
605        active_state_exists || inactive_state_exists
606    }
607
608    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
609        self.db
610            .begin_transaction_nc()
611            .await
612            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
613            .await
614            .next()
615            .await
616            .is_some()
617    }
618
619    /// Waits for an output from the primary module to reach its final
620    /// state.
621    pub async fn await_primary_module_output(
622        &self,
623        operation_id: OperationId,
624        out_point: OutPoint,
625    ) -> anyhow::Result<()> {
626        self.primary_module()
627            .await_primary_module_output(operation_id, out_point)
628            .await
629    }
630
631    /// Returns a reference to a typed module client instance by kind
632    pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
633        let module_kind = M::kind();
634        let id = self
635            .get_first_instance(&module_kind)
636            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
637        let module: &M = self
638            .try_get_module(id)
639            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
640            .as_any()
641            .downcast_ref::<M>()
642            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
643        let (db, _) = self.db().with_prefix_module_id(id);
644        Ok(ClientModuleInstance {
645            id,
646            db,
647            api: self.api().with_module(id),
648            module,
649        })
650    }
651
652    pub fn get_module_client_dyn(
653        &self,
654        instance_id: ModuleInstanceId,
655    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
656        self.try_get_module(instance_id)
657            .ok_or(anyhow!("Unknown module instance {}", instance_id))
658    }
659
660    pub fn db(&self) -> &Database {
661        &self.db
662    }
663
664    /// Returns a stream of transaction updates for the given operation id that
665    /// can later be used to watch for a specific transaction being accepted.
666    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
667        TransactionUpdates {
668            update_stream: self.transaction_update_stream(operation_id).await,
669        }
670    }
671
672    /// Returns the instance id of the first module of the given kind. The
673    /// primary module will always be returned before any other modules (which
674    /// themselves are ordered by their instance ID).
675    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
676        if self
677            .modules
678            .get_with_kind(self.primary_module_instance)
679            .is_some_and(|(kind, _)| kind == module_kind)
680        {
681            return Some(self.primary_module_instance);
682        }
683
684        self.modules
685            .iter_modules()
686            .find(|(_, kind, _module)| *kind == module_kind)
687            .map(|(instance_id, _, _)| instance_id)
688    }
689
690    /// Returns the data from which the client's root secret is derived (e.g.
691    /// BIP39 seed phrase struct).
692    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
693        get_decoded_client_secret::<T>(self.db()).await
694    }
695
696    /// Waits for outputs from the primary module to reach its final
697    /// state.
698    pub async fn await_primary_module_outputs(
699        &self,
700        operation_id: OperationId,
701        outputs: Vec<OutPoint>,
702    ) -> anyhow::Result<()> {
703        for out_point in outputs {
704            self.await_primary_module_output(operation_id, out_point)
705                .await?;
706        }
707
708        Ok(())
709    }
710
711    /// Returns the config of the client in JSON format.
712    ///
713    /// Compared to the consensus module format where module configs are binary
714    /// encoded this format cannot be cryptographically verified but is easier
715    /// to consume and to some degree human-readable.
716    pub async fn get_config_json(&self) -> JsonClientConfig {
717        self.config().await.to_json()
718    }
719
720    /// Get the primary module
721    pub fn primary_module(&self) -> &DynClientModule {
722        self.modules
723            .get(self.primary_module_instance)
724            .expect("primary module must be present")
725    }
726
727    /// Balance available to the client for spending
728    pub async fn get_balance(&self) -> Amount {
729        self.primary_module()
730            .get_balance(
731                self.primary_module_instance,
732                &mut self.db().begin_transaction_nc().await,
733            )
734            .await
735    }
736
737    /// Returns a stream that yields the current client balance every time it
738    /// changes.
739    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
740        let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
741        let initial_balance = self.get_balance().await;
742        let db = self.db().clone();
743        let primary_module = self.primary_module().clone();
744        let primary_module_instance = self.primary_module_instance;
745
746        Box::pin(async_stream::stream! {
747            yield initial_balance;
748            let mut prev_balance = initial_balance;
749            while let Some(()) = balance_changes.next().await {
750                let mut dbtx = db.begin_transaction_nc().await;
751                let balance = primary_module
752                    .get_balance(primary_module_instance, &mut dbtx)
753                    .await;
754
755                // Deduplicate in case modules cannot always tell if the balance actually changed
756                if balance != prev_balance {
757                    prev_balance = balance;
758                    yield balance;
759                }
760            }
761        })
762    }
763
764    /// Query the federation for API version support and then calculate
765    /// the best API version to use (supported by most guardians).
766    pub async fn refresh_peers_api_versions(
767        num_peers: NumPeers,
768        api: DynGlobalApi,
769        db: Database,
770        num_responses_sender: watch::Sender<usize>,
771    ) {
772        // Make a single request to a peer after a delay
773        //
774        // The delay is here to unify the type of a future both for initial request and
775        // possible retries.
776        async fn make_request(
777            delay: Duration,
778            peer_id: PeerId,
779            api: &DynGlobalApi,
780        ) -> (
781            PeerId,
782            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
783        ) {
784            runtime::sleep(delay).await;
785            (
786                peer_id,
787                api.request_single_peer::<SupportedApiVersionsSummary>(
788                    VERSION_ENDPOINT.to_owned(),
789                    ApiRequestErased::default(),
790                    peer_id,
791                )
792                .await,
793            )
794        }
795
796        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
797        // and make a single async db write operation, it should be OK.
798        let mut requests = FuturesUnordered::new();
799
800        for peer_id in num_peers.peer_ids() {
801            requests.push(make_request(Duration::ZERO, peer_id, &api));
802        }
803
804        let mut num_responses = 0;
805
806        while let Some((peer_id, response)) = requests.next().await {
807            match response {
808                Err(err) => {
809                    if db
810                        .begin_transaction_nc()
811                        .await
812                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
813                        .await
814                        .is_some()
815                    {
816                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
817                    } else {
818                        debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
819                        requests.push(make_request(Duration::from_secs(15), peer_id, &api));
820                    }
821                }
822                Ok(o) => {
823                    // Save the response to the database right away, just to
824                    // not lose it
825                    let mut dbtx = db.begin_transaction().await;
826                    dbtx.insert_entry(
827                        &PeerLastApiVersionsSummaryKey(peer_id),
828                        &PeerLastApiVersionsSummary(o),
829                    )
830                    .await;
831                    dbtx.commit_tx().await;
832                    num_responses += 1;
833                    // ignore errors: we don't care if anyone is still listening
834                    num_responses_sender.send_replace(num_responses);
835                }
836            }
837        }
838    }
839
840    /// [`SupportedApiVersionsSummary`] that the client and its modules support
841    pub fn supported_api_versions_summary_static(
842        config: &ClientConfig,
843        client_module_init: &ClientModuleInitRegistry,
844    ) -> SupportedApiVersionsSummary {
845        SupportedApiVersionsSummary {
846            core: SupportedCoreApiVersions {
847                core_consensus: config.global.consensus_version,
848                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
849                    .expect("must not have conflicting versions"),
850            },
851            modules: config
852                .modules
853                .iter()
854                .filter_map(|(&module_instance_id, module_config)| {
855                    client_module_init
856                        .get(module_config.kind())
857                        .map(|module_init| {
858                            (
859                                module_instance_id,
860                                SupportedModuleApiVersions {
861                                    core_consensus: config.global.consensus_version,
862                                    module_consensus: module_config.version,
863                                    api: module_init.supported_api_versions(),
864                                },
865                            )
866                        })
867                })
868                .collect(),
869        }
870    }
871
872    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
873        Self::load_and_refresh_common_api_version_static(
874            &self.config().await,
875            &self.module_inits,
876            &self.api,
877            &self.db,
878            &self.task_group,
879        )
880        .await
881    }
882
883    /// Load the common api versions to use from cache and start a background
884    /// process to refresh them.
885    ///
886    /// This is a compromise, so we not have to wait for version discovery to
887    /// complete every time a [`Client`] is being built.
888    async fn load_and_refresh_common_api_version_static(
889        config: &ClientConfig,
890        module_init: &ClientModuleInitRegistry,
891        api: &DynGlobalApi,
892        db: &Database,
893        task_group: &TaskGroup,
894    ) -> anyhow::Result<ApiVersionSet> {
895        if let Some(v) = db
896            .begin_transaction_nc()
897            .await
898            .get_value(&CachedApiVersionSetKey)
899            .await
900        {
901            debug!(
902                target: LOG_CLIENT,
903                "Found existing cached common api versions"
904            );
905            let config = config.clone();
906            let client_module_init = module_init.clone();
907            let api = api.clone();
908            let db = db.clone();
909            let task_group = task_group.clone();
910            // Separate task group, because we actually don't want to be waiting for this to
911            // finish, and it's just best effort.
912            task_group
913                .clone()
914                .spawn_cancellable("refresh_common_api_version_static", async move {
915                    if let Err(error) = Self::refresh_common_api_version_static(
916                        &config,
917                        &client_module_init,
918                        &api,
919                        &db,
920                        task_group,
921                    )
922                    .await
923                    {
924                        warn!(
925                            target: LOG_CLIENT,
926                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
927                        );
928                    }
929                });
930
931            return Ok(v.0);
932        }
933
934        debug!(
935            target: LOG_CLIENT,
936            "No existing cached common api versions found, waiting for initial discovery"
937        );
938        Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
939            .await
940    }
941
942    async fn refresh_common_api_version_static(
943        config: &ClientConfig,
944        client_module_init: &ClientModuleInitRegistry,
945        api: &DynGlobalApi,
946        db: &Database,
947        task_group: TaskGroup,
948    ) -> anyhow::Result<ApiVersionSet> {
949        debug!(
950            target: LOG_CLIENT,
951            "Refreshing common api versions"
952        );
953
954        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
955        let num_peers = NumPeers::from(config.global.api_endpoints.len());
956
957        task_group.spawn_cancellable("refresh peers api versions", {
958            Client::refresh_peers_api_versions(
959                num_peers,
960                api.clone(),
961                db.clone(),
962                num_responses_sender,
963            )
964        });
965
966        // Wait at most 15 seconds before calculating a set of common api versions to
967        // use. Note that all peers individual responses from previous attempts
968        // are still being used, and requests, or even retries for response of
969        // peers are not actually cancelled, as they are happening on a separate
970        // task. This is all just to bound the time user can be waiting
971        // for the join operation to finish, at the risk of picking wrong version in
972        // very rare circumstances.
973        let _: Result<_, Elapsed> = runtime::timeout(
974            Duration::from_secs(15),
975            num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
976        )
977        .await;
978
979        let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
980
981        let common_api_versions =
982            fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
983                &Self::supported_api_versions_summary_static(config, client_module_init),
984                &peer_api_version_sets,
985            )?;
986
987        debug!(
988            target: LOG_CLIENT,
989            value = ?common_api_versions,
990            "Updating the cached common api versions"
991        );
992        let mut dbtx = db.begin_transaction().await;
993        let _ = dbtx
994            .insert_entry(
995                &CachedApiVersionSetKey,
996                &CachedApiVersionSet(common_api_versions.clone()),
997            )
998            .await;
999
1000        dbtx.commit_tx().await;
1001
1002        Ok(common_api_versions)
1003    }
1004
1005    /// Get the client [`Metadata`]
1006    pub async fn get_metadata(&self) -> Metadata {
1007        self.db
1008            .begin_transaction_nc()
1009            .await
1010            .get_value(&ClientMetadataKey)
1011            .await
1012            .unwrap_or_else(|| {
1013                warn!(
1014                    target: LOG_CLIENT,
1015                    "Missing existing metadata. This key should have been set on Client init"
1016                );
1017                Metadata::empty()
1018            })
1019    }
1020
1021    /// Set the client [`Metadata`]
1022    pub async fn set_metadata(&self, metadata: &Metadata) {
1023        self.db
1024            .autocommit::<_, _, anyhow::Error>(
1025                |dbtx, _| {
1026                    Box::pin(async {
1027                        Self::set_metadata_dbtx(dbtx, metadata).await;
1028                        Ok(())
1029                    })
1030                },
1031                None,
1032            )
1033            .await
1034            .expect("Failed to autocommit metadata");
1035    }
1036
1037    pub fn has_pending_recoveries(&self) -> bool {
1038        !self
1039            .client_recovery_progress_receiver
1040            .borrow()
1041            .iter()
1042            .all(|(_id, progress)| progress.is_done())
1043    }
1044
1045    /// Wait for all module recoveries to finish
1046    ///
1047    /// This will block until the recovery task is done with recoveries.
1048    /// Returns success if all recovery tasks are complete (success case),
1049    /// or an error if some modules could not complete the recovery at the time.
1050    ///
1051    /// A bit of a heavy approach.
1052    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1053        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1054        recovery_receiver
1055            .wait_for(|in_progress| {
1056                in_progress
1057                    .iter()
1058                    .all(|(_id, progress)| progress.is_done())
1059            })
1060            .await
1061            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1062
1063        Ok(())
1064    }
1065
1066    /// Subscribe to recover progress for all the modules.
1067    ///
1068    /// This stream can contain duplicate progress for a module.
1069    /// Don't use this stream for detecting completion of recovery.
1070    pub fn subscribe_to_recovery_progress(
1071        &self,
1072    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1073        WatchStream::new(self.client_recovery_progress_receiver.clone())
1074            .flat_map(futures::stream::iter)
1075    }
1076
1077    pub async fn wait_for_module_kind_recovery(
1078        &self,
1079        module_kind: ModuleKind,
1080    ) -> anyhow::Result<()> {
1081        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1082        let config = self.config().await;
1083        recovery_receiver
1084            .wait_for(|in_progress| {
1085                !in_progress
1086                    .iter()
1087                    .filter(|(module_instance_id, _progress)| {
1088                        config.modules[module_instance_id].kind == module_kind
1089                    })
1090                    .any(|(_id, progress)| !progress.is_done())
1091            })
1092            .await
1093            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1094
1095        Ok(())
1096    }
1097
1098    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1099        loop {
1100            if self.executor.get_active_states().await.is_empty() {
1101                break;
1102            }
1103            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1104        }
1105        Ok(())
1106    }
1107
1108    /// Set the client [`Metadata`]
1109    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1110        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1111    }
1112
1113    fn spawn_module_recoveries_task(
1114        &self,
1115        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1116        module_recoveries: BTreeMap<
1117            ModuleInstanceId,
1118            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1119        >,
1120        module_recovery_progress_receivers: BTreeMap<
1121            ModuleInstanceId,
1122            watch::Receiver<RecoveryProgress>,
1123        >,
1124    ) {
1125        let db = self.db.clone();
1126        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1127        self.task_group
1128            .spawn("module recoveries", |_task_handle| async {
1129                Self::run_module_recoveries_task(
1130                    db,
1131                    log_ordering_wakeup_tx,
1132                    recovery_sender,
1133                    module_recoveries,
1134                    module_recovery_progress_receivers,
1135                )
1136                .await;
1137            });
1138    }
1139
1140    async fn run_module_recoveries_task(
1141        db: Database,
1142        log_ordering_wakeup_tx: watch::Sender<()>,
1143        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1144        module_recoveries: BTreeMap<
1145            ModuleInstanceId,
1146            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1147        >,
1148        module_recovery_progress_receivers: BTreeMap<
1149            ModuleInstanceId,
1150            watch::Receiver<RecoveryProgress>,
1151        >,
1152    ) {
1153        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1154        let mut completed_stream = Vec::new();
1155        let progress_stream = futures::stream::FuturesUnordered::new();
1156
1157        for (module_instance_id, f) in module_recoveries {
1158            completed_stream.push(futures::stream::once(Box::pin(async move {
1159                match f.await {
1160                    Ok(()) => (module_instance_id, None),
1161                    Err(err) => {
1162                        warn!(
1163                            target: LOG_CLIENT,
1164                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1165                        );
1166                        // a module recovery that failed reports and error and
1167                        // just never finishes, so we don't need a separate state
1168                        // for it
1169                        futures::future::pending::<()>().await;
1170                        unreachable!()
1171                    }
1172                }
1173            })));
1174        }
1175
1176        for (module_instance_id, rx) in module_recovery_progress_receivers {
1177            progress_stream.push(
1178                tokio_stream::wrappers::WatchStream::new(rx)
1179                    .fuse()
1180                    .map(move |progress| (module_instance_id, Some(progress))),
1181            );
1182        }
1183
1184        let mut futures = futures::stream::select(
1185            futures::stream::select_all(progress_stream),
1186            futures::stream::select_all(completed_stream),
1187        );
1188
1189        while let Some((module_instance_id, progress)) = futures.next().await {
1190            let mut dbtx = db.begin_transaction().await;
1191
1192            let prev_progress = *recovery_sender
1193                .borrow()
1194                .get(&module_instance_id)
1195                .expect("existing progress must be present");
1196
1197            let progress = if prev_progress.is_done() {
1198                // since updates might be out of order, once done, stick with it
1199                prev_progress
1200            } else if let Some(progress) = progress {
1201                progress
1202            } else {
1203                prev_progress.to_complete()
1204            };
1205
1206            if !prev_progress.is_done() && progress.is_done() {
1207                info!(
1208                    target: LOG_CLIENT,
1209                    module_instance_id,
1210                    progress = format!("{}/{}", progress.complete, progress.total),
1211                    "Recovery complete"
1212                );
1213                dbtx.log_event(
1214                    log_ordering_wakeup_tx.clone(),
1215                    None,
1216                    ModuleRecoveryCompleted {
1217                        module_id: module_instance_id,
1218                    },
1219                )
1220                .await;
1221            } else {
1222                info!(
1223                    target: LOG_CLIENT,
1224                    module_instance_id,
1225                    progress = format!("{}/{}", progress.complete, progress.total),
1226                    "Recovery progress"
1227                );
1228            }
1229
1230            dbtx.insert_entry(
1231                &ClientModuleRecovery { module_instance_id },
1232                &ClientModuleRecoveryState { progress },
1233            )
1234            .await;
1235            dbtx.commit_tx().await;
1236
1237            recovery_sender.send_modify(|v| {
1238                v.insert(module_instance_id, progress);
1239            });
1240        }
1241        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1242    }
1243
1244    async fn load_peers_last_api_versions(
1245        db: &Database,
1246        num_peers: NumPeers,
1247    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1248        let mut peer_api_version_sets = BTreeMap::new();
1249
1250        let mut dbtx = db.begin_transaction_nc().await;
1251        for peer_id in num_peers.peer_ids() {
1252            if let Some(v) = dbtx
1253                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1254                .await
1255            {
1256                peer_api_version_sets.insert(peer_id, v.0);
1257            }
1258        }
1259        drop(dbtx);
1260        peer_api_version_sets
1261    }
1262
1263    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1264    /// only the announcements and doesn't use the config as fallback.
1265    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1266        self.db()
1267            .begin_transaction_nc()
1268            .await
1269            .find_by_prefix(&ApiAnnouncementPrefix)
1270            .await
1271            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1272            .collect()
1273            .await
1274    }
1275
1276    /// Returns a list of guardian API URLs
1277    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1278        get_api_urls(&self.db, &self.config().await).await
1279    }
1280
1281    /// Create an invite code with the api endpoint of the given peer which can
1282    /// be used to download this client config
1283    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1284        self.get_peer_urls()
1285            .await
1286            .into_iter()
1287            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1288            .map(|peer_url| {
1289                InviteCode::new(
1290                    peer_url.clone(),
1291                    peer,
1292                    self.federation_id(),
1293                    self.api_secret.clone(),
1294                )
1295            })
1296    }
1297
1298    /// Blocks till the client has synced the guardian public key set
1299    /// (introduced in version 0.4) and returns it. Once it has been fetched
1300    /// once this function is guaranteed to return immediately.
1301    pub async fn get_guardian_public_keys_blocking(
1302        &self,
1303    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1304        self.db.autocommit(|dbtx, _| Box::pin(async move {
1305            let config = self.config().await;
1306
1307            let guardian_pub_keys = match config.global.broadcast_public_keys { Some(guardian_pub_keys) => {guardian_pub_keys} _ => {
1308                let fetched_config = retry(
1309                    "Fetching guardian public keys",
1310                    backoff_util::background_backoff(),
1311                    || async {
1312                        Ok(self.api.request_current_consensus::<ClientConfig>(
1313                            CLIENT_CONFIG_ENDPOINT.to_owned(),
1314                            ApiRequestErased::default(),
1315                        ).await?)
1316                    },
1317                )
1318                .await
1319                .expect("Will never return on error");
1320
1321                let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1322                    warn!(
1323                        target: LOG_CLIENT,
1324                        "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1325                    );
1326                    pending::<()>().await;
1327                    unreachable!("Pending will never return");
1328                };
1329
1330                let new_config = ClientConfig {
1331                    global: GlobalClientConfig {
1332                        broadcast_public_keys: Some(guardian_pub_keys.clone()),
1333                        ..config.global
1334                    },
1335                    modules: config.modules,
1336                };
1337
1338                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1339                *(self.config.write().await) = new_config;
1340                guardian_pub_keys
1341            }};
1342
1343            Result::<_, ()>::Ok(guardian_pub_keys)
1344        }), None).await.expect("Will retry forever")
1345    }
1346
1347    pub fn handle_global_rpc(
1348        &self,
1349        method: String,
1350        params: serde_json::Value,
1351    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1352        Box::pin(try_stream! {
1353            match method.as_str() {
1354                "get_balance" => {
1355                    let balance = self.get_balance().await;
1356                    yield serde_json::to_value(balance)?;
1357                }
1358                "subscribe_balance_changes" => {
1359                    let mut stream = self.subscribe_balance_changes().await;
1360                    while let Some(balance) = stream.next().await {
1361                        yield serde_json::to_value(balance)?;
1362                    }
1363                }
1364                "get_config" => {
1365                    let config = self.config().await;
1366                    yield serde_json::to_value(config)?;
1367                }
1368                "get_federation_id" => {
1369                    let federation_id = self.federation_id();
1370                    yield serde_json::to_value(federation_id)?;
1371                }
1372                "get_invite_code" => {
1373                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1374                    let invite_code = self.invite_code(req.peer).await;
1375                    yield serde_json::to_value(invite_code)?;
1376                }
1377                "list_operations" => {
1378                    // TODO: support pagination
1379                    let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await;
1380                    yield serde_json::to_value(operations)?;
1381                }
1382                "has_pending_recoveries" => {
1383                    let has_pending = self.has_pending_recoveries();
1384                    yield serde_json::to_value(has_pending)?;
1385                }
1386                "wait_for_all_recoveries" => {
1387                    self.wait_for_all_recoveries().await?;
1388                    yield serde_json::Value::Null;
1389                }
1390                "subscribe_to_recovery_progress" => {
1391                    let mut stream = self.subscribe_to_recovery_progress();
1392                    while let Some((module_id, progress)) = stream.next().await {
1393                        yield serde_json::json!({
1394                            "module_id": module_id,
1395                            "progress": progress
1396                        });
1397                    }
1398                }
1399                _ => {
1400                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1401                    unreachable!()
1402                },
1403            }
1404        })
1405    }
1406
1407    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1408    where
1409        E: Event + Send,
1410    {
1411        let mut dbtx = self.db.begin_transaction().await;
1412        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1413        dbtx.commit_tx().await;
1414    }
1415
1416    pub async fn log_event_dbtx<E, Cap>(
1417        &self,
1418        dbtx: &mut DatabaseTransaction<'_, Cap>,
1419        module_id: Option<ModuleInstanceId>,
1420        event: E,
1421    ) where
1422        E: Event + Send,
1423        Cap: Send,
1424    {
1425        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1426            .await;
1427    }
1428
1429    pub async fn log_event_raw_dbtx<Cap>(
1430        &self,
1431        dbtx: &mut DatabaseTransaction<'_, Cap>,
1432        kind: EventKind,
1433        module: Option<(ModuleKind, ModuleInstanceId)>,
1434        payload: Vec<u8>,
1435        persist: bool,
1436    ) where
1437        Cap: Send,
1438    {
1439        let module_id = module.as_ref().map(|m| m.1);
1440        let module_kind = module.map(|m| m.0);
1441        dbtx.log_event_raw(
1442            self.log_ordering_wakeup_tx.clone(),
1443            kind,
1444            module_kind,
1445            module_id,
1446            payload,
1447            persist,
1448        )
1449        .await;
1450    }
1451
1452    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1453    where
1454        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1455        K: DatabaseRecord<Value = EventLogId>,
1456        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1457        R: Future<Output = anyhow::Result<()>>,
1458    {
1459        fedimint_eventlog::handle_events(
1460            self.db.clone(),
1461            pos_key,
1462            self.log_event_added_rx.clone(),
1463            call_fn,
1464        )
1465        .await
1466    }
1467
1468    pub async fn get_event_log(
1469        &self,
1470        pos: Option<EventLogId>,
1471        limit: u64,
1472    ) -> Vec<PersistedLogEntry> {
1473        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1474            .await
1475    }
1476
1477    pub async fn get_event_log_dbtx<Cap>(
1478        &self,
1479        dbtx: &mut DatabaseTransaction<'_, Cap>,
1480        pos: Option<EventLogId>,
1481        limit: u64,
1482    ) -> Vec<PersistedLogEntry>
1483    where
1484        Cap: Send,
1485    {
1486        dbtx.get_event_log(pos, limit).await
1487    }
1488
1489    /// Register to receiver all new transient (unpersisted) events
1490    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1491        self.log_event_added_transient_tx.subscribe()
1492    }
1493}
1494
1495#[apply(async_trait_maybe_send!)]
1496impl ClientContextIface for Client {
1497    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1498        Client::get_module(self, instance)
1499    }
1500
1501    fn api_clone(&self) -> DynGlobalApi {
1502        Client::api_clone(self)
1503    }
1504    fn decoders(&self) -> &ModuleDecoderRegistry {
1505        Client::decoders(self)
1506    }
1507
1508    async fn finalize_and_submit_transaction(
1509        &self,
1510        operation_id: OperationId,
1511        operation_type: &str,
1512        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1513        tx_builder: TransactionBuilder,
1514    ) -> anyhow::Result<OutPointRange> {
1515        Client::finalize_and_submit_transaction(
1516            self,
1517            operation_id,
1518            operation_type,
1519            // |out_point_range| operation_meta_gen(out_point_range),
1520            &operation_meta_gen,
1521            tx_builder,
1522        )
1523        .await
1524    }
1525
1526    async fn finalize_and_submit_transaction_inner(
1527        &self,
1528        dbtx: &mut DatabaseTransaction<'_>,
1529        operation_id: OperationId,
1530        tx_builder: TransactionBuilder,
1531    ) -> anyhow::Result<OutPointRange> {
1532        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1533    }
1534
1535    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1536        Client::transaction_updates(self, operation_id).await
1537    }
1538
1539    async fn await_primary_module_outputs(
1540        &self,
1541        operation_id: OperationId,
1542        // TODO: make `impl Iterator<Item = ...>`
1543        outputs: Vec<OutPoint>,
1544    ) -> anyhow::Result<()> {
1545        Client::await_primary_module_outputs(self, operation_id, outputs).await
1546    }
1547
1548    fn operation_log(&self) -> &dyn IOperationLog {
1549        Client::operation_log(self)
1550    }
1551
1552    async fn has_active_states(&self, operation_id: OperationId) -> bool {
1553        Client::has_active_states(self, operation_id).await
1554    }
1555
1556    async fn operation_exists(&self, operation_id: OperationId) -> bool {
1557        Client::operation_exists(self, operation_id).await
1558    }
1559
1560    async fn config(&self) -> ClientConfig {
1561        Client::config(self).await
1562    }
1563
1564    fn db(&self) -> &Database {
1565        Client::db(self)
1566    }
1567
1568    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1569        Client::executor(self)
1570    }
1571
1572    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1573        Client::invite_code(self, peer).await
1574    }
1575
1576    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1577        Client::get_internal_payment_markers(self)
1578    }
1579
1580    async fn log_event_json(
1581        &self,
1582        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1583        module_kind: Option<ModuleKind>,
1584        module_id: ModuleInstanceId,
1585        kind: EventKind,
1586        payload: serde_json::Value,
1587        persist: bool,
1588    ) {
1589        dbtx.ensure_global()
1590            .expect("Must be called with global dbtx");
1591        self.log_event_raw_dbtx(
1592            dbtx,
1593            kind,
1594            module_kind.map(|kind| (kind, module_id)),
1595            serde_json::to_vec(&payload).expect("Serialization can't fail"),
1596            persist,
1597        )
1598        .await;
1599    }
1600
1601    async fn read_operation_active_states<'dbtx>(
1602        &self,
1603        operation_id: OperationId,
1604        module_id: ModuleInstanceId,
1605        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1606    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1607    {
1608        Box::pin(
1609            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1610                operation_id,
1611                module_instance: module_id,
1612            })
1613            .await
1614            .map(move |(k, v)| (k.0, v)),
1615        )
1616    }
1617    async fn read_operation_inactive_states<'dbtx>(
1618        &self,
1619        operation_id: OperationId,
1620        module_id: ModuleInstanceId,
1621        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1622    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1623    {
1624        Box::pin(
1625            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1626                operation_id,
1627                module_instance: module_id,
1628            })
1629            .await
1630            .map(move |(k, v)| (k.0, v)),
1631        )
1632    }
1633}
1634
1635// TODO: impl `Debug` for `Client` and derive here
1636impl fmt::Debug for Client {
1637    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1638        write!(f, "Client")
1639    }
1640}
1641
1642pub fn client_decoders<'a>(
1643    registry: &ModuleInitRegistry<DynClientModuleInit>,
1644    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1645) -> ModuleDecoderRegistry {
1646    let mut modules = BTreeMap::new();
1647    for (id, kind) in module_kinds {
1648        let Some(init) = registry.get(kind) else {
1649            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1650            continue;
1651        };
1652
1653        modules.insert(
1654            id,
1655            (
1656                kind.clone(),
1657                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1658            ),
1659        );
1660    }
1661    ModuleDecoderRegistry::from(modules)
1662}