1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24 Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25 maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
/// Module registry whose entries are type-erased client modules
/// ([`DynClientModule`]).
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
/// Type-erased interface to the client, as seen from module code.
///
/// [`ClientContext`] holds an implementor of this trait (through
/// [`FinalClientIface`]) and wraps most of these methods in typed,
/// module-aware helpers. The implementor itself is defined elsewhere.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased module registered under `instance`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the decoder registry of the client.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalizes `tx_builder` and submits the resulting transaction as part
    /// of operation `operation_id`.
    ///
    /// `operation_meta_gen` receives the resulting [`OutPointRange`] and
    /// returns the operation metadata as JSON.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works inside
    /// a caller-supplied database transaction and takes no operation metadata.
    // NOTE(review): callers in this file always pass a *global* dbtx here;
    // confirm whether implementors rely on that.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the transaction updates associated with `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits on the given `outputs` of operation `operation_id`.
    // NOTE(review): presumably resolves once the primary module has processed
    // all of them - confirm against the implementation.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the client's operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether operation `operation_id` still has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with id `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client's database handle.
    // NOTE(review): `ClientContext::global_db` asserts that this handle is a
    // global (non-isolated) database.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for `peer`, or `None`.
    // NOTE(review): `ClientContext::get_invite_code` treats `None` as
    // "peer does not exist" - confirm.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers (a public key and a `u64`).
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an already JSON-serialized event inside `dbtx`.
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Streams the active states of module `module_id` that belong to
    /// operation `operation_id`, reading through `dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of module `module_id` that belong to
    /// operation `operation_id`, reading through `dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
126
/// Shared, write-once slot holding a weak handle to the client.
///
/// The `Default` value is an empty slot. All clones share the same slot
/// (it lives behind an `Arc`), so filling it through one clone makes the
/// client visible through every other clone.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142 self.0
143 .get()
144 .expect("client must be already set")
145 .upgrade()
146 .expect("client module context must not be use past client shutdown")
147 }
148
149 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150 self.0.set(client).expect("FinalLazyClient already set");
151 }
152}
153
154impl fmt::Debug for FinalClientIface {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.write_str("FinalClientIface")
157 }
158}
/// Handle through which the client module `M` reaches the rest of the client.
///
/// `M` is only a type-level marker: no value of type `M` is stored.
pub struct ClientContext<M> {
    // Write-once slot with the (weak) handle to the client.
    client: FinalClientIface,
    // Instance id of the module this context was created for.
    module_instance_id: ModuleInstanceId,
    // Passed to `global_dbtx(..)` whenever a caller-supplied transaction has
    // to be turned into a global one.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database returned by `module_db()`, which asserts that it is isolated.
    module_db: Database,
    _marker: marker::PhantomData<M>,
}
170
171impl<M> Clone for ClientContext<M> {
172 fn clone(&self) -> Self {
173 Self {
174 client: self.client.clone(),
175 module_db: self.module_db.clone(),
176 module_instance_id: self.module_instance_id,
177 _marker: marker::PhantomData,
178 global_dbtx_access_token: self.global_dbtx_access_token,
179 }
180 }
181}
182
/// Guard that dereferences to the client module `M`.
///
/// Obtained from `ClientContext::self_ref`.
pub struct ClientContextSelfRef<'s, M> {
    // Strong handle to the client; the module is looked up through it on
    // every deref.
    client: Arc<dyn ClientContextIface>,
    // Instance id used for that lookup.
    module_instance_id: ModuleInstanceId,
    // Carries the lifetime `'s` and the module type `M` without storing data.
    _marker: marker::PhantomData<&'s M>,
}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195 M: ClientModule,
196{
197 type Target = M;
198
199 fn deref(&self) -> &Self::Target {
200 self.client
201 .get_module(self.module_instance_id)
202 .as_any()
203 .downcast_ref::<M>()
204 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205 }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.write_str("ClientContext")
211 }
212}
213
214impl<M> ClientContext<M>
215where
216 M: ClientModule,
217{
218 pub fn new(
219 client: FinalClientIface,
220 module_instance_id: ModuleInstanceId,
221 global_dbtx_access_token: GlobalDBTxAccessToken,
222 module_db: Database,
223 ) -> Self {
224 Self {
225 client,
226 module_instance_id,
227 global_dbtx_access_token,
228 module_db,
229 _marker: marker::PhantomData,
230 }
231 }
232
233 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
245 ClientContextSelfRef {
246 client: self.client.get(),
247 module_instance_id: self.module_instance_id,
248 _marker: marker::PhantomData,
249 }
250 }
251
252 pub fn global_api(&self) -> DynGlobalApi {
254 self.client.get().api_clone()
255 }
256
257 pub fn module_api(&self) -> DynModuleApi {
259 self.global_api().with_module(self.module_instance_id)
260 }
261
262 pub fn decoders(&self) -> ModuleDecoderRegistry {
264 Clone::clone(self.client.get().decoders())
265 }
266
267 pub fn input_from_dyn<'i>(
268 &self,
269 input: &'i DynInput,
270 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
271 (input.module_instance_id() == self.module_instance_id).then(|| {
272 input
273 .as_any()
274 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
275 .unwrap_or_else(|| {
276 panic!("instance_id {} just checked", input.module_instance_id())
277 })
278 })
279 }
280
281 pub fn output_from_dyn<'o>(
282 &self,
283 output: &'o DynOutput,
284 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
285 (output.module_instance_id() == self.module_instance_id).then(|| {
286 output
287 .as_any()
288 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
289 .unwrap_or_else(|| {
290 panic!("instance_id {} just checked", output.module_instance_id())
291 })
292 })
293 }
294
295 pub fn map_dyn<'s, 'i, 'o, I>(
296 &'s self,
297 typed: impl IntoIterator<Item = I> + 'i,
298 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
299 where
300 I: IntoDynInstance,
301 'i: 'o,
302 's: 'o,
303 {
304 typed.into_iter().map(|i| self.make_dyn(i))
305 }
306
307 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
309 self.make_dyn(output)
310 }
311
312 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
314 self.make_dyn(input)
315 }
316
317 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
319 where
320 I: IntoDynInstance,
321 {
322 typed.into_dyn(self.module_instance_id)
323 }
324
325 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
327 where
328 O: IntoDynInstance<DynType = DynOutput> + 'static,
329 S: IntoDynInstance<DynType = DynState> + 'static,
330 {
331 self.make_dyn(output)
332 }
333
334 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
336 where
337 I: IntoDynInstance<DynType = DynInput> + 'static,
338 S: IntoDynInstance<DynType = DynState> + 'static,
339 {
340 self.make_dyn(inputs)
341 }
342
343 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
344 where
345 S: sm::IState + 'static,
346 {
347 DynState::from_typed(self.module_instance_id, sm)
348 }
349
350 pub async fn finalize_and_submit_transaction<F, Meta>(
351 &self,
352 operation_id: OperationId,
353 operation_type: &str,
354 operation_meta_gen: F,
355 tx_builder: TransactionBuilder,
356 ) -> anyhow::Result<OutPointRange>
357 where
358 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
359 Meta: serde::Serialize + MaybeSend,
360 {
361 self.client
362 .get()
363 .finalize_and_submit_transaction(
364 operation_id,
365 operation_type,
366 Box::new(move |out_point_range| {
367 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
368 }),
369 tx_builder,
370 )
371 .await
372 }
373
374 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
375 self.client.get().transaction_updates(operation_id).await
376 }
377
378 pub async fn await_primary_module_outputs(
379 &self,
380 operation_id: OperationId,
381 outputs: Vec<OutPoint>,
383 ) -> anyhow::Result<()> {
384 self.client
385 .get()
386 .await_primary_module_outputs(operation_id, outputs)
387 .await
388 }
389
390 pub async fn get_operation(
392 &self,
393 operation_id: OperationId,
394 ) -> anyhow::Result<oplog::OperationLogEntry> {
395 let operation = self
396 .client
397 .get()
398 .operation_log()
399 .get_operation(operation_id)
400 .await
401 .ok_or(anyhow::anyhow!("Operation not found"))?;
402
403 if operation.operation_module_kind() != M::kind().as_str() {
404 bail!("Operation is not a lightning operation");
405 }
406
407 Ok(operation)
408 }
409
410 fn global_db(&self) -> fedimint_core::db::Database {
414 let db = Clone::clone(self.client.get().db());
415
416 db.ensure_global()
417 .expect("global_db must always return a global db");
418
419 db
420 }
421
422 pub fn module_db(&self) -> &Database {
423 self.module_db
424 .ensure_isolated()
425 .expect("module_db must always return isolated db");
426 &self.module_db
427 }
428
429 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
430 self.client.get().has_active_states(op_id).await
431 }
432
433 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
434 self.client.get().operation_exists(op_id).await
435 }
436
437 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
438 self.client
439 .get()
440 .executor()
441 .get_active_states()
442 .await
443 .into_iter()
444 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
445 .map(|s| {
446 (
447 Clone::clone(
448 s.0.as_any()
449 .downcast_ref::<M::States>()
450 .expect("incorrect output type passed to module plugin"),
451 ),
452 s.1,
453 )
454 })
455 .collect()
456 }
457
458 pub async fn get_config(&self) -> ClientConfig {
459 self.client.get().config().await
460 }
461
462 pub async fn get_invite_code(&self) -> InviteCode {
465 let cfg = self.get_config().await.global;
466 self.client
467 .get()
468 .invite_code(
469 *cfg.api_endpoints
470 .keys()
471 .next()
472 .expect("A federation always has at least one guardian"),
473 )
474 .await
475 .expect("The guardian we requested an invite code for exists")
476 }
477
478 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
479 self.client.get().get_internal_payment_markers()
480 }
481
482 pub async fn manual_operation_start(
485 &self,
486 operation_id: OperationId,
487 op_type: &str,
488 operation_meta: impl serde::Serialize + Debug,
489 sms: Vec<DynState>,
490 ) -> anyhow::Result<()> {
491 let db = self.module_db();
492 let mut dbtx = db.begin_transaction().await;
493 {
494 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
495
496 self.manual_operation_start_inner(
497 &mut dbtx.to_ref_nc(),
498 operation_id,
499 op_type,
500 operation_meta,
501 sms,
502 )
503 .await?;
504 }
505
506 dbtx.commit_tx_result().await.map_err(|_| {
507 anyhow!(
508 "Operation with id {} already exists",
509 operation_id.fmt_short()
510 )
511 })?;
512
513 Ok(())
514 }
515
516 pub async fn manual_operation_start_dbtx(
517 &self,
518 dbtx: &mut DatabaseTransaction<'_>,
519 operation_id: OperationId,
520 op_type: &str,
521 operation_meta: impl serde::Serialize + Debug,
522 sms: Vec<DynState>,
523 ) -> anyhow::Result<()> {
524 self.manual_operation_start_inner(
525 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
526 operation_id,
527 op_type,
528 operation_meta,
529 sms,
530 )
531 .await
532 }
533
534 async fn manual_operation_start_inner(
537 &self,
538 dbtx: &mut DatabaseTransaction<'_>,
539 operation_id: OperationId,
540 op_type: &str,
541 operation_meta: impl serde::Serialize + Debug,
542 sms: Vec<DynState>,
543 ) -> anyhow::Result<()> {
544 dbtx.ensure_global()
545 .expect("Must deal with global dbtx here");
546
547 if self
548 .client
549 .get()
550 .operation_log()
551 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
552 .await
553 .is_some()
554 {
555 bail!(
556 "Operation with id {} already exists",
557 operation_id.fmt_short()
558 );
559 }
560
561 self.client
562 .get()
563 .operation_log()
564 .add_operation_log_entry_dbtx(
565 &mut dbtx.to_ref_nc(),
566 operation_id,
567 op_type,
568 serde_json::to_value(operation_meta).expect("Can't fail"),
569 )
570 .await;
571
572 self.client
573 .get()
574 .executor()
575 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
576 .await
577 .expect("State machine is valid");
578
579 Ok(())
580 }
581
582 pub fn outcome_or_updates<U, S>(
583 &self,
584 operation: OperationLogEntry,
585 operation_id: OperationId,
586 stream_gen: impl FnOnce() -> S + 'static,
587 ) -> UpdateStreamOrOutcome<U>
588 where
589 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
590 S: Stream<Item = U> + MaybeSend + 'static,
591 {
592 use futures::StreamExt;
593 match self.client.get().operation_log().outcome_or_updates(
594 &self.global_db(),
595 operation_id,
596 operation,
597 Box::new(move || {
598 let stream_gen = stream_gen();
599 Box::pin(
600 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
601 )
602 }),
603 ) {
604 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
605 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
606 ),
607 UpdateStreamOrOutcome::Outcome(o) => {
608 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
609 }
610 }
611 }
612
613 pub async fn claim_inputs<I, S>(
614 &self,
615 dbtx: &mut DatabaseTransaction<'_>,
616 inputs: ClientInputBundle<I, S>,
617 operation_id: OperationId,
618 ) -> anyhow::Result<OutPointRange>
619 where
620 I: IInput + MaybeSend + MaybeSync + 'static,
621 S: sm::IState + MaybeSend + MaybeSync + 'static,
622 {
623 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
624 .await
625 }
626
627 async fn claim_inputs_dyn(
628 &self,
629 dbtx: &mut DatabaseTransaction<'_>,
630 inputs: InstancelessDynClientInputBundle,
631 operation_id: OperationId,
632 ) -> anyhow::Result<OutPointRange> {
633 let tx_builder =
634 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
635
636 self.client
637 .get()
638 .finalize_and_submit_transaction_inner(
639 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
640 operation_id,
641 tx_builder,
642 )
643 .await
644 }
645
646 pub async fn add_state_machines_dbtx(
647 &self,
648 dbtx: &mut DatabaseTransaction<'_>,
649 states: Vec<DynState>,
650 ) -> AddStateMachinesResult {
651 self.client
652 .get()
653 .executor()
654 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
655 .await
656 }
657
658 pub async fn add_operation_log_entry_dbtx(
659 &self,
660 dbtx: &mut DatabaseTransaction<'_>,
661 operation_id: OperationId,
662 operation_type: &str,
663 operation_meta: impl serde::Serialize,
664 ) {
665 self.client
666 .get()
667 .operation_log()
668 .add_operation_log_entry_dbtx(
669 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
670 operation_id,
671 operation_type,
672 serde_json::to_value(operation_meta).expect("Can't fail"),
673 )
674 .await;
675 }
676
677 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
678 where
679 E: Event + Send,
680 Cap: Send,
681 {
682 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
683 warn!(
684 target: LOG_CLIENT,
685 module_kind = %<M as ClientModule>::kind(),
686 event_module = ?<E as Event>::MODULE,
687 "Client module logging events of different module than its own. This might become an error in the future."
688 );
689 }
690 self.client
691 .get()
692 .log_event_json(
693 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
694 <E as Event>::MODULE,
695 self.module_instance_id,
696 <E as Event>::KIND,
697 serde_json::to_value(event).expect("Can't fail"),
698 <E as Event>::PERSISTENCE,
699 )
700 .await;
701 }
702}
703
/// Priority of a module that offers to act as the primary module.
///
/// Priorities compare by their inner number. Going by the predefined
/// constants, a *smaller* number means a *higher* priority
/// ([`Self::HIGH`] is 100, [`Self::LOW`] is 10000).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PrimaryModulePriority(u64);

impl PrimaryModulePriority {
    /// Predefined high priority (inner value 100).
    pub const HIGH: Self = Self(100);
    /// Predefined low priority (inner value 10000).
    pub const LOW: Self = Self(10000);

    /// Creates a priority with an arbitrary inner value.
    ///
    /// This is a `const fn`, so custom priorities can be defined as
    /// constants next to [`Self::HIGH`] and [`Self::LOW`].
    pub const fn custom(prio: u64) -> Self {
        Self(prio)
    }
}
/// Answer of a module to "can you act as the primary module?"
/// (see `ClientModule::supports_being_primary`).
pub enum PrimaryModuleSupport {
    /// Supported without restriction to particular amount units.
    Any { priority: PrimaryModulePriority },
    /// Supported only for the amount units listed in `units`.
    Selected {
        priority: PrimaryModulePriority,
        units: BTreeSet<AmountUnit>,
    },
    /// Not supported.
    None,
}
728
729impl PrimaryModuleSupport {
730 pub fn selected<const N: usize>(
731 priority: PrimaryModulePriority,
732 units: [AmountUnit; N],
733 ) -> Self {
734 Self::Selected {
735 priority,
736 units: BTreeSet::from(units),
737 }
738 }
739}
740
/// Typed interface implemented by every client module.
///
/// A blanket impl turns every `ClientModule` into the type-erased
/// [`IClientModule`]. Most methods have defaults; the ones that are only
/// meaningful for a primary module default to `unimplemented!()`.
#[apply(async_trait_maybe_send!)]
pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
    /// Initializer type of this module; also the source of [`Self::kind`].
    type Init: ClientModuleInit;

    /// Common module definition: provides the input and output types and the
    /// base decoder builder.
    type Common: ModuleCommon;

    /// Backup payload produced by [`Self::backup`].
    type Backup: ModuleBackup;

    /// Context type returned by [`Self::context`] and shared with this
    /// module's state machines.
    type ModuleStateMachineContext: Context;

    /// State machine states of this module.
    type States: State<ModuleContext = Self::ModuleStateMachineContext>
        + IntoDynInstance<DynType = DynState>;

    /// Builds the module's decoder: the common decoder extended with
    /// [`Self::States`] and [`Self::Backup`].
    fn decoder() -> Decoder {
        let mut decoder_builder = Self::Common::decoder_builder();
        decoder_builder.with_decodable_type::<Self::States>();
        decoder_builder.with_decodable_type::<Self::Backup>();
        decoder_builder.build()
    }

    /// Returns the module kind, taken from the `KIND` constant of the
    /// initializer's common init type.
    fn kind() -> ModuleKind {
        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
    }

    /// Returns the context handed to this module's state machines.
    fn context(&self) -> Self::ModuleStateMachineContext;

    /// Start-up hook; does nothing by default.
    // NOTE(review): presumably invoked once by the client when it starts -
    // confirm against the caller.
    async fn start(&self) {}

    /// Handles a CLI command addressed to this module.
    ///
    /// The default implementation rejects every command.
    async fn handle_cli_command(
        &self,
        _args: &[ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        Err(anyhow::format_err!(
            "This module does not implement cli commands"
        ))
    }

    /// Handles an RPC request addressed to this module, answering with a
    /// stream of results.
    ///
    /// The default implementation yields a single error item.
    async fn handle_rpc(
        &self,
        _method: String,
        _request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(futures::stream::once(std::future::ready(Err(
            anyhow::format_err!("This module does not implement rpc"),
        ))))
    }

    /// Returns the fee for `input` carrying `amount`, or `None`.
    // NOTE(review): `None` presumably means the fee cannot be determined
    // (e.g. an input variant unknown to this version) - confirm.
    fn input_fee(
        &self,
        amount: &Amounts,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amounts>;

    /// Returns the fee for `output` carrying `amount`, or `None`.
    // NOTE(review): `None` presumably means the fee cannot be determined
    // (e.g. an output variant unknown to this version) - confirm.
    fn output_fee(
        &self,
        amount: &Amounts,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amounts>;

    /// Whether [`Self::backup`] is implemented; `false` by default.
    fn supports_backup(&self) -> bool {
        false
    }

    /// Produces the module's backup payload.
    ///
    /// The default implementation always fails with "Backup not supported".
    async fn backup(&self) -> anyhow::Result<Self::Backup> {
        anyhow::bail!("Backup not supported");
    }

    /// Whether, and for which units, this module can act as the primary
    /// module. Defaults to [`PrimaryModuleSupport::None`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport {
        PrimaryModuleSupport::None
    }

    /// Creates the module's own inputs and outputs for a transaction with the
    /// given `_input_amount` and `_output_amount` in `_unit`.
    // NOTE(review): presumably these balance the transaction and only the
    // primary module is asked - confirm against the caller.
    ///
    /// # Panics
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _unit: AmountUnit,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }

    /// Waits on the output `_out_point` of operation `_operation_id`.
    ///
    /// # Panics
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }

    /// Returns the module's balance in `_unit`.
    ///
    /// # Panics
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
        unimplemented!()
    }

    /// Returns the module's balances as [`Amounts`].
    ///
    /// # Panics
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
        unimplemented!()
    }

    /// Returns a stream of balance-change notifications.
    ///
    /// # Panics
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }

    /// Hook for leaving the federation.
    ///
    /// The default implementation always fails, stating that it cannot be
    /// determined whether leaving is safe.
    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
        bail!("Unable to determine if safe to leave the federation: Not implemented")
    }
}
968
/// Type-erased counterpart of [`ClientModule`].
///
/// Implemented for every [`ClientModule`] by a blanket impl. Typed inputs,
/// outputs, states and backups are replaced by their `Dyn*` equivalents, and
/// some methods additionally take the module instance id.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any`, so that callers can downcast to the concrete
    /// module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// See [`ClientModule::decoder`].
    fn decoder(&self) -> Decoder;

    /// See [`ClientModule::context`]; the result is tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// See [`ClientModule::start`].
    async fn start(&self);

    /// See [`ClientModule::handle_cli_command`].
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// See [`ClientModule::handle_rpc`].
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// See [`ClientModule::input_fee`]; `input` must belong to this module.
    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;

    /// See [`ClientModule::output_fee`]; `output` must belong to this module.
    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// See [`ClientModule::backup`]; the result is tagged with
    /// `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport;

    /// See [`ClientModule::create_final_inputs_and_outputs`]; the returned
    /// bundles are tagged with `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`].
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        unit: AmountUnit,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
1025
1026#[apply(async_trait_maybe_send!)]
1027impl<T> IClientModule for T
1028where
1029 T: ClientModule,
1030{
1031 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1032 self
1033 }
1034
1035 fn decoder(&self) -> Decoder {
1036 T::decoder()
1037 }
1038
1039 fn context(&self, instance: ModuleInstanceId) -> DynContext {
1040 DynContext::from_typed(instance, <T as ClientModule>::context(self))
1041 }
1042
1043 async fn start(&self) {
1044 <T as ClientModule>::start(self).await;
1045 }
1046
1047 async fn handle_cli_command(
1048 &self,
1049 args: &[ffi::OsString],
1050 ) -> anyhow::Result<serde_json::Value> {
1051 <T as ClientModule>::handle_cli_command(self, args).await
1052 }
1053
1054 async fn handle_rpc(
1055 &self,
1056 method: String,
1057 request: serde_json::Value,
1058 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1059 <T as ClientModule>::handle_rpc(self, method, request).await
1060 }
1061
1062 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1063 <T as ClientModule>::input_fee(
1064 self,
1065 amount,
1066 input
1067 .as_any()
1068 .downcast_ref()
1069 .expect("Dispatched to correct module"),
1070 )
1071 }
1072
1073 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1074 <T as ClientModule>::output_fee(
1075 self,
1076 amount,
1077 output
1078 .as_any()
1079 .downcast_ref()
1080 .expect("Dispatched to correct module"),
1081 )
1082 }
1083
1084 fn supports_backup(&self) -> bool {
1085 <T as ClientModule>::supports_backup(self)
1086 }
1087
1088 async fn backup(
1089 &self,
1090 module_instance_id: ModuleInstanceId,
1091 ) -> anyhow::Result<DynModuleBackup> {
1092 Ok(DynModuleBackup::from_typed(
1093 module_instance_id,
1094 <T as ClientModule>::backup(self).await?,
1095 ))
1096 }
1097
1098 fn supports_being_primary(&self) -> PrimaryModuleSupport {
1099 <T as ClientModule>::supports_being_primary(self)
1100 }
1101
1102 async fn create_final_inputs_and_outputs(
1103 &self,
1104 module_instance: ModuleInstanceId,
1105 dbtx: &mut DatabaseTransaction<'_>,
1106 operation_id: OperationId,
1107 unit: AmountUnit,
1108 input_amount: Amount,
1109 output_amount: Amount,
1110 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1111 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1112 self,
1113 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1114 operation_id,
1115 unit,
1116 input_amount,
1117 output_amount,
1118 )
1119 .await?;
1120
1121 let inputs = inputs.into_dyn(module_instance);
1122
1123 let outputs = outputs.into_dyn(module_instance);
1124
1125 Ok((inputs, outputs))
1126 }
1127
1128 async fn await_primary_module_output(
1129 &self,
1130 operation_id: OperationId,
1131 out_point: OutPoint,
1132 ) -> anyhow::Result<()> {
1133 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1134 }
1135
1136 async fn get_balance(
1137 &self,
1138 module_instance: ModuleInstanceId,
1139 dbtx: &mut DatabaseTransaction<'_>,
1140 unit: AmountUnit,
1141 ) -> Amount {
1142 <T as ClientModule>::get_balance(
1143 self,
1144 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1145 unit,
1146 )
1147 .await
1148 }
1149
1150 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1151 <T as ClientModule>::subscribe_balance_changes(self).await
1152 }
1153}
1154
// Defines `DynClientModule`, the clonable, type-erased wrapper around an
// `Arc`'d `IClientModule` trait object. The generated struct itself comes
// from the `dyn_newtype_define!` macro in `fedimint_core`.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1159
1160impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1161 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1162 self.inner.as_ref()
1163 }
1164}
1165
1166pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1168
/// Shared closure that maps an [`OutPointRange`] to a `Vec<S>`.
// NOTE(review): presumably `S` is a state machine type and the range is the
// one assigned to the transaction's inputs/outputs - confirm at the use sites.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;