1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51 SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61 fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65 DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, PersistedLogEntry,
66};
67use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
68use futures::stream::FuturesUnordered;
69use futures::{Stream, StreamExt as _};
70use global_ctx::ModuleGlobalClientContext;
71use serde::{Deserialize, Serialize};
72use tokio::sync::{broadcast, watch};
73use tokio_stream::wrappers::WatchStream;
74use tracing::{debug, info, warn};
75
76use crate::ClientBuilder;
77use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
78use crate::backup::Metadata;
79use crate::db::{
80 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
81 ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
82 EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
83 PeerLastApiVersionsSummaryKey, apply_migrations_core_client_dbtx, get_decoded_client_secret,
84 verify_client_db_integrity_dbtx,
85};
86use crate::meta::MetaService;
87use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
88use crate::oplog::OperationLog;
89use crate::sm::executor::{
90 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
91 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
92};
93
94pub(crate) mod builder;
95pub(crate) mod global_ctx;
96pub(crate) mod handle;
97
/// Core API versions supported by this client implementation.
///
/// Consumed by `Client::supported_api_versions_summary_static` to build the client's side of
/// the API version negotiation.
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];
103
/// Central client state: configuration, database handle, module registry, state machine
/// executor, federation API handle and the channels used to report recovery progress.
pub struct Client {
    /// Handle passed to the executor's module notifier (see `transaction_update_stream`).
    final_client: FinalClientIface,
    /// Client configuration; a clone is handed out by `config()`.
    config: tokio::sync::RwLock<ClientConfig>,
    /// Optional API secret, exposed through `api_secret()`.
    api_secret: Option<String>,
    /// Decoder registry, exposed through `decoders()`.
    decoders: ModuleDecoderRegistry,
    /// Database handle used for all persistence in this file.
    db: Database,
    /// Id of the federation, exposed through `federation_id()`.
    federation_id: FederationId,
    /// String key/value metadata looked up by `get_config_meta()`.
    federation_config_meta: BTreeMap<String, String>,
    /// Instance id of the module returned by `primary_module()`.
    primary_module_instance: ModuleInstanceId,
    /// Registry of the client module instances.
    pub(crate) modules: ClientModuleRegistry,
    /// Module initializers; used when computing supported API versions.
    module_inits: ClientModuleInitRegistry,
    /// State machine executor.
    executor: Executor,
    /// Federation API handle.
    pub(crate) api: DynGlobalApi,
    /// Root secret, handed out (cloned) by `root_secret()`.
    root_secret: DerivableSecret,
    /// Operation log, exposed through `operation_log()`.
    operation_log: OperationLog,
    /// Secp256k1 context used when building transactions and deriving the fake LN public key.
    secp_ctx: Secp256k1<secp256k1::All>,
    /// Meta service, exposed through `meta_service()`.
    meta_service: Arc<MetaService>,
    // NOTE(review): not used in the visible part of this file — purpose to be confirmed.
    connector: Connector,

    /// Task group on which background tasks (API version refresh, module recoveries) are spawned.
    task_group: TaskGroup,

    /// Latest recovery progress per module instance.
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    /// Sender handed to `log_event` calls made by the module recoveries task.
    log_ordering_wakeup_tx: watch::Sender<()>,
    // NOTE(review): the following three fields are not used in the visible part of this file.
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
}
150
/// Serializable parameters consisting of an optional `limit` and an optional `last_seen` key.
///
/// NOTE(review): presumably the request payload for paginated operation listing — the consumer
/// is not visible in this part of the file; confirm.
#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    limit: Option<usize>,
    last_seen: Option<ChronologicalOperationLogKey>,
}
156
/// Serializable request wrapping a single [`OperationId`].
///
/// NOTE(review): the consumer is not visible in this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}
161
162impl Client {
163 pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
166 let mut dbtx = db.begin_transaction().await;
167 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
168 .await?;
169 if is_running_in_test_env() {
170 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
171 }
172 dbtx.commit_tx_result().await?;
173
174 Ok(ClientBuilder::new(db))
175 }
176
177 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
178 self.api.as_ref()
179 }
180
181 pub fn api_clone(&self) -> DynGlobalApi {
182 self.api.clone()
183 }
184
185 pub fn task_group(&self) -> &TaskGroup {
187 &self.task_group
188 }
189
190 #[doc(hidden)]
192 pub fn executor(&self) -> &Executor {
193 &self.executor
194 }
195
196 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
197 let mut dbtx = db.begin_transaction_nc().await;
198 dbtx.get_value(&ClientConfigKey).await
199 }
200
201 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
202 let mut dbtx = db.begin_transaction_nc().await;
203 dbtx.get_value(&ApiSecretKey).await
204 }
205
206 pub async fn store_encodable_client_secret<T: Encodable>(
207 db: &Database,
208 secret: T,
209 ) -> anyhow::Result<()> {
210 let mut dbtx = db.begin_transaction().await;
211
212 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
214 bail!("Encoded client secret already exists, cannot overwrite")
215 }
216
217 let encoded_secret = T::consensus_encode_to_vec(&secret);
218 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
219 .await;
220 dbtx.commit_tx().await;
221 Ok(())
222 }
223
224 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
225 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
226 bail!("Encoded client secret not present in DB")
227 };
228
229 Ok(secret)
230 }
231 pub async fn load_decodable_client_secret_opt<T: Decodable>(
232 db: &Database,
233 ) -> anyhow::Result<Option<T>> {
234 let mut dbtx = db.begin_transaction_nc().await;
235
236 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
237
238 Ok(match client_secret {
239 Some(client_secret) => Some(
240 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
241 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
242 ),
243 None => None,
244 })
245 }
246
247 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
248 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
249 Ok(secret) => secret,
250 _ => {
251 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
252 Self::store_encodable_client_secret(db, secret)
253 .await
254 .expect("Storing client secret must work");
255 secret
256 }
257 };
258 Ok(client_secret)
259 }
260
261 pub async fn is_initialized(db: &Database) -> bool {
262 Self::get_config_from_db(db).await.is_some()
263 }
264
265 pub fn start_executor(self: &Arc<Self>) {
266 debug!(
267 target: LOG_CLIENT,
268 "Starting fedimint client executor (version: {})",
269 fedimint_build_code_version_env!()
270 );
271 self.executor.start_executor(self.context_gen());
272 }
273
274 pub fn federation_id(&self) -> FederationId {
275 self.federation_id
276 }
277
278 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
279 let client_inner = Arc::downgrade(self);
280 Arc::new(move |module_instance, operation| {
281 ModuleGlobalClientContext {
282 client: client_inner
283 .clone()
284 .upgrade()
285 .expect("ModuleGlobalContextGen called after client was dropped"),
286 module_instance_id: module_instance,
287 operation,
288 }
289 .into()
290 })
291 }
292
293 pub async fn config(&self) -> ClientConfig {
294 self.config.read().await.clone()
295 }
296
297 pub fn api_secret(&self) -> &Option<String> {
298 &self.api_secret
299 }
300
301 pub fn decoders(&self) -> &ModuleDecoderRegistry {
302 &self.decoders
303 }
304
305 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
307 self.try_get_module(instance)
308 .expect("Module instance not found")
309 }
310
311 fn try_get_module(
312 &self,
313 instance: ModuleInstanceId,
314 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
315 Some(self.modules.get(instance)?.as_ref())
316 }
317
318 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
319 self.modules.get(instance).is_some()
320 }
321
322 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
328 let mut in_amount = Amount::ZERO;
330 let mut out_amount = Amount::ZERO;
331 let mut fee_amount = Amount::ZERO;
332
333 for input in builder.inputs() {
334 let module = self.get_module(input.input.module_instance_id());
335
336 let item_fee = module.input_fee(input.amount, &input.input).expect(
337 "We only build transactions with input versions that are supported by the module",
338 );
339
340 in_amount += input.amount;
341 fee_amount += item_fee;
342 }
343
344 for output in builder.outputs() {
345 let module = self.get_module(output.output.module_instance_id());
346
347 let item_fee = module.output_fee(output.amount, &output.output).expect(
348 "We only build transactions with output versions that are supported by the module",
349 );
350
351 out_amount += output.amount;
352 fee_amount += item_fee;
353 }
354
355 (in_amount, out_amount + fee_amount)
356 }
357
358 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
359 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
360 }
361
362 pub fn get_config_meta(&self, key: &str) -> Option<String> {
364 self.federation_config_meta.get(key).cloned()
365 }
366
367 pub(crate) fn root_secret(&self) -> DerivableSecret {
368 self.root_secret.clone()
369 }
370
371 pub async fn add_state_machines(
372 &self,
373 dbtx: &mut DatabaseTransaction<'_>,
374 states: Vec<DynState>,
375 ) -> AddStateMachinesResult {
376 self.executor.add_state_machines_dbtx(dbtx, states).await
377 }
378
379 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
381 let active_states = self.executor.get_active_states().await;
382 let mut active_operations = HashSet::with_capacity(active_states.len());
383 let mut dbtx = self.db().begin_transaction_nc().await;
384 for (state, _) in active_states {
385 let operation_id = state.operation_id();
386 if dbtx
387 .get_value(&OperationLogKey { operation_id })
388 .await
389 .is_some()
390 {
391 active_operations.insert(operation_id);
392 }
393 }
394 active_operations
395 }
396
397 pub fn operation_log(&self) -> &OperationLog {
398 &self.operation_log
399 }
400
401 pub fn meta_service(&self) -> &Arc<MetaService> {
403 &self.meta_service
404 }
405
406 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
408 let meta_service = self.meta_service();
409 let ts = meta_service
410 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
411 .await
412 .and_then(|v| v.value)?;
413 Some(UNIX_EPOCH + Duration::from_secs(ts))
414 }
415
416 async fn finalize_transaction(
418 &self,
419 dbtx: &mut DatabaseTransaction<'_>,
420 operation_id: OperationId,
421 mut partial_transaction: TransactionBuilder,
422 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
423 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
424
425 let (added_input_bundle, change_outputs) = self
426 .primary_module()
427 .create_final_inputs_and_outputs(
428 self.primary_module_instance,
429 dbtx,
430 operation_id,
431 input_amount,
432 output_amount,
433 )
434 .await?;
435
436 let change_range = Range {
440 start: partial_transaction.outputs().count() as u64,
441 end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
442 };
443
444 partial_transaction = partial_transaction
445 .with_inputs(added_input_bundle)
446 .with_outputs(change_outputs);
447
448 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
449
450 assert!(input_amount >= output_amount, "Transaction is underfunded");
451
452 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
453
454 Ok((tx, states, change_range))
455 }
456
457 pub async fn finalize_and_submit_transaction<F, M>(
469 &self,
470 operation_id: OperationId,
471 operation_type: &str,
472 operation_meta_gen: F,
473 tx_builder: TransactionBuilder,
474 ) -> anyhow::Result<OutPointRange>
475 where
476 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
477 M: serde::Serialize + MaybeSend,
478 {
479 let operation_type = operation_type.to_owned();
480
481 let autocommit_res = self
482 .db
483 .autocommit(
484 |dbtx, _| {
485 let operation_type = operation_type.clone();
486 let tx_builder = tx_builder.clone();
487 let operation_meta_gen = operation_meta_gen.clone();
488 Box::pin(async move {
489 if Client::operation_exists_dbtx(dbtx, operation_id).await {
490 bail!("There already exists an operation with id {operation_id:?}")
491 }
492
493 let out_point_range = self
494 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
495 .await?;
496
497 self.operation_log()
498 .add_operation_log_entry_dbtx(
499 dbtx,
500 operation_id,
501 &operation_type,
502 operation_meta_gen(out_point_range),
503 )
504 .await;
505
506 Ok(out_point_range)
507 })
508 },
509 Some(100), )
511 .await;
512
513 match autocommit_res {
514 Ok(txid) => Ok(txid),
515 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
516 Err(AutocommitError::CommitFailed {
517 attempts,
518 last_error,
519 }) => panic!(
520 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
521 ),
522 }
523 }
524
525 async fn finalize_and_submit_transaction_inner(
526 &self,
527 dbtx: &mut DatabaseTransaction<'_>,
528 operation_id: OperationId,
529 tx_builder: TransactionBuilder,
530 ) -> anyhow::Result<OutPointRange> {
531 let (transaction, mut states, change_range) = self
532 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
533 .await?;
534
535 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
536 let inputs = transaction
537 .inputs
538 .iter()
539 .map(DynInput::module_instance_id)
540 .collect::<Vec<_>>();
541 let outputs = transaction
542 .outputs
543 .iter()
544 .map(DynOutput::module_instance_id)
545 .collect::<Vec<_>>();
546 warn!(
547 target: LOG_CLIENT_NET_API,
548 size=%transaction.consensus_encode_to_vec().len(),
549 ?inputs,
550 ?outputs,
551 "Transaction too large",
552 );
553 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
554 bail!(
555 "The generated transaction would be rejected by the federation for being too large."
556 );
557 }
558
559 let txid = transaction.tx_hash();
560
561 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
562
563 let tx_submission_sm = DynState::from_typed(
564 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
565 TxSubmissionStatesSM {
566 operation_id,
567 state: TxSubmissionStates::Created(transaction),
568 },
569 );
570 states.push(tx_submission_sm);
571
572 self.executor.add_state_machines_dbtx(dbtx, states).await?;
573
574 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
575 .await;
576
577 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
578 }
579
580 async fn transaction_update_stream(
581 &self,
582 operation_id: OperationId,
583 ) -> BoxStream<'static, TxSubmissionStatesSM> {
584 self.executor
585 .notifier()
586 .module_notifier::<TxSubmissionStatesSM>(
587 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
588 self.final_client.clone(),
589 )
590 .subscribe(operation_id)
591 .await
592 }
593
594 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
595 let mut dbtx = self.db().begin_transaction_nc().await;
596
597 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
598 }
599
600 pub async fn operation_exists_dbtx(
601 dbtx: &mut DatabaseTransaction<'_>,
602 operation_id: OperationId,
603 ) -> bool {
604 let active_state_exists = dbtx
605 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
606 .await
607 .next()
608 .await
609 .is_some();
610
611 let inactive_state_exists = dbtx
612 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
613 .await
614 .next()
615 .await
616 .is_some();
617
618 active_state_exists || inactive_state_exists
619 }
620
621 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
622 self.db
623 .begin_transaction_nc()
624 .await
625 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
626 .await
627 .next()
628 .await
629 .is_some()
630 }
631
632 pub async fn await_primary_module_output(
635 &self,
636 operation_id: OperationId,
637 out_point: OutPoint,
638 ) -> anyhow::Result<()> {
639 self.primary_module()
640 .await_primary_module_output(operation_id, out_point)
641 .await
642 }
643
644 pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
646 let module_kind = M::kind();
647 let id = self
648 .get_first_instance(&module_kind)
649 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
650 let module: &M = self
651 .try_get_module(id)
652 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
653 .as_any()
654 .downcast_ref::<M>()
655 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
656 let (db, _) = self.db().with_prefix_module_id(id);
657 Ok(ClientModuleInstance {
658 id,
659 db,
660 api: self.api().with_module(id),
661 module,
662 })
663 }
664
665 pub fn get_module_client_dyn(
666 &self,
667 instance_id: ModuleInstanceId,
668 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
669 self.try_get_module(instance_id)
670 .ok_or(anyhow!("Unknown module instance {}", instance_id))
671 }
672
673 pub fn db(&self) -> &Database {
674 &self.db
675 }
676
677 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
680 TransactionUpdates {
681 update_stream: self.transaction_update_stream(operation_id).await,
682 }
683 }
684
685 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
689 if self
690 .modules
691 .get_with_kind(self.primary_module_instance)
692 .is_some_and(|(kind, _)| kind == module_kind)
693 {
694 return Some(self.primary_module_instance);
695 }
696
697 self.modules
698 .iter_modules()
699 .find(|(_, kind, _module)| *kind == module_kind)
700 .map(|(instance_id, _, _)| instance_id)
701 }
702
703 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
706 get_decoded_client_secret::<T>(self.db()).await
707 }
708
709 pub async fn await_primary_module_outputs(
712 &self,
713 operation_id: OperationId,
714 outputs: Vec<OutPoint>,
715 ) -> anyhow::Result<()> {
716 for out_point in outputs {
717 self.await_primary_module_output(operation_id, out_point)
718 .await?;
719 }
720
721 Ok(())
722 }
723
724 pub async fn get_config_json(&self) -> JsonClientConfig {
730 self.config().await.to_json()
731 }
732
733 pub fn primary_module(&self) -> &DynClientModule {
735 self.modules
736 .get(self.primary_module_instance)
737 .expect("primary module must be present")
738 }
739
740 pub async fn get_balance(&self) -> Amount {
742 self.primary_module()
743 .get_balance(
744 self.primary_module_instance,
745 &mut self.db().begin_transaction_nc().await,
746 )
747 .await
748 }
749
750 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
753 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
754 let initial_balance = self.get_balance().await;
755 let db = self.db().clone();
756 let primary_module = self.primary_module().clone();
757 let primary_module_instance = self.primary_module_instance;
758
759 Box::pin(async_stream::stream! {
760 yield initial_balance;
761 let mut prev_balance = initial_balance;
762 while let Some(()) = balance_changes.next().await {
763 let mut dbtx = db.begin_transaction_nc().await;
764 let balance = primary_module
765 .get_balance(primary_module_instance, &mut dbtx)
766 .await;
767
768 if balance != prev_balance {
770 prev_balance = balance;
771 yield balance;
772 }
773 }
774 })
775 }
776
777 pub async fn refresh_peers_api_versions(
780 num_peers: NumPeers,
781 api: DynGlobalApi,
782 db: Database,
783 num_responses_sender: watch::Sender<usize>,
784 ) {
785 async fn make_request(
790 delay: Duration,
791 peer_id: PeerId,
792 api: &DynGlobalApi,
793 ) -> (
794 PeerId,
795 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
796 ) {
797 runtime::sleep(delay).await;
798 (
799 peer_id,
800 api.request_single_peer::<SupportedApiVersionsSummary>(
801 VERSION_ENDPOINT.to_owned(),
802 ApiRequestErased::default(),
803 peer_id,
804 )
805 .await,
806 )
807 }
808
809 let mut requests = FuturesUnordered::new();
812
813 for peer_id in num_peers.peer_ids() {
814 requests.push(make_request(Duration::ZERO, peer_id, &api));
815 }
816
817 let mut num_responses = 0;
818
819 while let Some((peer_id, response)) = requests.next().await {
820 match response {
821 Err(err) => {
822 if db
823 .begin_transaction_nc()
824 .await
825 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
826 .await
827 .is_some()
828 {
829 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
830 } else {
831 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
832 requests.push(make_request(Duration::from_secs(15), peer_id, &api));
833 }
834 }
835 Ok(o) => {
836 let mut dbtx = db.begin_transaction().await;
839 dbtx.insert_entry(
840 &PeerLastApiVersionsSummaryKey(peer_id),
841 &PeerLastApiVersionsSummary(o),
842 )
843 .await;
844 dbtx.commit_tx().await;
845 num_responses += 1;
846 num_responses_sender.send_replace(num_responses);
848 }
849 }
850 }
851 }
852
853 pub fn supported_api_versions_summary_static(
855 config: &ClientConfig,
856 client_module_init: &ClientModuleInitRegistry,
857 ) -> SupportedApiVersionsSummary {
858 SupportedApiVersionsSummary {
859 core: SupportedCoreApiVersions {
860 core_consensus: config.global.consensus_version,
861 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
862 .expect("must not have conflicting versions"),
863 },
864 modules: config
865 .modules
866 .iter()
867 .filter_map(|(&module_instance_id, module_config)| {
868 client_module_init
869 .get(module_config.kind())
870 .map(|module_init| {
871 (
872 module_instance_id,
873 SupportedModuleApiVersions {
874 core_consensus: config.global.consensus_version,
875 module_consensus: module_config.version,
876 api: module_init.supported_api_versions(),
877 },
878 )
879 })
880 })
881 .collect(),
882 }
883 }
884
885 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
886 Self::load_and_refresh_common_api_version_static(
887 &self.config().await,
888 &self.module_inits,
889 &self.api,
890 &self.db,
891 &self.task_group,
892 )
893 .await
894 }
895
896 async fn load_and_refresh_common_api_version_static(
902 config: &ClientConfig,
903 module_init: &ClientModuleInitRegistry,
904 api: &DynGlobalApi,
905 db: &Database,
906 task_group: &TaskGroup,
907 ) -> anyhow::Result<ApiVersionSet> {
908 if let Some(v) = db
909 .begin_transaction_nc()
910 .await
911 .get_value(&CachedApiVersionSetKey)
912 .await
913 {
914 debug!(
915 target: LOG_CLIENT,
916 "Found existing cached common api versions"
917 );
918 let config = config.clone();
919 let client_module_init = module_init.clone();
920 let api = api.clone();
921 let db = db.clone();
922 let task_group = task_group.clone();
923 task_group
926 .clone()
927 .spawn_cancellable("refresh_common_api_version_static", async move {
928 if let Err(error) = Self::refresh_common_api_version_static(
929 &config,
930 &client_module_init,
931 &api,
932 &db,
933 task_group,
934 )
935 .await
936 {
937 warn!(
938 target: LOG_CLIENT,
939 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
940 );
941 }
942 });
943
944 return Ok(v.0);
945 }
946
947 debug!(
948 target: LOG_CLIENT,
949 "No existing cached common api versions found, waiting for initial discovery"
950 );
951 Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
952 .await
953 }
954
955 async fn refresh_common_api_version_static(
956 config: &ClientConfig,
957 client_module_init: &ClientModuleInitRegistry,
958 api: &DynGlobalApi,
959 db: &Database,
960 task_group: TaskGroup,
961 ) -> anyhow::Result<ApiVersionSet> {
962 debug!(
963 target: LOG_CLIENT,
964 "Refreshing common api versions"
965 );
966
967 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
968 let num_peers = NumPeers::from(config.global.api_endpoints.len());
969
970 task_group.spawn_cancellable("refresh peers api versions", {
971 Client::refresh_peers_api_versions(
972 num_peers,
973 api.clone(),
974 db.clone(),
975 num_responses_sender,
976 )
977 });
978
979 let _: Result<_, Elapsed> = runtime::timeout(
987 Duration::from_secs(15),
988 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
989 )
990 .await;
991
992 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
993
994 let common_api_versions =
995 fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
996 &Self::supported_api_versions_summary_static(config, client_module_init),
997 &peer_api_version_sets,
998 )?;
999
1000 debug!(
1001 target: LOG_CLIENT,
1002 value = ?common_api_versions,
1003 "Updating the cached common api versions"
1004 );
1005 let mut dbtx = db.begin_transaction().await;
1006 let _ = dbtx
1007 .insert_entry(
1008 &CachedApiVersionSetKey,
1009 &CachedApiVersionSet(common_api_versions.clone()),
1010 )
1011 .await;
1012
1013 dbtx.commit_tx().await;
1014
1015 Ok(common_api_versions)
1016 }
1017
1018 pub async fn get_metadata(&self) -> Metadata {
1020 self.db
1021 .begin_transaction_nc()
1022 .await
1023 .get_value(&ClientMetadataKey)
1024 .await
1025 .unwrap_or_else(|| {
1026 warn!(
1027 target: LOG_CLIENT,
1028 "Missing existing metadata. This key should have been set on Client init"
1029 );
1030 Metadata::empty()
1031 })
1032 }
1033
1034 pub async fn set_metadata(&self, metadata: &Metadata) {
1036 self.db
1037 .autocommit::<_, _, anyhow::Error>(
1038 |dbtx, _| {
1039 Box::pin(async {
1040 Self::set_metadata_dbtx(dbtx, metadata).await;
1041 Ok(())
1042 })
1043 },
1044 None,
1045 )
1046 .await
1047 .expect("Failed to autocommit metadata");
1048 }
1049
1050 pub fn has_pending_recoveries(&self) -> bool {
1051 !self
1052 .client_recovery_progress_receiver
1053 .borrow()
1054 .iter()
1055 .all(|(_id, progress)| progress.is_done())
1056 }
1057
1058 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1066 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1067 recovery_receiver
1068 .wait_for(|in_progress| {
1069 in_progress
1070 .iter()
1071 .all(|(_id, progress)| progress.is_done())
1072 })
1073 .await
1074 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1075
1076 Ok(())
1077 }
1078
1079 pub fn subscribe_to_recovery_progress(
1084 &self,
1085 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1086 WatchStream::new(self.client_recovery_progress_receiver.clone())
1087 .flat_map(futures::stream::iter)
1088 }
1089
1090 pub async fn wait_for_module_kind_recovery(
1091 &self,
1092 module_kind: ModuleKind,
1093 ) -> anyhow::Result<()> {
1094 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1095 let config = self.config().await;
1096 recovery_receiver
1097 .wait_for(|in_progress| {
1098 !in_progress
1099 .iter()
1100 .filter(|(module_instance_id, _progress)| {
1101 config.modules[module_instance_id].kind == module_kind
1102 })
1103 .any(|(_id, progress)| !progress.is_done())
1104 })
1105 .await
1106 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1107
1108 Ok(())
1109 }
1110
1111 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1112 loop {
1113 if self.executor.get_active_states().await.is_empty() {
1114 break;
1115 }
1116 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1117 }
1118 Ok(())
1119 }
1120
1121 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1123 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1124 }
1125
1126 fn spawn_module_recoveries_task(
1127 &self,
1128 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1129 module_recoveries: BTreeMap<
1130 ModuleInstanceId,
1131 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1132 >,
1133 module_recovery_progress_receivers: BTreeMap<
1134 ModuleInstanceId,
1135 watch::Receiver<RecoveryProgress>,
1136 >,
1137 ) {
1138 let db = self.db.clone();
1139 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1140 self.task_group
1141 .spawn("module recoveries", |_task_handle| async {
1142 Self::run_module_recoveries_task(
1143 db,
1144 log_ordering_wakeup_tx,
1145 recovery_sender,
1146 module_recoveries,
1147 module_recovery_progress_receivers,
1148 )
1149 .await;
1150 });
1151 }
1152
1153 async fn run_module_recoveries_task(
1154 db: Database,
1155 log_ordering_wakeup_tx: watch::Sender<()>,
1156 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1157 module_recoveries: BTreeMap<
1158 ModuleInstanceId,
1159 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1160 >,
1161 module_recovery_progress_receivers: BTreeMap<
1162 ModuleInstanceId,
1163 watch::Receiver<RecoveryProgress>,
1164 >,
1165 ) {
1166 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1167 let mut completed_stream = Vec::new();
1168 let progress_stream = futures::stream::FuturesUnordered::new();
1169
1170 for (module_instance_id, f) in module_recoveries {
1171 completed_stream.push(futures::stream::once(Box::pin(async move {
1172 match f.await {
1173 Ok(()) => (module_instance_id, None),
1174 Err(err) => {
1175 warn!(
1176 target: LOG_CLIENT,
1177 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1178 );
1179 futures::future::pending::<()>().await;
1183 unreachable!()
1184 }
1185 }
1186 })));
1187 }
1188
1189 for (module_instance_id, rx) in module_recovery_progress_receivers {
1190 progress_stream.push(
1191 tokio_stream::wrappers::WatchStream::new(rx)
1192 .fuse()
1193 .map(move |progress| (module_instance_id, Some(progress))),
1194 );
1195 }
1196
1197 let mut futures = futures::stream::select(
1198 futures::stream::select_all(progress_stream),
1199 futures::stream::select_all(completed_stream),
1200 );
1201
1202 while let Some((module_instance_id, progress)) = futures.next().await {
1203 let mut dbtx = db.begin_transaction().await;
1204
1205 let prev_progress = *recovery_sender
1206 .borrow()
1207 .get(&module_instance_id)
1208 .expect("existing progress must be present");
1209
1210 let progress = if prev_progress.is_done() {
1211 prev_progress
1213 } else if let Some(progress) = progress {
1214 progress
1215 } else {
1216 prev_progress.to_complete()
1217 };
1218
1219 if !prev_progress.is_done() && progress.is_done() {
1220 info!(
1221 target: LOG_CLIENT,
1222 module_instance_id,
1223 progress = format!("{}/{}", progress.complete, progress.total),
1224 "Recovery complete"
1225 );
1226 dbtx.log_event(
1227 log_ordering_wakeup_tx.clone(),
1228 None,
1229 ModuleRecoveryCompleted {
1230 module_id: module_instance_id,
1231 },
1232 )
1233 .await;
1234 } else {
1235 info!(
1236 target: LOG_CLIENT,
1237 module_instance_id,
1238 progress = format!("{}/{}", progress.complete, progress.total),
1239 "Recovery progress"
1240 );
1241 }
1242
1243 dbtx.insert_entry(
1244 &ClientModuleRecovery { module_instance_id },
1245 &ClientModuleRecoveryState { progress },
1246 )
1247 .await;
1248 dbtx.commit_tx().await;
1249
1250 recovery_sender.send_modify(|v| {
1251 v.insert(module_instance_id, progress);
1252 });
1253 }
1254 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1255 }
1256
1257 async fn load_peers_last_api_versions(
1258 db: &Database,
1259 num_peers: NumPeers,
1260 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1261 let mut peer_api_version_sets = BTreeMap::new();
1262
1263 let mut dbtx = db.begin_transaction_nc().await;
1264 for peer_id in num_peers.peer_ids() {
1265 if let Some(v) = dbtx
1266 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1267 .await
1268 {
1269 peer_api_version_sets.insert(peer_id, v.0);
1270 }
1271 }
1272 drop(dbtx);
1273 peer_api_version_sets
1274 }
1275
1276 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1279 self.db()
1280 .begin_transaction_nc()
1281 .await
1282 .find_by_prefix(&ApiAnnouncementPrefix)
1283 .await
1284 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1285 .collect()
1286 .await
1287 }
1288
/// Returns the API URL of each peer.
///
/// The result is whatever `get_api_urls` computes from the client database
/// and the current client config.
pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
    get_api_urls(&self.db, &self.config().await).await
}
1293
1294 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1297 self.get_peer_urls()
1298 .await
1299 .into_iter()
1300 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1301 .map(|peer_url| {
1302 InviteCode::new(
1303 peer_url.clone(),
1304 peer,
1305 self.federation_id(),
1306 self.api_secret.clone(),
1307 )
1308 })
1309 }
1310
1311 pub async fn get_guardian_public_keys_blocking(
1315 &self,
1316 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1317 self.db
1318 .autocommit(
1319 |dbtx, _| {
1320 Box::pin(async move {
1321 let config = self.config().await;
1322
1323 let guardian_pub_keys = self
1324 .get_or_backfill_broadcast_public_keys(dbtx, config)
1325 .await;
1326
1327 Result::<_, ()>::Ok(guardian_pub_keys)
1328 })
1329 },
1330 None,
1331 )
1332 .await
1333 .expect("Will retry forever")
1334 }
1335
1336 async fn get_or_backfill_broadcast_public_keys(
1337 &self,
1338 dbtx: &mut DatabaseTransaction<'_>,
1339 config: ClientConfig,
1340 ) -> BTreeMap<PeerId, PublicKey> {
1341 match config.global.broadcast_public_keys {
1342 Some(guardian_pub_keys) => guardian_pub_keys,
1343 _ => {
1344 let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1345
1346 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1347 *(self.config.write().await) = new_config;
1348 guardian_pub_keys
1349 }
1350 }
1351 }
1352
1353 async fn fetch_and_update_config(
1354 &self,
1355 config: ClientConfig,
1356 ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1357 let fetched_config = retry(
1358 "Fetching guardian public keys",
1359 backoff_util::background_backoff(),
1360 || async {
1361 Ok(self
1362 .api
1363 .request_current_consensus::<ClientConfig>(
1364 CLIENT_CONFIG_ENDPOINT.to_owned(),
1365 ApiRequestErased::default(),
1366 )
1367 .await?)
1368 },
1369 )
1370 .await
1371 .expect("Will never return on error");
1372
1373 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1374 warn!(
1375 target: LOG_CLIENT,
1376 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1377 );
1378 pending::<()>().await;
1379 unreachable!("Pending will never return");
1380 };
1381
1382 let new_config = ClientConfig {
1383 global: GlobalClientConfig {
1384 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1385 ..config.global
1386 },
1387 modules: config.modules,
1388 };
1389 (guardian_pub_keys, new_config)
1390 }
1391
1392 pub fn handle_global_rpc(
1393 &self,
1394 method: String,
1395 params: serde_json::Value,
1396 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1397 Box::pin(try_stream! {
1398 match method.as_str() {
1399 "get_balance" => {
1400 let balance = self.get_balance().await;
1401 yield serde_json::to_value(balance)?;
1402 }
1403 "subscribe_balance_changes" => {
1404 let mut stream = self.subscribe_balance_changes().await;
1405 while let Some(balance) = stream.next().await {
1406 yield serde_json::to_value(balance)?;
1407 }
1408 }
1409 "get_config" => {
1410 let config = self.config().await;
1411 yield serde_json::to_value(config)?;
1412 }
1413 "get_federation_id" => {
1414 let federation_id = self.federation_id();
1415 yield serde_json::to_value(federation_id)?;
1416 }
1417 "get_invite_code" => {
1418 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1419 let invite_code = self.invite_code(req.peer).await;
1420 yield serde_json::to_value(invite_code)?;
1421 }
1422 "get_operation" => {
1423 let req: GetOperationIdRequest = serde_json::from_value(params)?;
1424 let operation = self.operation_log().get_operation(req.operation_id).await;
1425 yield serde_json::to_value(operation)?;
1426 }
1427 "list_operations" => {
1428 let req: ListOperationsParams = serde_json::from_value(params)?;
1429 let limit = if req.limit.is_none() && req.last_seen.is_none() {
1430 usize::MAX
1431 } else {
1432 req.limit.unwrap_or(usize::MAX)
1433 };
1434 let operations = self.operation_log()
1435 .paginate_operations_rev(limit, req.last_seen)
1436 .await;
1437 yield serde_json::to_value(operations)?;
1438 }
1439 "has_pending_recoveries" => {
1440 let has_pending = self.has_pending_recoveries();
1441 yield serde_json::to_value(has_pending)?;
1442 }
1443 "wait_for_all_recoveries" => {
1444 self.wait_for_all_recoveries().await?;
1445 yield serde_json::Value::Null;
1446 }
1447 "subscribe_to_recovery_progress" => {
1448 let mut stream = self.subscribe_to_recovery_progress();
1449 while let Some((module_id, progress)) = stream.next().await {
1450 yield serde_json::json!({
1451 "module_id": module_id,
1452 "progress": progress
1453 });
1454 }
1455 }
1456 _ => {
1457 Err(anyhow::format_err!("Unknown method: {}", method))?;
1458 unreachable!()
1459 },
1460 }
1461 })
1462 }
1463
/// Logs `event` in a database transaction of its own.
///
/// Opens a transaction, records the event through `Self::log_event_dbtx`
/// and commits.
///
/// `module_id` is passed through unchanged to `Self::log_event_dbtx`.
pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
where
    E: Event + Send,
{
    let mut dbtx = self.db.begin_transaction().await;
    self.log_event_dbtx(&mut dbtx, module_id, event).await;
    dbtx.commit_tx().await;
}
1472
/// Logs `event` as part of the caller-supplied transaction.
///
/// Calls `dbtx.log_event` with a clone of the client's log-ordering
/// wake-up sender. Nothing is committed here; that is left to the owner
/// of `dbtx`.
pub async fn log_event_dbtx<E, Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    module_id: Option<ModuleInstanceId>,
    event: E,
) where
    E: Event + Send,
    Cap: Send,
{
    dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
        .await;
}
1485
1486 pub async fn log_event_raw_dbtx<Cap>(
1487 &self,
1488 dbtx: &mut DatabaseTransaction<'_, Cap>,
1489 kind: EventKind,
1490 module: Option<(ModuleKind, ModuleInstanceId)>,
1491 payload: Vec<u8>,
1492 persist: bool,
1493 ) where
1494 Cap: Send,
1495 {
1496 let module_id = module.as_ref().map(|m| m.1);
1497 let module_kind = module.map(|m| m.0);
1498 dbtx.log_event_raw(
1499 self.log_ordering_wakeup_tx.clone(),
1500 kind,
1501 module_kind,
1502 module_id,
1503 payload,
1504 persist,
1505 )
1506 .await;
1507 }
1508
/// Delegates to `fedimint_eventlog::handle_events`.
///
/// Passes a clone of the client database, `pos_key`, a clone of the
/// client's `log_event_added_rx` receiver and `call_fn`.
///
/// `pos_key` is a database key whose value type is `EventLogId`.
/// NOTE(review): presumably the persisted position of this handler in the
/// event log — confirm against `fedimint_eventlog`.
///
/// `call_fn` receives a non-committable transaction and one
/// `EventLogEntry`.
///
/// # Errors
///
/// Returns whatever error `fedimint_eventlog::handle_events` returns.
pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    fedimint_eventlog::handle_events(
        self.db.clone(),
        pos_key,
        self.log_event_added_rx.clone(),
        call_fn,
    )
    .await
}
1524
1525 pub async fn get_event_log(
1526 &self,
1527 pos: Option<EventLogId>,
1528 limit: u64,
1529 ) -> Vec<PersistedLogEntry> {
1530 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1531 .await
1532 }
1533
/// Reads entries from the event log through the caller-supplied
/// transaction.
///
/// Thin wrapper over `dbtx.get_event_log(pos, limit)`.
/// NOTE(review): `pos` is presumably the starting position and `limit` the
/// maximum number of entries — confirm against the event log API.
pub async fn get_event_log_dbtx<Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    pos: Option<EventLogId>,
    limit: u64,
) -> Vec<PersistedLogEntry>
where
    Cap: Send,
{
    dbtx.get_event_log(pos, limit).await
}
1545
/// Returns a new receiver subscribed to the client's
/// `log_event_added_transient_tx` broadcast channel.
pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
    self.log_event_added_transient_tx.subscribe()
}
1550}
1551
// `ClientContextIface` implementation for `Client`.
//
// Most methods forward to the inherent `Client` method of the same name,
// written as `Client::method(self, ..)` so the inherent method is named
// explicitly. The exceptions are `log_event_json` and the two
// `read_operation_*_states` methods, which contain their own logic.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    // Forwards to `Client::get_module`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    // Forwards to `Client::api_clone`.
    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }
    // Forwards to `Client::decoders`.
    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    // Forwards to `Client::finalize_and_submit_transaction`, passing the
    // boxed metadata generator by reference.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    // Forwards to `Client::finalize_and_submit_transaction_inner`, using the
    // caller-supplied transaction.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    // Forwards to `Client::transaction_updates`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    // Forwards to `Client::await_primary_module_outputs`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    // Forwards to `Client::operation_log`.
    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    // Forwards to `Client::has_active_states`.
    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    // Forwards to `Client::operation_exists`.
    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    // Forwards to `Client::config`.
    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    // Forwards to `Client::db`.
    fn db(&self) -> &Database {
        Client::db(self)
    }

    // Forwards to `Client::executor`.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    // Forwards to `Client::invite_code`.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    // Forwards to `Client::get_internal_payment_markers`.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    // Logs an event whose payload is a JSON value.
    //
    // The payload is serialized to bytes and handed to
    // `Client::log_event_raw_dbtx`. The module is attached only when
    // `module_kind` is `Some`; with `None`, `module_id` is not used.
    //
    // Panics if `dbtx.ensure_global()` fails or if the payload cannot be
    // serialized.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    // Streams the entries stored under `ActiveModuleOperationStateKeyPrefix`
    // for the given operation and module instance, yielding the inner key
    // (`k.0`) with its value. The stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
    // Same as `read_operation_active_states`, but reads entries stored under
    // `InactiveModuleOperationStateKeyPrefix`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}
1691
1692impl fmt::Debug for Client {
1694 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1695 write!(f, "Client")
1696 }
1697}
1698
1699pub fn client_decoders<'a>(
1700 registry: &ModuleInitRegistry<DynClientModuleInit>,
1701 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1702) -> ModuleDecoderRegistry {
1703 let mut modules = BTreeMap::new();
1704 for (id, kind) in module_kinds {
1705 let Some(init) = registry.get(kind) else {
1706 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1707 continue;
1708 };
1709
1710 modules.insert(
1711 id,
1712 (
1713 kind.clone(),
1714 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1715 ),
1716 );
1717 }
1718 ModuleDecoderRegistry::from(modules)
1719}