1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24 Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25 maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
45pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
47#[apply(async_trait_maybe_send!)]
56pub trait ClientContextIface: MaybeSend + MaybeSync {
57 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
58 fn api_clone(&self) -> DynGlobalApi;
59 fn decoders(&self) -> &ModuleDecoderRegistry;
60 async fn finalize_and_submit_transaction(
61 &self,
62 operation_id: OperationId,
63 operation_type: &str,
64 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
65 tx_builder: TransactionBuilder,
66 ) -> anyhow::Result<OutPointRange>;
67
68 async fn finalize_and_submit_transaction_inner(
70 &self,
71 dbtx: &mut DatabaseTransaction<'_>,
72 operation_id: OperationId,
73 tx_builder: TransactionBuilder,
74 ) -> anyhow::Result<OutPointRange>;
75
76 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
77
78 async fn await_primary_module_outputs(
79 &self,
80 operation_id: OperationId,
81 outputs: Vec<OutPoint>,
83 ) -> anyhow::Result<()>;
84
85 fn operation_log(&self) -> &dyn IOperationLog;
86
87 async fn has_active_states(&self, operation_id: OperationId) -> bool;
88
89 async fn operation_exists(&self, operation_id: OperationId) -> bool;
90
91 async fn config(&self) -> ClientConfig;
92
93 fn db(&self) -> &Database;
94
95 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
96
97 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
98
99 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
100
101 #[allow(clippy::too_many_arguments)]
102 async fn log_event_json(
103 &self,
104 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
105 module_kind: Option<ModuleKind>,
106 module_id: ModuleInstanceId,
107 kind: EventKind,
108 payload: serde_json::Value,
109 persist: EventPersistence,
110 );
111
112 async fn read_operation_active_states<'dbtx>(
113 &self,
114 operation_id: OperationId,
115 module_id: ModuleInstanceId,
116 dbtx: &'dbtx mut DatabaseTransaction<'_>,
117 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
118
119 async fn read_operation_inactive_states<'dbtx>(
120 &self,
121 operation_id: OperationId,
122 module_id: ModuleInstanceId,
123 dbtx: &'dbtx mut DatabaseTransaction<'_>,
124 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
125}
126
127#[derive(Clone, Default)]
133pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142 self.0
143 .get()
144 .expect("client must be already set")
145 .upgrade()
146 .expect("client module context must not be use past client shutdown")
147 }
148
149 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150 self.0.set(client).expect("FinalLazyClient already set");
151 }
152}
153
154impl fmt::Debug for FinalClientIface {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.write_str("FinalClientIface")
157 }
158}
159pub struct ClientContext<M> {
164 client: FinalClientIface,
165 module_instance_id: ModuleInstanceId,
166 global_dbtx_access_token: GlobalDBTxAccessToken,
167 module_db: Database,
168 _marker: marker::PhantomData<M>,
169}
170
171impl<M> Clone for ClientContext<M> {
172 fn clone(&self) -> Self {
173 Self {
174 client: self.client.clone(),
175 module_db: self.module_db.clone(),
176 module_instance_id: self.module_instance_id,
177 _marker: marker::PhantomData,
178 global_dbtx_access_token: self.global_dbtx_access_token,
179 }
180 }
181}
182
183pub struct ClientContextSelfRef<'s, M> {
186 client: Arc<dyn ClientContextIface>,
189 module_instance_id: ModuleInstanceId,
190 _marker: marker::PhantomData<&'s M>,
191}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195 M: ClientModule,
196{
197 type Target = M;
198
199 fn deref(&self) -> &Self::Target {
200 self.client
201 .get_module(self.module_instance_id)
202 .as_any()
203 .downcast_ref::<M>()
204 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205 }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.write_str("ClientContext")
211 }
212}
213
214impl<M> ClientContext<M>
215where
216 M: ClientModule,
217{
218 pub fn new(
219 client: FinalClientIface,
220 module_instance_id: ModuleInstanceId,
221 global_dbtx_access_token: GlobalDBTxAccessToken,
222 module_db: Database,
223 ) -> Self {
224 Self {
225 client,
226 module_instance_id,
227 global_dbtx_access_token,
228 module_db,
229 _marker: marker::PhantomData,
230 }
231 }
232
233 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
245 ClientContextSelfRef {
246 client: self.client.get(),
247 module_instance_id: self.module_instance_id,
248 _marker: marker::PhantomData,
249 }
250 }
251
252 pub fn global_api(&self) -> DynGlobalApi {
254 self.client.get().api_clone()
255 }
256
257 pub fn module_api(&self) -> DynModuleApi {
259 self.global_api().with_module(self.module_instance_id)
260 }
261
262 pub fn decoders(&self) -> ModuleDecoderRegistry {
264 Clone::clone(self.client.get().decoders())
265 }
266
267 pub fn input_from_dyn<'i>(
268 &self,
269 input: &'i DynInput,
270 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
271 (input.module_instance_id() == self.module_instance_id).then(|| {
272 input
273 .as_any()
274 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
275 .unwrap_or_else(|| {
276 panic!("instance_id {} just checked", input.module_instance_id())
277 })
278 })
279 }
280
281 pub fn output_from_dyn<'o>(
282 &self,
283 output: &'o DynOutput,
284 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
285 (output.module_instance_id() == self.module_instance_id).then(|| {
286 output
287 .as_any()
288 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
289 .unwrap_or_else(|| {
290 panic!("instance_id {} just checked", output.module_instance_id())
291 })
292 })
293 }
294
295 pub fn map_dyn<'s, 'i, 'o, I>(
296 &'s self,
297 typed: impl IntoIterator<Item = I> + 'i,
298 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
299 where
300 I: IntoDynInstance,
301 'i: 'o,
302 's: 'o,
303 {
304 typed.into_iter().map(|i| self.make_dyn(i))
305 }
306
307 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
309 self.make_dyn(output)
310 }
311
312 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
314 self.make_dyn(input)
315 }
316
317 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
319 where
320 I: IntoDynInstance,
321 {
322 typed.into_dyn(self.module_instance_id)
323 }
324
325 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
327 where
328 O: IntoDynInstance<DynType = DynOutput> + 'static,
329 S: IntoDynInstance<DynType = DynState> + 'static,
330 {
331 self.make_dyn(output)
332 }
333
334 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
336 where
337 I: IntoDynInstance<DynType = DynInput> + 'static,
338 S: IntoDynInstance<DynType = DynState> + 'static,
339 {
340 self.make_dyn(inputs)
341 }
342
343 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
344 where
345 S: sm::IState + 'static,
346 {
347 DynState::from_typed(self.module_instance_id, sm)
348 }
349
350 pub async fn finalize_and_submit_transaction<F, Meta>(
351 &self,
352 operation_id: OperationId,
353 operation_type: &str,
354 operation_meta_gen: F,
355 tx_builder: TransactionBuilder,
356 ) -> anyhow::Result<OutPointRange>
357 where
358 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
359 Meta: serde::Serialize + MaybeSend,
360 {
361 self.client
362 .get()
363 .finalize_and_submit_transaction(
364 operation_id,
365 operation_type,
366 Box::new(move |out_point_range| {
367 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
368 }),
369 tx_builder,
370 )
371 .await
372 }
373
374 pub async fn finalize_and_submit_transaction_dbtx(
375 &self,
376 dbtx: &mut DatabaseTransaction<'_>,
377 operation_id: OperationId,
378 tx_builder: TransactionBuilder,
379 ) -> anyhow::Result<OutPointRange> {
380 self.client
381 .get()
382 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
383 .await
384 }
385
386 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
387 self.client.get().transaction_updates(operation_id).await
388 }
389
390 pub async fn await_primary_module_outputs(
391 &self,
392 operation_id: OperationId,
393 outputs: Vec<OutPoint>,
395 ) -> anyhow::Result<()> {
396 self.client
397 .get()
398 .await_primary_module_outputs(operation_id, outputs)
399 .await
400 }
401
402 pub async fn get_operation(
404 &self,
405 operation_id: OperationId,
406 ) -> anyhow::Result<oplog::OperationLogEntry> {
407 let operation = self
408 .client
409 .get()
410 .operation_log()
411 .get_operation(operation_id)
412 .await
413 .ok_or(anyhow::anyhow!("Operation not found"))?;
414
415 if operation.operation_module_kind() != M::kind().as_str() {
416 bail!("Operation is not a lightning operation");
417 }
418
419 Ok(operation)
420 }
421
422 fn global_db(&self) -> fedimint_core::db::Database {
426 let db = Clone::clone(self.client.get().db());
427
428 db.ensure_global()
429 .expect("global_db must always return a global db");
430
431 db
432 }
433
434 pub fn module_db(&self) -> &Database {
435 self.module_db
436 .ensure_isolated()
437 .expect("module_db must always return isolated db");
438 &self.module_db
439 }
440
441 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
442 self.client.get().has_active_states(op_id).await
443 }
444
445 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
446 self.client.get().operation_exists(op_id).await
447 }
448
449 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
450 self.client
451 .get()
452 .executor()
453 .get_active_states()
454 .await
455 .into_iter()
456 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
457 .map(|s| {
458 (
459 Clone::clone(
460 s.0.as_any()
461 .downcast_ref::<M::States>()
462 .expect("incorrect output type passed to module plugin"),
463 ),
464 s.1,
465 )
466 })
467 .collect()
468 }
469
470 pub async fn get_config(&self) -> ClientConfig {
471 self.client.get().config().await
472 }
473
474 pub async fn get_invite_code(&self) -> InviteCode {
477 let cfg = self.get_config().await.global;
478 self.client
479 .get()
480 .invite_code(
481 *cfg.api_endpoints
482 .keys()
483 .next()
484 .expect("A federation always has at least one guardian"),
485 )
486 .await
487 .expect("The guardian we requested an invite code for exists")
488 }
489
490 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
491 self.client.get().get_internal_payment_markers()
492 }
493
494 pub async fn manual_operation_start(
497 &self,
498 operation_id: OperationId,
499 op_type: &str,
500 operation_meta: impl serde::Serialize + Debug,
501 sms: Vec<DynState>,
502 ) -> anyhow::Result<()> {
503 let db = self.module_db();
504 let mut dbtx = db.begin_transaction().await;
505 {
506 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
507
508 self.manual_operation_start_inner(
509 &mut dbtx.to_ref_nc(),
510 operation_id,
511 op_type,
512 operation_meta,
513 sms,
514 )
515 .await?;
516 }
517
518 dbtx.commit_tx_result().await.map_err(|_| {
519 anyhow!(
520 "Operation with id {} already exists",
521 operation_id.fmt_short()
522 )
523 })?;
524
525 Ok(())
526 }
527
528 pub async fn manual_operation_start_dbtx(
529 &self,
530 dbtx: &mut DatabaseTransaction<'_>,
531 operation_id: OperationId,
532 op_type: &str,
533 operation_meta: impl serde::Serialize + Debug,
534 sms: Vec<DynState>,
535 ) -> anyhow::Result<()> {
536 self.manual_operation_start_inner(
537 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
538 operation_id,
539 op_type,
540 operation_meta,
541 sms,
542 )
543 .await
544 }
545
546 async fn manual_operation_start_inner(
549 &self,
550 dbtx: &mut DatabaseTransaction<'_>,
551 operation_id: OperationId,
552 op_type: &str,
553 operation_meta: impl serde::Serialize + Debug,
554 sms: Vec<DynState>,
555 ) -> anyhow::Result<()> {
556 dbtx.ensure_global()
557 .expect("Must deal with global dbtx here");
558
559 if self
560 .client
561 .get()
562 .operation_log()
563 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
564 .await
565 .is_some()
566 {
567 bail!(
568 "Operation with id {} already exists",
569 operation_id.fmt_short()
570 );
571 }
572
573 self.client
574 .get()
575 .operation_log()
576 .add_operation_log_entry_dbtx(
577 &mut dbtx.to_ref_nc(),
578 operation_id,
579 op_type,
580 serde_json::to_value(operation_meta).expect("Can't fail"),
581 )
582 .await;
583
584 self.client
585 .get()
586 .executor()
587 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
588 .await
589 .expect("State machine is valid");
590
591 Ok(())
592 }
593
594 pub fn outcome_or_updates<U, S>(
595 &self,
596 operation: OperationLogEntry,
597 operation_id: OperationId,
598 stream_gen: impl FnOnce() -> S + 'static,
599 ) -> UpdateStreamOrOutcome<U>
600 where
601 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
602 S: Stream<Item = U> + MaybeSend + 'static,
603 {
604 use futures::StreamExt;
605 match self.client.get().operation_log().outcome_or_updates(
606 &self.global_db(),
607 operation_id,
608 operation,
609 Box::new(move || {
610 let stream_gen = stream_gen();
611 Box::pin(
612 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
613 )
614 }),
615 ) {
616 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
617 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
618 ),
619 UpdateStreamOrOutcome::Outcome(o) => {
620 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
621 }
622 }
623 }
624
625 pub async fn claim_inputs<I, S>(
626 &self,
627 dbtx: &mut DatabaseTransaction<'_>,
628 inputs: ClientInputBundle<I, S>,
629 operation_id: OperationId,
630 ) -> anyhow::Result<OutPointRange>
631 where
632 I: IInput + MaybeSend + MaybeSync + 'static,
633 S: sm::IState + MaybeSend + MaybeSync + 'static,
634 {
635 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
636 .await
637 }
638
639 async fn claim_inputs_dyn(
640 &self,
641 dbtx: &mut DatabaseTransaction<'_>,
642 inputs: InstancelessDynClientInputBundle,
643 operation_id: OperationId,
644 ) -> anyhow::Result<OutPointRange> {
645 let tx_builder =
646 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
647
648 self.client
649 .get()
650 .finalize_and_submit_transaction_inner(
651 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
652 operation_id,
653 tx_builder,
654 )
655 .await
656 }
657
658 pub async fn add_state_machines_dbtx(
659 &self,
660 dbtx: &mut DatabaseTransaction<'_>,
661 states: Vec<DynState>,
662 ) -> AddStateMachinesResult {
663 self.client
664 .get()
665 .executor()
666 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
667 .await
668 }
669
670 pub async fn add_operation_log_entry_dbtx(
671 &self,
672 dbtx: &mut DatabaseTransaction<'_>,
673 operation_id: OperationId,
674 operation_type: &str,
675 operation_meta: impl serde::Serialize,
676 ) {
677 self.client
678 .get()
679 .operation_log()
680 .add_operation_log_entry_dbtx(
681 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
682 operation_id,
683 operation_type,
684 serde_json::to_value(operation_meta).expect("Can't fail"),
685 )
686 .await;
687 }
688
689 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
690 where
691 E: Event + Send,
692 Cap: Send,
693 {
694 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
695 warn!(
696 target: LOG_CLIENT,
697 module_kind = %<M as ClientModule>::kind(),
698 event_module = ?<E as Event>::MODULE,
699 "Client module logging events of different module than its own. This might become an error in the future."
700 );
701 }
702 self.client
703 .get()
704 .log_event_json(
705 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
706 <E as Event>::MODULE,
707 self.module_instance_id,
708 <E as Event>::KIND,
709 serde_json::to_value(event).expect("Can't fail"),
710 <E as Event>::PERSISTENCE,
711 )
712 .await;
713 }
714}
715
/// Priority a module reports when it offers to act as the primary module.
///
/// Priorities are ordered by their numeric value (derived `Ord`).
///
/// NOTE(review): with `HIGH = 100` and `LOW = 10000`, a smaller number looks
/// like a higher priority; confirm against the code that selects the primary
/// module.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PrimaryModulePriority(u64);

impl PrimaryModulePriority {
    /// Predefined "high" priority (numeric value 100).
    pub const HIGH: Self = PrimaryModulePriority(100);
    /// Predefined "low" priority (numeric value 10000).
    pub const LOW: Self = PrimaryModulePriority(10_000);

    /// Creates a priority with an arbitrary numeric value.
    pub fn custom(prio: u64) -> Self {
        PrimaryModulePriority(prio)
    }
}
728pub enum PrimaryModuleSupport {
730 Any { priority: PrimaryModulePriority },
732 Selected {
734 priority: PrimaryModulePriority,
735 units: BTreeSet<AmountUnit>,
736 },
737 None,
739}
740
741impl PrimaryModuleSupport {
742 pub fn selected<const N: usize>(
743 priority: PrimaryModulePriority,
744 units: [AmountUnit; N],
745 ) -> Self {
746 Self::Selected {
747 priority,
748 units: BTreeSet::from(units),
749 }
750 }
751}
752
753#[apply(async_trait_maybe_send!)]
755pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
756 type Init: ClientModuleInit;
757
758 type Common: ModuleCommon;
760
761 type Backup: ModuleBackup;
764
765 type ModuleStateMachineContext: Context;
768
769 type States: State<ModuleContext = Self::ModuleStateMachineContext>
771 + IntoDynInstance<DynType = DynState>;
772
773 fn decoder() -> Decoder {
774 let mut decoder_builder = Self::Common::decoder_builder();
775 decoder_builder.with_decodable_type::<Self::States>();
776 decoder_builder.with_decodable_type::<Self::Backup>();
777 decoder_builder.build()
778 }
779
780 fn kind() -> ModuleKind {
781 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
782 }
783
784 fn context(&self) -> Self::ModuleStateMachineContext;
785
786 async fn start(&self) {}
792
793 async fn handle_cli_command(
794 &self,
795 _args: &[ffi::OsString],
796 ) -> anyhow::Result<serde_json::Value> {
797 Err(anyhow::format_err!(
798 "This module does not implement cli commands"
799 ))
800 }
801
802 async fn handle_rpc(
803 &self,
804 _method: String,
805 _request: serde_json::Value,
806 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
807 Box::pin(futures::stream::once(std::future::ready(Err(
808 anyhow::format_err!("This module does not implement rpc"),
809 ))))
810 }
811
812 fn input_fee(
821 &self,
822 amount: &Amounts,
823 input: &<Self::Common as ModuleCommon>::Input,
824 ) -> Option<Amounts>;
825
826 fn output_fee(
835 &self,
836 amount: &Amounts,
837 output: &<Self::Common as ModuleCommon>::Output,
838 ) -> Option<Amounts>;
839
840 fn supports_backup(&self) -> bool {
841 false
842 }
843
844 async fn backup(&self) -> anyhow::Result<Self::Backup> {
845 anyhow::bail!("Backup not supported");
846 }
847
848 fn supports_being_primary(&self) -> PrimaryModuleSupport {
857 PrimaryModuleSupport::None
858 }
859
860 async fn create_final_inputs_and_outputs(
878 &self,
879 _dbtx: &mut DatabaseTransaction<'_>,
880 _operation_id: OperationId,
881 _unit: AmountUnit,
882 _input_amount: Amount,
883 _output_amount: Amount,
884 ) -> anyhow::Result<(
885 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
886 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
887 )> {
888 unimplemented!()
889 }
890
891 async fn await_primary_module_output(
896 &self,
897 _operation_id: OperationId,
898 _out_point: OutPoint,
899 ) -> anyhow::Result<()> {
900 unimplemented!()
901 }
902
903 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
906 unimplemented!()
907 }
908
909 async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
912 unimplemented!()
913 }
914
915 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
918 unimplemented!()
919 }
920
921 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
977 bail!("Unable to determine if safe to leave the federation: Not implemented")
978 }
979}
980
981#[apply(async_trait_maybe_send!)]
983pub trait IClientModule: Debug {
984 fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
985
986 fn decoder(&self) -> Decoder;
987
988 fn context(&self, instance: ModuleInstanceId) -> DynContext;
989
990 async fn start(&self);
991
992 async fn handle_cli_command(&self, args: &[ffi::OsString])
993 -> anyhow::Result<serde_json::Value>;
994
995 async fn handle_rpc(
996 &self,
997 method: String,
998 request: serde_json::Value,
999 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
1000
1001 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;
1002
1003 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;
1004
1005 fn supports_backup(&self) -> bool;
1006
1007 async fn backup(&self, module_instance_id: ModuleInstanceId)
1008 -> anyhow::Result<DynModuleBackup>;
1009
1010 fn supports_being_primary(&self) -> PrimaryModuleSupport;
1011
1012 async fn create_final_inputs_and_outputs(
1013 &self,
1014 module_instance: ModuleInstanceId,
1015 dbtx: &mut DatabaseTransaction<'_>,
1016 operation_id: OperationId,
1017 unit: AmountUnit,
1018 input_amount: Amount,
1019 output_amount: Amount,
1020 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
1021
1022 async fn await_primary_module_output(
1023 &self,
1024 operation_id: OperationId,
1025 out_point: OutPoint,
1026 ) -> anyhow::Result<()>;
1027
1028 async fn get_balance(
1029 &self,
1030 module_instance: ModuleInstanceId,
1031 dbtx: &mut DatabaseTransaction<'_>,
1032 unit: AmountUnit,
1033 ) -> Amount;
1034
1035 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
1036}
1037
1038#[apply(async_trait_maybe_send!)]
1039impl<T> IClientModule for T
1040where
1041 T: ClientModule,
1042{
1043 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1044 self
1045 }
1046
1047 fn decoder(&self) -> Decoder {
1048 T::decoder()
1049 }
1050
1051 fn context(&self, instance: ModuleInstanceId) -> DynContext {
1052 DynContext::from_typed(instance, <T as ClientModule>::context(self))
1053 }
1054
1055 async fn start(&self) {
1056 <T as ClientModule>::start(self).await;
1057 }
1058
1059 async fn handle_cli_command(
1060 &self,
1061 args: &[ffi::OsString],
1062 ) -> anyhow::Result<serde_json::Value> {
1063 <T as ClientModule>::handle_cli_command(self, args).await
1064 }
1065
1066 async fn handle_rpc(
1067 &self,
1068 method: String,
1069 request: serde_json::Value,
1070 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1071 <T as ClientModule>::handle_rpc(self, method, request).await
1072 }
1073
1074 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1075 <T as ClientModule>::input_fee(
1076 self,
1077 amount,
1078 input
1079 .as_any()
1080 .downcast_ref()
1081 .expect("Dispatched to correct module"),
1082 )
1083 }
1084
1085 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1086 <T as ClientModule>::output_fee(
1087 self,
1088 amount,
1089 output
1090 .as_any()
1091 .downcast_ref()
1092 .expect("Dispatched to correct module"),
1093 )
1094 }
1095
1096 fn supports_backup(&self) -> bool {
1097 <T as ClientModule>::supports_backup(self)
1098 }
1099
1100 async fn backup(
1101 &self,
1102 module_instance_id: ModuleInstanceId,
1103 ) -> anyhow::Result<DynModuleBackup> {
1104 Ok(DynModuleBackup::from_typed(
1105 module_instance_id,
1106 <T as ClientModule>::backup(self).await?,
1107 ))
1108 }
1109
1110 fn supports_being_primary(&self) -> PrimaryModuleSupport {
1111 <T as ClientModule>::supports_being_primary(self)
1112 }
1113
1114 async fn create_final_inputs_and_outputs(
1115 &self,
1116 module_instance: ModuleInstanceId,
1117 dbtx: &mut DatabaseTransaction<'_>,
1118 operation_id: OperationId,
1119 unit: AmountUnit,
1120 input_amount: Amount,
1121 output_amount: Amount,
1122 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1123 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1124 self,
1125 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1126 operation_id,
1127 unit,
1128 input_amount,
1129 output_amount,
1130 )
1131 .await?;
1132
1133 let inputs = inputs.into_dyn(module_instance);
1134
1135 let outputs = outputs.into_dyn(module_instance);
1136
1137 Ok((inputs, outputs))
1138 }
1139
1140 async fn await_primary_module_output(
1141 &self,
1142 operation_id: OperationId,
1143 out_point: OutPoint,
1144 ) -> anyhow::Result<()> {
1145 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1146 }
1147
1148 async fn get_balance(
1149 &self,
1150 module_instance: ModuleInstanceId,
1151 dbtx: &mut DatabaseTransaction<'_>,
1152 unit: AmountUnit,
1153 ) -> Amount {
1154 <T as ClientModule>::get_balance(
1155 self,
1156 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1157 unit,
1158 )
1159 .await
1160 }
1161
1162 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1163 <T as ClientModule>::subscribe_balance_changes(self).await
1164 }
1165}
1166
1167dyn_newtype_define!(
1168 #[derive(Clone)]
1169 pub DynClientModule(Arc<IClientModule>)
1170);
1171
1172impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1173 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1174 self.inner.as_ref()
1175 }
1176}
1177
1178pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1180
1181pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;