1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24 Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25 maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
/// Registry holding the client's modules as type-erased [`DynClientModule`]s.
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
/// Type-erased view of the client that a [`ClientContext`] calls into.
///
/// [`FinalClientIface`] stores a `Weak<dyn ClientContextIface>`, and the
/// [`ClientContext`] methods in this file forward to the methods declared
/// here after upgrading that reference.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Looks up the type-erased module for `instance`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalizes and submits the transaction described by `tx_builder`.
    ///
    /// `operation_meta_gen` turns the resulting [`OutPointRange`] into the
    /// JSON metadata for the operation.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Same as [`Self::finalize_and_submit_transaction`], but working inside
    /// the caller-provided database transaction `dbtx`.
    async fn finalize_and_submit_transaction_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant that takes no operation type or metadata generator.
    // NOTE(review): presumably this skips writing the operation log entry;
    // confirm against the implementor.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits on the given `outputs` of operation `operation_id`.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client database.
    ///
    /// `ClientContext::global_db` expects this to be the global (not
    /// module-isolated) database and panics otherwise.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for `peer`, or `None` if none is available.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event whose payload is already serialized to JSON.
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Streams the active states of `operation_id` for module `module_id`,
    /// reading through `dbtx` (the stream borrows it for `'dbtx`).
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of `operation_id` for module `module_id`,
    /// reading through `dbtx` (the stream borrows it for `'dbtx`).
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
135
/// Shared, set-once handle to the client.
///
/// The inner `OnceLock` starts out empty (see `Default`) and is filled in
/// later with a `Weak` reference to the client. Clones share the same cell
/// because it lives behind an `Arc`.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
143
144impl FinalClientIface {
145 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
151 self.0
152 .get()
153 .expect("client must be already set")
154 .upgrade()
155 .expect("client module context must not be use past client shutdown")
156 }
157
158 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
159 self.0.set(client).expect("FinalLazyClient already set");
160 }
161}
162
163impl fmt::Debug for FinalClientIface {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 f.write_str("FinalClientIface")
166 }
167}
/// Handle a client module of type `M` uses to reach the client it runs in.
///
/// `M` is only a type-level marker (`PhantomData`); no `M` value is stored.
pub struct ClientContext<M> {
    // Set-once handle to the client; upgraded on every call that needs it.
    client: FinalClientIface,
    // Instance id used to tag dyn values and to look up this module.
    module_instance_id: ModuleInstanceId,
    // Token passed to `global_dbtx` to turn a transaction into a global one.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database of this module; `module_db()` asserts that it is isolated.
    module_db: Database,
    _marker: marker::PhantomData<M>,
}
179
180impl<M> Clone for ClientContext<M> {
181 fn clone(&self) -> Self {
182 Self {
183 client: self.client.clone(),
184 module_db: self.module_db.clone(),
185 module_instance_id: self.module_instance_id,
186 _marker: marker::PhantomData,
187 global_dbtx_access_token: self.global_dbtx_access_token,
188 }
189 }
190}
191
/// Handle returned by `ClientContext::self_ref` that dereferences to the
/// module instance `M`.
///
/// It owns a strong `Arc` to the client, so the module reference handed out
/// by `Deref` borrows from a client kept alive by this value.
pub struct ClientContextSelfRef<'s, M> {
    // Strong reference to the client; the module is looked up through it.
    client: Arc<dyn ClientContextIface>,
    // Instance id of the module to look up.
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
201
202impl<M> ops::Deref for ClientContextSelfRef<'_, M>
203where
204 M: ClientModule,
205{
206 type Target = M;
207
208 fn deref(&self) -> &Self::Target {
209 self.client
210 .get_module(self.module_instance_id)
211 .as_any()
212 .downcast_ref::<M>()
213 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
214 }
215}
216
217impl<M> fmt::Debug for ClientContext<M> {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 f.write_str("ClientContext")
220 }
221}
222
223impl<M> ClientContext<M>
224where
225 M: ClientModule,
226{
227 pub fn new(
228 client: FinalClientIface,
229 module_instance_id: ModuleInstanceId,
230 global_dbtx_access_token: GlobalDBTxAccessToken,
231 module_db: Database,
232 ) -> Self {
233 Self {
234 client,
235 module_instance_id,
236 global_dbtx_access_token,
237 module_db,
238 _marker: marker::PhantomData,
239 }
240 }
241
242 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
254 ClientContextSelfRef {
255 client: self.client.get(),
256 module_instance_id: self.module_instance_id,
257 _marker: marker::PhantomData,
258 }
259 }
260
261 pub fn global_api(&self) -> DynGlobalApi {
263 self.client.get().api_clone()
264 }
265
266 pub fn module_api(&self) -> DynModuleApi {
268 self.global_api().with_module(self.module_instance_id)
269 }
270
271 pub fn decoders(&self) -> ModuleDecoderRegistry {
273 Clone::clone(self.client.get().decoders())
274 }
275
276 pub fn input_from_dyn<'i>(
277 &self,
278 input: &'i DynInput,
279 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
280 (input.module_instance_id() == self.module_instance_id).then(|| {
281 input
282 .as_any()
283 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
284 .unwrap_or_else(|| {
285 panic!("instance_id {} just checked", input.module_instance_id())
286 })
287 })
288 }
289
290 pub fn output_from_dyn<'o>(
291 &self,
292 output: &'o DynOutput,
293 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
294 (output.module_instance_id() == self.module_instance_id).then(|| {
295 output
296 .as_any()
297 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
298 .unwrap_or_else(|| {
299 panic!("instance_id {} just checked", output.module_instance_id())
300 })
301 })
302 }
303
304 pub fn map_dyn<'s, 'i, 'o, I>(
305 &'s self,
306 typed: impl IntoIterator<Item = I> + 'i,
307 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
308 where
309 I: IntoDynInstance,
310 'i: 'o,
311 's: 'o,
312 {
313 typed.into_iter().map(|i| self.make_dyn(i))
314 }
315
316 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
318 self.make_dyn(output)
319 }
320
321 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
323 self.make_dyn(input)
324 }
325
326 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
328 where
329 I: IntoDynInstance,
330 {
331 typed.into_dyn(self.module_instance_id)
332 }
333
334 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
336 where
337 O: IntoDynInstance<DynType = DynOutput> + 'static,
338 S: IntoDynInstance<DynType = DynState> + 'static,
339 {
340 self.make_dyn(output)
341 }
342
343 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
345 where
346 I: IntoDynInstance<DynType = DynInput> + 'static,
347 S: IntoDynInstance<DynType = DynState> + 'static,
348 {
349 self.make_dyn(inputs)
350 }
351
352 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
353 where
354 S: sm::IState + 'static,
355 {
356 DynState::from_typed(self.module_instance_id, sm)
357 }
358
359 pub async fn finalize_and_submit_transaction<F, Meta>(
360 &self,
361 operation_id: OperationId,
362 operation_type: &str,
363 operation_meta_gen: F,
364 tx_builder: TransactionBuilder,
365 ) -> anyhow::Result<OutPointRange>
366 where
367 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
368 Meta: serde::Serialize + MaybeSend,
369 {
370 self.client
371 .get()
372 .finalize_and_submit_transaction(
373 operation_id,
374 operation_type,
375 Box::new(move |out_point_range| {
376 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
377 }),
378 tx_builder,
379 )
380 .await
381 }
382
383 pub async fn finalize_and_submit_transaction_dbtx<F, Meta>(
384 &self,
385 dbtx: &mut DatabaseTransaction<'_>,
386 operation_id: OperationId,
387 operation_type: &str,
388 operation_meta_gen: F,
389 tx_builder: TransactionBuilder,
390 ) -> anyhow::Result<OutPointRange>
391 where
392 F: Fn(OutPointRange) -> Meta + MaybeSend + MaybeSync + 'static,
393 Meta: serde::Serialize + MaybeSend,
394 {
395 self.client
396 .get()
397 .finalize_and_submit_transaction_dbtx(
398 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
399 operation_id,
400 operation_type,
401 Box::new(move |out_point_range| {
402 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
403 }),
404 tx_builder,
405 )
406 .await
407 }
408
409 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
410 self.client.get().transaction_updates(operation_id).await
411 }
412
413 pub async fn await_primary_module_outputs(
414 &self,
415 operation_id: OperationId,
416 outputs: Vec<OutPoint>,
418 ) -> anyhow::Result<()> {
419 self.client
420 .get()
421 .await_primary_module_outputs(operation_id, outputs)
422 .await
423 }
424
425 pub async fn get_operation(
427 &self,
428 operation_id: OperationId,
429 ) -> anyhow::Result<oplog::OperationLogEntry> {
430 let operation = self
431 .client
432 .get()
433 .operation_log()
434 .get_operation(operation_id)
435 .await
436 .ok_or(anyhow::anyhow!("Operation not found"))?;
437
438 if operation.operation_module_kind() != M::kind().as_str() {
439 bail!("Operation is not a lightning operation");
440 }
441
442 Ok(operation)
443 }
444
445 fn global_db(&self) -> fedimint_core::db::Database {
449 let db = Clone::clone(self.client.get().db());
450
451 db.ensure_global()
452 .expect("global_db must always return a global db");
453
454 db
455 }
456
457 pub fn module_db(&self) -> &Database {
458 self.module_db
459 .ensure_isolated()
460 .expect("module_db must always return isolated db");
461 &self.module_db
462 }
463
464 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
465 self.client.get().has_active_states(op_id).await
466 }
467
468 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
469 self.client.get().operation_exists(op_id).await
470 }
471
472 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
473 self.client
474 .get()
475 .executor()
476 .get_active_states()
477 .await
478 .into_iter()
479 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
480 .map(|s| {
481 (
482 Clone::clone(
483 s.0.as_any()
484 .downcast_ref::<M::States>()
485 .expect("incorrect output type passed to module plugin"),
486 ),
487 s.1,
488 )
489 })
490 .collect()
491 }
492
493 pub async fn get_config(&self) -> ClientConfig {
494 self.client.get().config().await
495 }
496
497 pub async fn get_invite_code(&self) -> InviteCode {
500 let cfg = self.get_config().await.global;
501 self.client
502 .get()
503 .invite_code(
504 *cfg.api_endpoints
505 .keys()
506 .next()
507 .expect("A federation always has at least one guardian"),
508 )
509 .await
510 .expect("The guardian we requested an invite code for exists")
511 }
512
513 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
514 self.client.get().get_internal_payment_markers()
515 }
516
517 pub async fn manual_operation_start(
520 &self,
521 operation_id: OperationId,
522 op_type: &str,
523 operation_meta: impl serde::Serialize + Debug,
524 sms: Vec<DynState>,
525 ) -> anyhow::Result<()> {
526 let db = self.module_db();
527 let mut dbtx = db.begin_transaction().await;
528 {
529 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
530
531 self.manual_operation_start_inner(
532 &mut dbtx.to_ref_nc(),
533 operation_id,
534 op_type,
535 operation_meta,
536 sms,
537 )
538 .await?;
539 }
540
541 dbtx.commit_tx_result().await.map_err(|_| {
542 anyhow!(
543 "Operation with id {} already exists",
544 operation_id.fmt_short()
545 )
546 })?;
547
548 Ok(())
549 }
550
551 pub async fn manual_operation_start_dbtx(
552 &self,
553 dbtx: &mut DatabaseTransaction<'_>,
554 operation_id: OperationId,
555 op_type: &str,
556 operation_meta: impl serde::Serialize + Debug,
557 sms: Vec<DynState>,
558 ) -> anyhow::Result<()> {
559 self.manual_operation_start_inner(
560 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
561 operation_id,
562 op_type,
563 operation_meta,
564 sms,
565 )
566 .await
567 }
568
569 async fn manual_operation_start_inner(
572 &self,
573 dbtx: &mut DatabaseTransaction<'_>,
574 operation_id: OperationId,
575 op_type: &str,
576 operation_meta: impl serde::Serialize + Debug,
577 sms: Vec<DynState>,
578 ) -> anyhow::Result<()> {
579 dbtx.ensure_global()
580 .expect("Must deal with global dbtx here");
581
582 if self
583 .client
584 .get()
585 .operation_log()
586 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
587 .await
588 .is_some()
589 {
590 bail!(
591 "Operation with id {} already exists",
592 operation_id.fmt_short()
593 );
594 }
595
596 self.client
597 .get()
598 .operation_log()
599 .add_operation_log_entry_dbtx(
600 &mut dbtx.to_ref_nc(),
601 operation_id,
602 op_type,
603 serde_json::to_value(operation_meta).expect("Can't fail"),
604 )
605 .await;
606
607 self.client
608 .get()
609 .executor()
610 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
611 .await
612 .expect("State machine is valid");
613
614 Ok(())
615 }
616
617 pub fn outcome_or_updates<U, S>(
618 &self,
619 operation: OperationLogEntry,
620 operation_id: OperationId,
621 stream_gen: impl FnOnce() -> S + 'static,
622 ) -> UpdateStreamOrOutcome<U>
623 where
624 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
625 S: Stream<Item = U> + MaybeSend + 'static,
626 {
627 use futures::StreamExt;
628 match self.client.get().operation_log().outcome_or_updates(
629 &self.global_db(),
630 operation_id,
631 operation,
632 Box::new(move || {
633 let stream_gen = stream_gen();
634 Box::pin(
635 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
636 )
637 }),
638 ) {
639 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
640 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
641 ),
642 UpdateStreamOrOutcome::Outcome(o) => {
643 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
644 }
645 }
646 }
647
648 pub async fn claim_inputs<I, S>(
649 &self,
650 dbtx: &mut DatabaseTransaction<'_>,
651 inputs: ClientInputBundle<I, S>,
652 operation_id: OperationId,
653 ) -> anyhow::Result<OutPointRange>
654 where
655 I: IInput + MaybeSend + MaybeSync + 'static,
656 S: sm::IState + MaybeSend + MaybeSync + 'static,
657 {
658 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
659 .await
660 }
661
662 async fn claim_inputs_dyn(
663 &self,
664 dbtx: &mut DatabaseTransaction<'_>,
665 inputs: InstancelessDynClientInputBundle,
666 operation_id: OperationId,
667 ) -> anyhow::Result<OutPointRange> {
668 let tx_builder =
669 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
670
671 self.client
672 .get()
673 .finalize_and_submit_transaction_inner(
674 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
675 operation_id,
676 tx_builder,
677 )
678 .await
679 }
680
681 pub async fn add_state_machines_dbtx(
682 &self,
683 dbtx: &mut DatabaseTransaction<'_>,
684 states: Vec<DynState>,
685 ) -> AddStateMachinesResult {
686 self.client
687 .get()
688 .executor()
689 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
690 .await
691 }
692
693 pub async fn add_operation_log_entry_dbtx(
694 &self,
695 dbtx: &mut DatabaseTransaction<'_>,
696 operation_id: OperationId,
697 operation_type: &str,
698 operation_meta: impl serde::Serialize,
699 ) {
700 self.client
701 .get()
702 .operation_log()
703 .add_operation_log_entry_dbtx(
704 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
705 operation_id,
706 operation_type,
707 serde_json::to_value(operation_meta).expect("Can't fail"),
708 )
709 .await;
710 }
711
712 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
713 where
714 E: Event + Send,
715 Cap: Send,
716 {
717 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
718 warn!(
719 target: LOG_CLIENT,
720 module_kind = %<M as ClientModule>::kind(),
721 event_module = ?<E as Event>::MODULE,
722 "Client module logging events of different module than its own. This might become an error in the future."
723 );
724 }
725 self.client
726 .get()
727 .log_event_json(
728 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
729 <E as Event>::MODULE,
730 self.module_instance_id,
731 <E as Event>::KIND,
732 serde_json::to_value(event).expect("Can't fail"),
733 <E as Event>::PERSISTENCE,
734 )
735 .await;
736 }
737}
738
/// Priority of a module that can act as primary module.
///
/// The value is a plain number. Going by the named constants, a smaller
/// number is a higher priority ([`Self::HIGH`] is `100`, [`Self::LOW`] is
/// `10000`). The derived ordering compares the raw numbers, so
/// `HIGH < LOW`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PrimaryModulePriority(u64);

impl PrimaryModulePriority {
    /// Predefined high priority (`100`).
    pub const HIGH: Self = Self(100);
    /// Predefined low priority (`10000`).
    pub const LOW: Self = Self(10000);

    /// Creates a priority with the arbitrary numeric value `prio`.
    ///
    /// This is a `const fn`, so custom priorities can be declared as
    /// constants alongside [`Self::HIGH`] and [`Self::LOW`].
    pub const fn custom(prio: u64) -> Self {
        Self(prio)
    }
}
/// What `ClientModule::supports_being_primary` reports about a module.
pub enum PrimaryModuleSupport {
    /// Carries only a priority; no set of units is attached.
    // NOTE(review): presumably means "primary for every unit" — confirm.
    Any { priority: PrimaryModulePriority },
    /// Carries a priority together with an explicit set of units.
    Selected {
        priority: PrimaryModulePriority,
        units: BTreeSet<AmountUnit>,
    },
    /// No support; this is what the default
    /// `ClientModule::supports_being_primary` returns.
    None,
}
763
764impl PrimaryModuleSupport {
765 pub fn selected<const N: usize>(
766 priority: PrimaryModulePriority,
767 units: [AmountUnit; N],
768 ) -> Self {
769 Self::Selected {
770 priority,
771 units: BTreeSet::from(units),
772 }
773 }
774}
775
776#[apply(async_trait_maybe_send!)]
778pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
779 type Init: ClientModuleInit;
780
781 type Common: ModuleCommon;
783
784 type Backup: ModuleBackup;
787
788 type ModuleStateMachineContext: Context;
791
792 type States: State<ModuleContext = Self::ModuleStateMachineContext>
794 + IntoDynInstance<DynType = DynState>;
795
796 fn decoder() -> Decoder {
797 let mut decoder_builder = Self::Common::decoder_builder();
798 decoder_builder.with_decodable_type::<Self::States>();
799 decoder_builder.with_decodable_type::<Self::Backup>();
800 decoder_builder.build()
801 }
802
803 fn kind() -> ModuleKind {
804 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
805 }
806
807 fn context(&self) -> Self::ModuleStateMachineContext;
808
809 async fn start(&self) {}
815
816 async fn handle_cli_command(
817 &self,
818 _args: &[ffi::OsString],
819 ) -> anyhow::Result<serde_json::Value> {
820 Err(anyhow::format_err!(
821 "This module does not implement cli commands"
822 ))
823 }
824
825 async fn handle_rpc(
826 &self,
827 _method: String,
828 _request: serde_json::Value,
829 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
830 Box::pin(futures::stream::once(std::future::ready(Err(
831 anyhow::format_err!("This module does not implement rpc"),
832 ))))
833 }
834
835 fn input_fee(
844 &self,
845 amount: &Amounts,
846 input: &<Self::Common as ModuleCommon>::Input,
847 ) -> Option<Amounts>;
848
849 fn output_fee(
858 &self,
859 amount: &Amounts,
860 output: &<Self::Common as ModuleCommon>::Output,
861 ) -> Option<Amounts>;
862
863 fn supports_backup(&self) -> bool {
864 false
865 }
866
867 async fn backup(&self) -> anyhow::Result<Self::Backup> {
868 anyhow::bail!("Backup not supported");
869 }
870
871 fn supports_being_primary(&self) -> PrimaryModuleSupport {
880 PrimaryModuleSupport::None
881 }
882
883 async fn create_final_inputs_and_outputs(
901 &self,
902 _dbtx: &mut DatabaseTransaction<'_>,
903 _operation_id: OperationId,
904 _unit: AmountUnit,
905 _input_amount: Amount,
906 _output_amount: Amount,
907 ) -> anyhow::Result<(
908 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
909 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
910 )> {
911 unimplemented!()
912 }
913
914 async fn await_primary_module_output(
919 &self,
920 _operation_id: OperationId,
921 _out_point: OutPoint,
922 ) -> anyhow::Result<()> {
923 unimplemented!()
924 }
925
926 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
929 unimplemented!()
930 }
931
932 async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
935 unimplemented!()
936 }
937
938 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
941 unimplemented!()
942 }
943
944 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
1000 bail!("Unable to determine if safe to leave the federation: Not implemented")
1001 }
1002}
1003
/// Type-erased counterpart of [`ClientModule`].
///
/// It is blanket-implemented in this file for every `T: ClientModule`.
/// Compared to the typed trait, inputs, outputs, bundles and backups are
/// passed as their dyn forms, and several methods additionally take the
/// module instance id.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any`, for downcasting to the concrete module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's decoder.
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Start hook of the module.
    async fn start(&self);

    /// Handles a CLI command and returns its JSON result.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// Handles an RPC call and returns a stream of JSON results.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Returns the fee for `input`; the blanket impl panics if `input` is
    /// not of the module's input type.
    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;

    /// Returns the fee for `output`; the blanket impl panics if `output` is
    /// not of the module's output type.
    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;

    /// Whether the module supports backup.
    fn supports_backup(&self) -> bool;

    /// Produces the module's backup, tagged with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// Reports the module's primary-module support.
    fn supports_being_primary(&self) -> PrimaryModuleSupport;

    /// Creates the final dyn input and output bundles for a transaction.
    ///
    /// In the blanket impl the module only sees `dbtx` prefixed with
    /// `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// Waits on a single output of an operation.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// Returns the module's balance for `unit`.
    ///
    /// In the blanket impl the module only sees `dbtx` prefixed with
    /// `module_instance`.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        unit: AmountUnit,
    ) -> Amount;

    /// Returns a stream of balance-change notifications.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
1060
1061#[apply(async_trait_maybe_send!)]
1062impl<T> IClientModule for T
1063where
1064 T: ClientModule,
1065{
1066 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1067 self
1068 }
1069
1070 fn decoder(&self) -> Decoder {
1071 T::decoder()
1072 }
1073
1074 fn context(&self, instance: ModuleInstanceId) -> DynContext {
1075 DynContext::from_typed(instance, <T as ClientModule>::context(self))
1076 }
1077
1078 async fn start(&self) {
1079 <T as ClientModule>::start(self).await;
1080 }
1081
1082 async fn handle_cli_command(
1083 &self,
1084 args: &[ffi::OsString],
1085 ) -> anyhow::Result<serde_json::Value> {
1086 <T as ClientModule>::handle_cli_command(self, args).await
1087 }
1088
1089 async fn handle_rpc(
1090 &self,
1091 method: String,
1092 request: serde_json::Value,
1093 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1094 <T as ClientModule>::handle_rpc(self, method, request).await
1095 }
1096
1097 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1098 <T as ClientModule>::input_fee(
1099 self,
1100 amount,
1101 input
1102 .as_any()
1103 .downcast_ref()
1104 .expect("Dispatched to correct module"),
1105 )
1106 }
1107
1108 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1109 <T as ClientModule>::output_fee(
1110 self,
1111 amount,
1112 output
1113 .as_any()
1114 .downcast_ref()
1115 .expect("Dispatched to correct module"),
1116 )
1117 }
1118
1119 fn supports_backup(&self) -> bool {
1120 <T as ClientModule>::supports_backup(self)
1121 }
1122
1123 async fn backup(
1124 &self,
1125 module_instance_id: ModuleInstanceId,
1126 ) -> anyhow::Result<DynModuleBackup> {
1127 Ok(DynModuleBackup::from_typed(
1128 module_instance_id,
1129 <T as ClientModule>::backup(self).await?,
1130 ))
1131 }
1132
1133 fn supports_being_primary(&self) -> PrimaryModuleSupport {
1134 <T as ClientModule>::supports_being_primary(self)
1135 }
1136
1137 async fn create_final_inputs_and_outputs(
1138 &self,
1139 module_instance: ModuleInstanceId,
1140 dbtx: &mut DatabaseTransaction<'_>,
1141 operation_id: OperationId,
1142 unit: AmountUnit,
1143 input_amount: Amount,
1144 output_amount: Amount,
1145 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1146 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1147 self,
1148 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1149 operation_id,
1150 unit,
1151 input_amount,
1152 output_amount,
1153 )
1154 .await?;
1155
1156 let inputs = inputs.into_dyn(module_instance);
1157
1158 let outputs = outputs.into_dyn(module_instance);
1159
1160 Ok((inputs, outputs))
1161 }
1162
1163 async fn await_primary_module_output(
1164 &self,
1165 operation_id: OperationId,
1166 out_point: OutPoint,
1167 ) -> anyhow::Result<()> {
1168 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1169 }
1170
1171 async fn get_balance(
1172 &self,
1173 module_instance: ModuleInstanceId,
1174 dbtx: &mut DatabaseTransaction<'_>,
1175 unit: AmountUnit,
1176 ) -> Amount {
1177 <T as ClientModule>::get_balance(
1178 self,
1179 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1180 unit,
1181 )
1182 .await
1183 }
1184
1185 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1186 <T as ClientModule>::subscribe_balance_changes(self).await
1187 }
1188}
1189
// Declares `DynClientModule`, a cloneable newtype over an `Arc`-held,
// type-erased `IClientModule`. The expansion lives in `fedimint_core`; the
// `AsRef` impl in this file reads the wrapped value through its `inner` field.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1194
1195impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1196 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1197 self.inner.as_ref()
1198 }
1199}
1200
1201pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1203
/// Shared (`Arc`) closure that maps an [`OutPointRange`] to a list of
/// states of type `S`.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;