1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17 ApiVersionSet, DynGlobalApi, FederationApiExt as _, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21 ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22 IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30 TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33 AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34 ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37 ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42 IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50 ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51 SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::{
57 BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
58};
59use fedimint_core::{
60 Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send,
61 fedimint_build_code_version_env, maybe_add_send, maybe_add_send_sync, runtime,
62};
63use fedimint_derive_secret::DerivableSecret;
64use fedimint_eventlog::{
65 DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, PersistedLogEntry,
66};
67use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
68use futures::stream::FuturesUnordered;
69use futures::{Stream, StreamExt as _};
70use global_ctx::ModuleGlobalClientContext;
71use tokio::sync::{broadcast, watch};
72use tokio_stream::wrappers::WatchStream;
73use tracing::{debug, info, warn};
74
75use crate::ClientBuilder;
76use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
77use crate::backup::Metadata;
78use crate::db::{
79 ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientMetadataKey,
80 ClientModuleRecovery, ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey,
81 PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey, apply_migrations_core_client_dbtx,
82 get_decoded_client_secret, verify_client_db_integrity_dbtx,
83};
84use crate::meta::MetaService;
85use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
86use crate::oplog::OperationLog;
87use crate::sm::executor::{
88 ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
89 InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
90};
91
92pub(crate) mod builder;
93pub(crate) mod global_ctx;
94pub(crate) mod handle;
95
96const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
100 &[ApiVersion { major: 0, minor: 0 }];
101
102pub struct Client {
116 final_client: FinalClientIface,
117 config: tokio::sync::RwLock<ClientConfig>,
118 api_secret: Option<String>,
119 decoders: ModuleDecoderRegistry,
120 db: Database,
121 federation_id: FederationId,
122 federation_config_meta: BTreeMap<String, String>,
123 primary_module_instance: ModuleInstanceId,
124 pub(crate) modules: ClientModuleRegistry,
125 module_inits: ClientModuleInitRegistry,
126 executor: Executor,
127 pub(crate) api: DynGlobalApi,
128 root_secret: DerivableSecret,
129 operation_log: OperationLog,
130 secp_ctx: Secp256k1<secp256k1::All>,
131 meta_service: Arc<MetaService>,
132 connector: Connector,
133
134 task_group: TaskGroup,
135
136 client_recovery_progress_receiver:
138 watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
139
140 log_ordering_wakeup_tx: watch::Sender<()>,
143 log_event_added_rx: watch::Receiver<()>,
145 log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
146 request_hook: ApiRequestHook,
147}
148
149impl Client {
150 pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
153 let mut dbtx = db.begin_transaction().await;
154 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
155 .await?;
156 if is_running_in_test_env() {
157 verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
158 }
159 dbtx.commit_tx_result().await?;
160
161 Ok(ClientBuilder::new(db))
162 }
163
164 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
165 self.api.as_ref()
166 }
167
168 pub fn api_clone(&self) -> DynGlobalApi {
169 self.api.clone()
170 }
171
172 pub fn task_group(&self) -> &TaskGroup {
174 &self.task_group
175 }
176
177 #[doc(hidden)]
179 pub fn executor(&self) -> &Executor {
180 &self.executor
181 }
182
183 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
184 let mut dbtx = db.begin_transaction_nc().await;
185 dbtx.get_value(&ClientConfigKey).await
186 }
187
188 pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
189 let mut dbtx = db.begin_transaction_nc().await;
190 dbtx.get_value(&ApiSecretKey).await
191 }
192
193 pub async fn store_encodable_client_secret<T: Encodable>(
194 db: &Database,
195 secret: T,
196 ) -> anyhow::Result<()> {
197 let mut dbtx = db.begin_transaction().await;
198
199 if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
201 bail!("Encoded client secret already exists, cannot overwrite")
202 }
203
204 let encoded_secret = T::consensus_encode_to_vec(&secret);
205 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
206 .await;
207 dbtx.commit_tx().await;
208 Ok(())
209 }
210
211 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
212 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
213 bail!("Encoded client secret not present in DB")
214 };
215
216 Ok(secret)
217 }
218 pub async fn load_decodable_client_secret_opt<T: Decodable>(
219 db: &Database,
220 ) -> anyhow::Result<Option<T>> {
221 let mut dbtx = db.begin_transaction_nc().await;
222
223 let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
224
225 Ok(match client_secret {
226 Some(client_secret) => Some(
227 T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
228 .map_err(|e| anyhow!("Decoding failed: {e}"))?,
229 ),
230 None => None,
231 })
232 }
233
234 pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
235 let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
236 Ok(secret) => secret,
237 _ => {
238 let secret = PlainRootSecretStrategy::random(&mut thread_rng());
239 Self::store_encodable_client_secret(db, secret)
240 .await
241 .expect("Storing client secret must work");
242 secret
243 }
244 };
245 Ok(client_secret)
246 }
247
248 pub async fn is_initialized(db: &Database) -> bool {
249 Self::get_config_from_db(db).await.is_some()
250 }
251
252 pub fn start_executor(self: &Arc<Self>) {
253 debug!(
254 target: LOG_CLIENT,
255 "Starting fedimint client executor (version: {})",
256 fedimint_build_code_version_env!()
257 );
258 self.executor.start_executor(self.context_gen());
259 }
260
261 pub fn federation_id(&self) -> FederationId {
262 self.federation_id
263 }
264
265 fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
266 let client_inner = Arc::downgrade(self);
267 Arc::new(move |module_instance, operation| {
268 ModuleGlobalClientContext {
269 client: client_inner
270 .clone()
271 .upgrade()
272 .expect("ModuleGlobalContextGen called after client was dropped"),
273 module_instance_id: module_instance,
274 operation,
275 }
276 .into()
277 })
278 }
279
280 pub async fn config(&self) -> ClientConfig {
281 self.config.read().await.clone()
282 }
283
284 pub fn api_secret(&self) -> &Option<String> {
285 &self.api_secret
286 }
287
288 pub fn decoders(&self) -> &ModuleDecoderRegistry {
289 &self.decoders
290 }
291
292 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
294 self.try_get_module(instance)
295 .expect("Module instance not found")
296 }
297
298 fn try_get_module(
299 &self,
300 instance: ModuleInstanceId,
301 ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
302 Some(self.modules.get(instance)?.as_ref())
303 }
304
305 pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
306 self.modules.get(instance).is_some()
307 }
308
309 fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
315 let mut in_amount = Amount::ZERO;
317 let mut out_amount = Amount::ZERO;
318 let mut fee_amount = Amount::ZERO;
319
320 for input in builder.inputs() {
321 let module = self.get_module(input.input.module_instance_id());
322
323 let item_fee = module.input_fee(input.amount, &input.input).expect(
324 "We only build transactions with input versions that are supported by the module",
325 );
326
327 in_amount += input.amount;
328 fee_amount += item_fee;
329 }
330
331 for output in builder.outputs() {
332 let module = self.get_module(output.output.module_instance_id());
333
334 let item_fee = module.output_fee(output.amount, &output.output).expect(
335 "We only build transactions with output versions that are supported by the module",
336 );
337
338 out_amount += output.amount;
339 fee_amount += item_fee;
340 }
341
342 (in_amount, out_amount + fee_amount)
343 }
344
345 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
346 Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
347 }
348
349 pub fn get_config_meta(&self, key: &str) -> Option<String> {
351 self.federation_config_meta.get(key).cloned()
352 }
353
354 pub(crate) fn root_secret(&self) -> DerivableSecret {
355 self.root_secret.clone()
356 }
357
358 pub async fn add_state_machines(
359 &self,
360 dbtx: &mut DatabaseTransaction<'_>,
361 states: Vec<DynState>,
362 ) -> AddStateMachinesResult {
363 self.executor.add_state_machines_dbtx(dbtx, states).await
364 }
365
366 pub async fn get_active_operations(&self) -> HashSet<OperationId> {
368 let active_states = self.executor.get_active_states().await;
369 let mut active_operations = HashSet::with_capacity(active_states.len());
370 let mut dbtx = self.db().begin_transaction_nc().await;
371 for (state, _) in active_states {
372 let operation_id = state.operation_id();
373 if dbtx
374 .get_value(&OperationLogKey { operation_id })
375 .await
376 .is_some()
377 {
378 active_operations.insert(operation_id);
379 }
380 }
381 active_operations
382 }
383
384 pub fn operation_log(&self) -> &OperationLog {
385 &self.operation_log
386 }
387
388 pub fn meta_service(&self) -> &Arc<MetaService> {
390 &self.meta_service
391 }
392
393 pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
395 let meta_service = self.meta_service();
396 let ts = meta_service
397 .get_field::<u64>(self.db(), "federation_expiry_timestamp")
398 .await
399 .and_then(|v| v.value)?;
400 Some(UNIX_EPOCH + Duration::from_secs(ts))
401 }
402
403 async fn finalize_transaction(
405 &self,
406 dbtx: &mut DatabaseTransaction<'_>,
407 operation_id: OperationId,
408 mut partial_transaction: TransactionBuilder,
409 ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
410 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
411
412 let (added_input_bundle, change_outputs) = self
413 .primary_module()
414 .create_final_inputs_and_outputs(
415 self.primary_module_instance,
416 dbtx,
417 operation_id,
418 input_amount,
419 output_amount,
420 )
421 .await?;
422
423 let change_range = Range {
427 start: partial_transaction.outputs().count() as u64,
428 end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
429 };
430
431 partial_transaction = partial_transaction
432 .with_inputs(added_input_bundle)
433 .with_outputs(change_outputs);
434
435 let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
436
437 assert!(input_amount >= output_amount, "Transaction is underfunded");
438
439 let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
440
441 Ok((tx, states, change_range))
442 }
443
444 pub async fn finalize_and_submit_transaction<F, M>(
456 &self,
457 operation_id: OperationId,
458 operation_type: &str,
459 operation_meta_gen: F,
460 tx_builder: TransactionBuilder,
461 ) -> anyhow::Result<OutPointRange>
462 where
463 F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
464 M: serde::Serialize + MaybeSend,
465 {
466 let operation_type = operation_type.to_owned();
467
468 let autocommit_res = self
469 .db
470 .autocommit(
471 |dbtx, _| {
472 let operation_type = operation_type.clone();
473 let tx_builder = tx_builder.clone();
474 let operation_meta_gen = operation_meta_gen.clone();
475 Box::pin(async move {
476 if Client::operation_exists_dbtx(dbtx, operation_id).await {
477 bail!("There already exists an operation with id {operation_id:?}")
478 }
479
480 let out_point_range = self
481 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
482 .await?;
483
484 self.operation_log()
485 .add_operation_log_entry_dbtx(
486 dbtx,
487 operation_id,
488 &operation_type,
489 operation_meta_gen(out_point_range),
490 )
491 .await;
492
493 Ok(out_point_range)
494 })
495 },
496 Some(100), )
498 .await;
499
500 match autocommit_res {
501 Ok(txid) => Ok(txid),
502 Err(AutocommitError::ClosureError { error, .. }) => Err(error),
503 Err(AutocommitError::CommitFailed {
504 attempts,
505 last_error,
506 }) => panic!(
507 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
508 ),
509 }
510 }
511
512 async fn finalize_and_submit_transaction_inner(
513 &self,
514 dbtx: &mut DatabaseTransaction<'_>,
515 operation_id: OperationId,
516 tx_builder: TransactionBuilder,
517 ) -> anyhow::Result<OutPointRange> {
518 let (transaction, mut states, change_range) = self
519 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
520 .await?;
521
522 if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
523 let inputs = transaction
524 .inputs
525 .iter()
526 .map(DynInput::module_instance_id)
527 .collect::<Vec<_>>();
528 let outputs = transaction
529 .outputs
530 .iter()
531 .map(DynOutput::module_instance_id)
532 .collect::<Vec<_>>();
533 warn!(
534 target: LOG_CLIENT_NET_API,
535 size=%transaction.consensus_encode_to_vec().len(),
536 ?inputs,
537 ?outputs,
538 "Transaction too large",
539 );
540 debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
541 bail!(
542 "The generated transaction would be rejected by the federation for being too large."
543 );
544 }
545
546 let txid = transaction.tx_hash();
547
548 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");
549
550 let tx_submission_sm = DynState::from_typed(
551 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
552 TxSubmissionStatesSM {
553 operation_id,
554 state: TxSubmissionStates::Created(transaction),
555 },
556 );
557 states.push(tx_submission_sm);
558
559 self.executor.add_state_machines_dbtx(dbtx, states).await?;
560
561 self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
562 .await;
563
564 Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
565 }
566
567 async fn transaction_update_stream(
568 &self,
569 operation_id: OperationId,
570 ) -> BoxStream<'static, TxSubmissionStatesSM> {
571 self.executor
572 .notifier()
573 .module_notifier::<TxSubmissionStatesSM>(
574 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
575 self.final_client.clone(),
576 )
577 .subscribe(operation_id)
578 .await
579 }
580
581 pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
582 let mut dbtx = self.db().begin_transaction_nc().await;
583
584 Client::operation_exists_dbtx(&mut dbtx, operation_id).await
585 }
586
587 pub async fn operation_exists_dbtx(
588 dbtx: &mut DatabaseTransaction<'_>,
589 operation_id: OperationId,
590 ) -> bool {
591 let active_state_exists = dbtx
592 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
593 .await
594 .next()
595 .await
596 .is_some();
597
598 let inactive_state_exists = dbtx
599 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
600 .await
601 .next()
602 .await
603 .is_some();
604
605 active_state_exists || inactive_state_exists
606 }
607
608 pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
609 self.db
610 .begin_transaction_nc()
611 .await
612 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
613 .await
614 .next()
615 .await
616 .is_some()
617 }
618
619 pub async fn await_primary_module_output(
622 &self,
623 operation_id: OperationId,
624 out_point: OutPoint,
625 ) -> anyhow::Result<()> {
626 self.primary_module()
627 .await_primary_module_output(operation_id, out_point)
628 .await
629 }
630
631 pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
633 let module_kind = M::kind();
634 let id = self
635 .get_first_instance(&module_kind)
636 .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
637 let module: &M = self
638 .try_get_module(id)
639 .ok_or_else(|| format_err!("Unknown module instance {id}"))?
640 .as_any()
641 .downcast_ref::<M>()
642 .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
643 let (db, _) = self.db().with_prefix_module_id(id);
644 Ok(ClientModuleInstance {
645 id,
646 db,
647 api: self.api().with_module(id),
648 module,
649 })
650 }
651
652 pub fn get_module_client_dyn(
653 &self,
654 instance_id: ModuleInstanceId,
655 ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
656 self.try_get_module(instance_id)
657 .ok_or(anyhow!("Unknown module instance {}", instance_id))
658 }
659
660 pub fn db(&self) -> &Database {
661 &self.db
662 }
663
664 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
667 TransactionUpdates {
668 update_stream: self.transaction_update_stream(operation_id).await,
669 }
670 }
671
672 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
676 if self
677 .modules
678 .get_with_kind(self.primary_module_instance)
679 .is_some_and(|(kind, _)| kind == module_kind)
680 {
681 return Some(self.primary_module_instance);
682 }
683
684 self.modules
685 .iter_modules()
686 .find(|(_, kind, _module)| *kind == module_kind)
687 .map(|(instance_id, _, _)| instance_id)
688 }
689
690 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
693 get_decoded_client_secret::<T>(self.db()).await
694 }
695
696 pub async fn await_primary_module_outputs(
699 &self,
700 operation_id: OperationId,
701 outputs: Vec<OutPoint>,
702 ) -> anyhow::Result<()> {
703 for out_point in outputs {
704 self.await_primary_module_output(operation_id, out_point)
705 .await?;
706 }
707
708 Ok(())
709 }
710
711 pub async fn get_config_json(&self) -> JsonClientConfig {
717 self.config().await.to_json()
718 }
719
720 pub fn primary_module(&self) -> &DynClientModule {
722 self.modules
723 .get(self.primary_module_instance)
724 .expect("primary module must be present")
725 }
726
727 pub async fn get_balance(&self) -> Amount {
729 self.primary_module()
730 .get_balance(
731 self.primary_module_instance,
732 &mut self.db().begin_transaction_nc().await,
733 )
734 .await
735 }
736
737 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
740 let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
741 let initial_balance = self.get_balance().await;
742 let db = self.db().clone();
743 let primary_module = self.primary_module().clone();
744 let primary_module_instance = self.primary_module_instance;
745
746 Box::pin(async_stream::stream! {
747 yield initial_balance;
748 let mut prev_balance = initial_balance;
749 while let Some(()) = balance_changes.next().await {
750 let mut dbtx = db.begin_transaction_nc().await;
751 let balance = primary_module
752 .get_balance(primary_module_instance, &mut dbtx)
753 .await;
754
755 if balance != prev_balance {
757 prev_balance = balance;
758 yield balance;
759 }
760 }
761 })
762 }
763
764 pub async fn refresh_peers_api_versions(
767 num_peers: NumPeers,
768 api: DynGlobalApi,
769 db: Database,
770 num_responses_sender: watch::Sender<usize>,
771 ) {
772 async fn make_request(
777 delay: Duration,
778 peer_id: PeerId,
779 api: &DynGlobalApi,
780 ) -> (
781 PeerId,
782 Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
783 ) {
784 runtime::sleep(delay).await;
785 (
786 peer_id,
787 api.request_single_peer::<SupportedApiVersionsSummary>(
788 VERSION_ENDPOINT.to_owned(),
789 ApiRequestErased::default(),
790 peer_id,
791 )
792 .await,
793 )
794 }
795
796 let mut requests = FuturesUnordered::new();
799
800 for peer_id in num_peers.peer_ids() {
801 requests.push(make_request(Duration::ZERO, peer_id, &api));
802 }
803
804 let mut num_responses = 0;
805
806 while let Some((peer_id, response)) = requests.next().await {
807 match response {
808 Err(err) => {
809 if db
810 .begin_transaction_nc()
811 .await
812 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
813 .await
814 .is_some()
815 {
816 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
817 } else {
818 debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
819 requests.push(make_request(Duration::from_secs(15), peer_id, &api));
820 }
821 }
822 Ok(o) => {
823 let mut dbtx = db.begin_transaction().await;
826 dbtx.insert_entry(
827 &PeerLastApiVersionsSummaryKey(peer_id),
828 &PeerLastApiVersionsSummary(o),
829 )
830 .await;
831 dbtx.commit_tx().await;
832 num_responses += 1;
833 num_responses_sender.send_replace(num_responses);
835 }
836 }
837 }
838 }
839
840 pub fn supported_api_versions_summary_static(
842 config: &ClientConfig,
843 client_module_init: &ClientModuleInitRegistry,
844 ) -> SupportedApiVersionsSummary {
845 SupportedApiVersionsSummary {
846 core: SupportedCoreApiVersions {
847 core_consensus: config.global.consensus_version,
848 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
849 .expect("must not have conflicting versions"),
850 },
851 modules: config
852 .modules
853 .iter()
854 .filter_map(|(&module_instance_id, module_config)| {
855 client_module_init
856 .get(module_config.kind())
857 .map(|module_init| {
858 (
859 module_instance_id,
860 SupportedModuleApiVersions {
861 core_consensus: config.global.consensus_version,
862 module_consensus: module_config.version,
863 api: module_init.supported_api_versions(),
864 },
865 )
866 })
867 })
868 .collect(),
869 }
870 }
871
872 pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
873 Self::load_and_refresh_common_api_version_static(
874 &self.config().await,
875 &self.module_inits,
876 &self.api,
877 &self.db,
878 &self.task_group,
879 )
880 .await
881 }
882
883 async fn load_and_refresh_common_api_version_static(
889 config: &ClientConfig,
890 module_init: &ClientModuleInitRegistry,
891 api: &DynGlobalApi,
892 db: &Database,
893 task_group: &TaskGroup,
894 ) -> anyhow::Result<ApiVersionSet> {
895 if let Some(v) = db
896 .begin_transaction_nc()
897 .await
898 .get_value(&CachedApiVersionSetKey)
899 .await
900 {
901 debug!(
902 target: LOG_CLIENT,
903 "Found existing cached common api versions"
904 );
905 let config = config.clone();
906 let client_module_init = module_init.clone();
907 let api = api.clone();
908 let db = db.clone();
909 let task_group = task_group.clone();
910 task_group
913 .clone()
914 .spawn_cancellable("refresh_common_api_version_static", async move {
915 if let Err(error) = Self::refresh_common_api_version_static(
916 &config,
917 &client_module_init,
918 &api,
919 &db,
920 task_group,
921 )
922 .await
923 {
924 warn!(
925 target: LOG_CLIENT,
926 err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
927 );
928 }
929 });
930
931 return Ok(v.0);
932 }
933
934 debug!(
935 target: LOG_CLIENT,
936 "No existing cached common api versions found, waiting for initial discovery"
937 );
938 Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
939 .await
940 }
941
942 async fn refresh_common_api_version_static(
943 config: &ClientConfig,
944 client_module_init: &ClientModuleInitRegistry,
945 api: &DynGlobalApi,
946 db: &Database,
947 task_group: TaskGroup,
948 ) -> anyhow::Result<ApiVersionSet> {
949 debug!(
950 target: LOG_CLIENT,
951 "Refreshing common api versions"
952 );
953
954 let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
955 let num_peers = NumPeers::from(config.global.api_endpoints.len());
956
957 task_group.spawn_cancellable("refresh peers api versions", {
958 Client::refresh_peers_api_versions(
959 num_peers,
960 api.clone(),
961 db.clone(),
962 num_responses_sender,
963 )
964 });
965
966 let _: Result<_, Elapsed> = runtime::timeout(
974 Duration::from_secs(15),
975 num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
976 )
977 .await;
978
979 let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
980
981 let common_api_versions =
982 fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
983 &Self::supported_api_versions_summary_static(config, client_module_init),
984 &peer_api_version_sets,
985 )?;
986
987 debug!(
988 target: LOG_CLIENT,
989 value = ?common_api_versions,
990 "Updating the cached common api versions"
991 );
992 let mut dbtx = db.begin_transaction().await;
993 let _ = dbtx
994 .insert_entry(
995 &CachedApiVersionSetKey,
996 &CachedApiVersionSet(common_api_versions.clone()),
997 )
998 .await;
999
1000 dbtx.commit_tx().await;
1001
1002 Ok(common_api_versions)
1003 }
1004
1005 pub async fn get_metadata(&self) -> Metadata {
1007 self.db
1008 .begin_transaction_nc()
1009 .await
1010 .get_value(&ClientMetadataKey)
1011 .await
1012 .unwrap_or_else(|| {
1013 warn!(
1014 target: LOG_CLIENT,
1015 "Missing existing metadata. This key should have been set on Client init"
1016 );
1017 Metadata::empty()
1018 })
1019 }
1020
1021 pub async fn set_metadata(&self, metadata: &Metadata) {
1023 self.db
1024 .autocommit::<_, _, anyhow::Error>(
1025 |dbtx, _| {
1026 Box::pin(async {
1027 Self::set_metadata_dbtx(dbtx, metadata).await;
1028 Ok(())
1029 })
1030 },
1031 None,
1032 )
1033 .await
1034 .expect("Failed to autocommit metadata");
1035 }
1036
1037 pub fn has_pending_recoveries(&self) -> bool {
1038 !self
1039 .client_recovery_progress_receiver
1040 .borrow()
1041 .iter()
1042 .all(|(_id, progress)| progress.is_done())
1043 }
1044
1045 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1053 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1054 recovery_receiver
1055 .wait_for(|in_progress| {
1056 in_progress
1057 .iter()
1058 .all(|(_id, progress)| progress.is_done())
1059 })
1060 .await
1061 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1062
1063 Ok(())
1064 }
1065
1066 pub fn subscribe_to_recovery_progress(
1071 &self,
1072 ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1073 WatchStream::new(self.client_recovery_progress_receiver.clone())
1074 .flat_map(futures::stream::iter)
1075 }
1076
1077 pub async fn wait_for_module_kind_recovery(
1078 &self,
1079 module_kind: ModuleKind,
1080 ) -> anyhow::Result<()> {
1081 let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1082 let config = self.config().await;
1083 recovery_receiver
1084 .wait_for(|in_progress| {
1085 !in_progress
1086 .iter()
1087 .filter(|(module_instance_id, _progress)| {
1088 config.modules[module_instance_id].kind == module_kind
1089 })
1090 .any(|(_id, progress)| !progress.is_done())
1091 })
1092 .await
1093 .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1094
1095 Ok(())
1096 }
1097
1098 pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1099 loop {
1100 if self.executor.get_active_states().await.is_empty() {
1101 break;
1102 }
1103 fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1104 }
1105 Ok(())
1106 }
1107
1108 pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1110 dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1111 }
1112
1113 fn spawn_module_recoveries_task(
1114 &self,
1115 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1116 module_recoveries: BTreeMap<
1117 ModuleInstanceId,
1118 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1119 >,
1120 module_recovery_progress_receivers: BTreeMap<
1121 ModuleInstanceId,
1122 watch::Receiver<RecoveryProgress>,
1123 >,
1124 ) {
1125 let db = self.db.clone();
1126 let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1127 self.task_group
1128 .spawn("module recoveries", |_task_handle| async {
1129 Self::run_module_recoveries_task(
1130 db,
1131 log_ordering_wakeup_tx,
1132 recovery_sender,
1133 module_recoveries,
1134 module_recovery_progress_receivers,
1135 )
1136 .await;
1137 });
1138 }
1139
1140 async fn run_module_recoveries_task(
1141 db: Database,
1142 log_ordering_wakeup_tx: watch::Sender<()>,
1143 recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1144 module_recoveries: BTreeMap<
1145 ModuleInstanceId,
1146 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1147 >,
1148 module_recovery_progress_receivers: BTreeMap<
1149 ModuleInstanceId,
1150 watch::Receiver<RecoveryProgress>,
1151 >,
1152 ) {
1153 debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1154 let mut completed_stream = Vec::new();
1155 let progress_stream = futures::stream::FuturesUnordered::new();
1156
1157 for (module_instance_id, f) in module_recoveries {
1158 completed_stream.push(futures::stream::once(Box::pin(async move {
1159 match f.await {
1160 Ok(()) => (module_instance_id, None),
1161 Err(err) => {
1162 warn!(
1163 target: LOG_CLIENT,
1164 err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1165 );
1166 futures::future::pending::<()>().await;
1170 unreachable!()
1171 }
1172 }
1173 })));
1174 }
1175
1176 for (module_instance_id, rx) in module_recovery_progress_receivers {
1177 progress_stream.push(
1178 tokio_stream::wrappers::WatchStream::new(rx)
1179 .fuse()
1180 .map(move |progress| (module_instance_id, Some(progress))),
1181 );
1182 }
1183
1184 let mut futures = futures::stream::select(
1185 futures::stream::select_all(progress_stream),
1186 futures::stream::select_all(completed_stream),
1187 );
1188
1189 while let Some((module_instance_id, progress)) = futures.next().await {
1190 let mut dbtx = db.begin_transaction().await;
1191
1192 let prev_progress = *recovery_sender
1193 .borrow()
1194 .get(&module_instance_id)
1195 .expect("existing progress must be present");
1196
1197 let progress = if prev_progress.is_done() {
1198 prev_progress
1200 } else if let Some(progress) = progress {
1201 progress
1202 } else {
1203 prev_progress.to_complete()
1204 };
1205
1206 if !prev_progress.is_done() && progress.is_done() {
1207 info!(
1208 target: LOG_CLIENT,
1209 module_instance_id,
1210 progress = format!("{}/{}", progress.complete, progress.total),
1211 "Recovery complete"
1212 );
1213 dbtx.log_event(
1214 log_ordering_wakeup_tx.clone(),
1215 None,
1216 ModuleRecoveryCompleted {
1217 module_id: module_instance_id,
1218 },
1219 )
1220 .await;
1221 } else {
1222 info!(
1223 target: LOG_CLIENT,
1224 module_instance_id,
1225 progress = format!("{}/{}", progress.complete, progress.total),
1226 "Recovery progress"
1227 );
1228 }
1229
1230 dbtx.insert_entry(
1231 &ClientModuleRecovery { module_instance_id },
1232 &ClientModuleRecoveryState { progress },
1233 )
1234 .await;
1235 dbtx.commit_tx().await;
1236
1237 recovery_sender.send_modify(|v| {
1238 v.insert(module_instance_id, progress);
1239 });
1240 }
1241 debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1242 }
1243
1244 async fn load_peers_last_api_versions(
1245 db: &Database,
1246 num_peers: NumPeers,
1247 ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1248 let mut peer_api_version_sets = BTreeMap::new();
1249
1250 let mut dbtx = db.begin_transaction_nc().await;
1251 for peer_id in num_peers.peer_ids() {
1252 if let Some(v) = dbtx
1253 .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1254 .await
1255 {
1256 peer_api_version_sets.insert(peer_id, v.0);
1257 }
1258 }
1259 drop(dbtx);
1260 peer_api_version_sets
1261 }
1262
1263 pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1266 self.db()
1267 .begin_transaction_nc()
1268 .await
1269 .find_by_prefix(&ApiAnnouncementPrefix)
1270 .await
1271 .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1272 .collect()
1273 .await
1274 }
1275
1276 pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1278 get_api_urls(&self.db, &self.config().await).await
1279 }
1280
1281 pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1284 self.get_peer_urls()
1285 .await
1286 .into_iter()
1287 .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1288 .map(|peer_url| {
1289 InviteCode::new(
1290 peer_url.clone(),
1291 peer,
1292 self.federation_id(),
1293 self.api_secret.clone(),
1294 )
1295 })
1296 }
1297
1298 pub async fn get_guardian_public_keys_blocking(
1302 &self,
1303 ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1304 self.db.autocommit(|dbtx, _| Box::pin(async move {
1305 let config = self.config().await;
1306
1307 let guardian_pub_keys = match config.global.broadcast_public_keys { Some(guardian_pub_keys) => {guardian_pub_keys} _ => {
1308 let fetched_config = retry(
1309 "Fetching guardian public keys",
1310 backoff_util::background_backoff(),
1311 || async {
1312 Ok(self.api.request_current_consensus::<ClientConfig>(
1313 CLIENT_CONFIG_ENDPOINT.to_owned(),
1314 ApiRequestErased::default(),
1315 ).await?)
1316 },
1317 )
1318 .await
1319 .expect("Will never return on error");
1320
1321 let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1322 warn!(
1323 target: LOG_CLIENT,
1324 "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1325 );
1326 pending::<()>().await;
1327 unreachable!("Pending will never return");
1328 };
1329
1330 let new_config = ClientConfig {
1331 global: GlobalClientConfig {
1332 broadcast_public_keys: Some(guardian_pub_keys.clone()),
1333 ..config.global
1334 },
1335 modules: config.modules,
1336 };
1337
1338 dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1339 *(self.config.write().await) = new_config;
1340 guardian_pub_keys
1341 }};
1342
1343 Result::<_, ()>::Ok(guardian_pub_keys)
1344 }), None).await.expect("Will retry forever")
1345 }
1346
1347 pub fn handle_global_rpc(
1348 &self,
1349 method: String,
1350 params: serde_json::Value,
1351 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1352 Box::pin(try_stream! {
1353 match method.as_str() {
1354 "get_balance" => {
1355 let balance = self.get_balance().await;
1356 yield serde_json::to_value(balance)?;
1357 }
1358 "subscribe_balance_changes" => {
1359 let mut stream = self.subscribe_balance_changes().await;
1360 while let Some(balance) = stream.next().await {
1361 yield serde_json::to_value(balance)?;
1362 }
1363 }
1364 "get_config" => {
1365 let config = self.config().await;
1366 yield serde_json::to_value(config)?;
1367 }
1368 "get_federation_id" => {
1369 let federation_id = self.federation_id();
1370 yield serde_json::to_value(federation_id)?;
1371 }
1372 "get_invite_code" => {
1373 let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1374 let invite_code = self.invite_code(req.peer).await;
1375 yield serde_json::to_value(invite_code)?;
1376 }
1377 "list_operations" => {
1378 let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await;
1380 yield serde_json::to_value(operations)?;
1381 }
1382 "has_pending_recoveries" => {
1383 let has_pending = self.has_pending_recoveries();
1384 yield serde_json::to_value(has_pending)?;
1385 }
1386 "wait_for_all_recoveries" => {
1387 self.wait_for_all_recoveries().await?;
1388 yield serde_json::Value::Null;
1389 }
1390 "subscribe_to_recovery_progress" => {
1391 let mut stream = self.subscribe_to_recovery_progress();
1392 while let Some((module_id, progress)) = stream.next().await {
1393 yield serde_json::json!({
1394 "module_id": module_id,
1395 "progress": progress
1396 });
1397 }
1398 }
1399 _ => {
1400 Err(anyhow::format_err!("Unknown method: {}", method))?;
1401 unreachable!()
1402 },
1403 }
1404 })
1405 }
1406
1407 pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1408 where
1409 E: Event + Send,
1410 {
1411 let mut dbtx = self.db.begin_transaction().await;
1412 self.log_event_dbtx(&mut dbtx, module_id, event).await;
1413 dbtx.commit_tx().await;
1414 }
1415
1416 pub async fn log_event_dbtx<E, Cap>(
1417 &self,
1418 dbtx: &mut DatabaseTransaction<'_, Cap>,
1419 module_id: Option<ModuleInstanceId>,
1420 event: E,
1421 ) where
1422 E: Event + Send,
1423 Cap: Send,
1424 {
1425 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1426 .await;
1427 }
1428
1429 pub async fn log_event_raw_dbtx<Cap>(
1430 &self,
1431 dbtx: &mut DatabaseTransaction<'_, Cap>,
1432 kind: EventKind,
1433 module: Option<(ModuleKind, ModuleInstanceId)>,
1434 payload: Vec<u8>,
1435 persist: bool,
1436 ) where
1437 Cap: Send,
1438 {
1439 let module_id = module.as_ref().map(|m| m.1);
1440 let module_kind = module.map(|m| m.0);
1441 dbtx.log_event_raw(
1442 self.log_ordering_wakeup_tx.clone(),
1443 kind,
1444 module_kind,
1445 module_id,
1446 payload,
1447 persist,
1448 )
1449 .await;
1450 }
1451
1452 pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1453 where
1454 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1455 K: DatabaseRecord<Value = EventLogId>,
1456 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1457 R: Future<Output = anyhow::Result<()>>,
1458 {
1459 fedimint_eventlog::handle_events(
1460 self.db.clone(),
1461 pos_key,
1462 self.log_event_added_rx.clone(),
1463 call_fn,
1464 )
1465 .await
1466 }
1467
1468 pub async fn get_event_log(
1469 &self,
1470 pos: Option<EventLogId>,
1471 limit: u64,
1472 ) -> Vec<PersistedLogEntry> {
1473 self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1474 .await
1475 }
1476
1477 pub async fn get_event_log_dbtx<Cap>(
1478 &self,
1479 dbtx: &mut DatabaseTransaction<'_, Cap>,
1480 pos: Option<EventLogId>,
1481 limit: u64,
1482 ) -> Vec<PersistedLogEntry>
1483 where
1484 Cap: Send,
1485 {
1486 dbtx.get_event_log(pos, limit).await
1487 }
1488
1489 pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1491 self.log_event_added_transient_tx.subscribe()
1492 }
1493}
1494
1495#[apply(async_trait_maybe_send!)]
1496impl ClientContextIface for Client {
1497 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1498 Client::get_module(self, instance)
1499 }
1500
1501 fn api_clone(&self) -> DynGlobalApi {
1502 Client::api_clone(self)
1503 }
1504 fn decoders(&self) -> &ModuleDecoderRegistry {
1505 Client::decoders(self)
1506 }
1507
1508 async fn finalize_and_submit_transaction(
1509 &self,
1510 operation_id: OperationId,
1511 operation_type: &str,
1512 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1513 tx_builder: TransactionBuilder,
1514 ) -> anyhow::Result<OutPointRange> {
1515 Client::finalize_and_submit_transaction(
1516 self,
1517 operation_id,
1518 operation_type,
1519 &operation_meta_gen,
1521 tx_builder,
1522 )
1523 .await
1524 }
1525
1526 async fn finalize_and_submit_transaction_inner(
1527 &self,
1528 dbtx: &mut DatabaseTransaction<'_>,
1529 operation_id: OperationId,
1530 tx_builder: TransactionBuilder,
1531 ) -> anyhow::Result<OutPointRange> {
1532 Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1533 }
1534
1535 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1536 Client::transaction_updates(self, operation_id).await
1537 }
1538
1539 async fn await_primary_module_outputs(
1540 &self,
1541 operation_id: OperationId,
1542 outputs: Vec<OutPoint>,
1544 ) -> anyhow::Result<()> {
1545 Client::await_primary_module_outputs(self, operation_id, outputs).await
1546 }
1547
1548 fn operation_log(&self) -> &dyn IOperationLog {
1549 Client::operation_log(self)
1550 }
1551
1552 async fn has_active_states(&self, operation_id: OperationId) -> bool {
1553 Client::has_active_states(self, operation_id).await
1554 }
1555
1556 async fn operation_exists(&self, operation_id: OperationId) -> bool {
1557 Client::operation_exists(self, operation_id).await
1558 }
1559
1560 async fn config(&self) -> ClientConfig {
1561 Client::config(self).await
1562 }
1563
1564 fn db(&self) -> &Database {
1565 Client::db(self)
1566 }
1567
1568 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1569 Client::executor(self)
1570 }
1571
1572 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1573 Client::invite_code(self, peer).await
1574 }
1575
1576 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1577 Client::get_internal_payment_markers(self)
1578 }
1579
1580 async fn log_event_json(
1581 &self,
1582 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1583 module_kind: Option<ModuleKind>,
1584 module_id: ModuleInstanceId,
1585 kind: EventKind,
1586 payload: serde_json::Value,
1587 persist: bool,
1588 ) {
1589 dbtx.ensure_global()
1590 .expect("Must be called with global dbtx");
1591 self.log_event_raw_dbtx(
1592 dbtx,
1593 kind,
1594 module_kind.map(|kind| (kind, module_id)),
1595 serde_json::to_vec(&payload).expect("Serialization can't fail"),
1596 persist,
1597 )
1598 .await;
1599 }
1600
1601 async fn read_operation_active_states<'dbtx>(
1602 &self,
1603 operation_id: OperationId,
1604 module_id: ModuleInstanceId,
1605 dbtx: &'dbtx mut DatabaseTransaction<'_>,
1606 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1607 {
1608 Box::pin(
1609 dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1610 operation_id,
1611 module_instance: module_id,
1612 })
1613 .await
1614 .map(move |(k, v)| (k.0, v)),
1615 )
1616 }
1617 async fn read_operation_inactive_states<'dbtx>(
1618 &self,
1619 operation_id: OperationId,
1620 module_id: ModuleInstanceId,
1621 dbtx: &'dbtx mut DatabaseTransaction<'_>,
1622 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1623 {
1624 Box::pin(
1625 dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1626 operation_id,
1627 module_instance: module_id,
1628 })
1629 .await
1630 .map(move |(k, v)| (k.0, v)),
1631 )
1632 }
1633}
1634
1635impl fmt::Debug for Client {
1637 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1638 write!(f, "Client")
1639 }
1640}
1641
1642pub fn client_decoders<'a>(
1643 registry: &ModuleInitRegistry<DynClientModuleInit>,
1644 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
1645) -> ModuleDecoderRegistry {
1646 let mut modules = BTreeMap::new();
1647 for (id, kind) in module_kinds {
1648 let Some(init) = registry.get(kind) else {
1649 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
1650 continue;
1651 };
1652
1653 modules.insert(
1654 id,
1655 (
1656 kind.clone(),
1657 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
1658 ),
1659 );
1660 }
1661 ModuleDecoderRegistry::from(modules)
1662}