1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::{Arc, Weak};
8use std::{ffi, marker, ops};
9
10use anyhow::{anyhow, bail};
11use bitcoin::secp256k1::PublicKey;
12use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
13use fedimint_core::config::ClientConfig;
14use fedimint_core::core::{
15 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
16 OperationId,
17};
18use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
19use fedimint_core::encoding::{Decodable, Encodable};
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
22use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
23use fedimint_core::task::{MaybeSend, MaybeSync};
24use fedimint_core::util::BoxStream;
25use fedimint_core::{
26 Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
27 maybe_add_send, maybe_add_send_sync,
28};
29use fedimint_eventlog::{Event, EventKind, EventPersistence};
30use fedimint_logging::LOG_CLIENT;
31use futures::Stream;
32use serde::de::DeserializeOwned;
33use serde::{Deserialize, Serialize};
34use tracing::warn;
35
36use self::init::ClientModuleInit;
37use crate::module::recovery::{DynModuleBackup, ModuleBackup};
38use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
39use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
40use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
41use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
42use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
43
44pub mod init;
45pub mod recovery;
46
47pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
48
49#[apply(async_trait_maybe_send!)]
58pub trait ClientContextIface: MaybeSend + MaybeSync {
59 fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
60 fn api_clone(&self) -> DynGlobalApi;
61 fn decoders(&self) -> &ModuleDecoderRegistry;
62 async fn finalize_and_submit_transaction(
63 &self,
64 operation_id: OperationId,
65 operation_type: &str,
66 operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
67 tx_builder: TransactionBuilder,
68 ) -> anyhow::Result<OutPointRange>;
69
70 async fn finalize_and_submit_transaction_inner(
72 &self,
73 dbtx: &mut DatabaseTransaction<'_>,
74 operation_id: OperationId,
75 tx_builder: TransactionBuilder,
76 ) -> anyhow::Result<OutPointRange>;
77
78 async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
79
80 async fn await_primary_module_outputs(
81 &self,
82 operation_id: OperationId,
83 outputs: Vec<OutPoint>,
85 ) -> anyhow::Result<()>;
86
87 fn operation_log(&self) -> &dyn IOperationLog;
88
89 async fn has_active_states(&self, operation_id: OperationId) -> bool;
90
91 async fn operation_exists(&self, operation_id: OperationId) -> bool;
92
93 async fn config(&self) -> ClientConfig;
94
95 fn db(&self) -> &Database;
96
97 fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
98
99 async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
100
101 fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
102
103 #[allow(clippy::too_many_arguments)]
104 async fn log_event_json(
105 &self,
106 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
107 module_kind: Option<ModuleKind>,
108 module_id: ModuleInstanceId,
109 kind: EventKind,
110 payload: serde_json::Value,
111 persist: EventPersistence,
112 );
113
114 async fn read_operation_active_states<'dbtx>(
115 &self,
116 operation_id: OperationId,
117 module_id: ModuleInstanceId,
118 dbtx: &'dbtx mut DatabaseTransaction<'_>,
119 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
120
121 async fn read_operation_inactive_states<'dbtx>(
122 &self,
123 operation_id: OperationId,
124 module_id: ModuleInstanceId,
125 dbtx: &'dbtx mut DatabaseTransaction<'_>,
126 ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
127}
128
129#[derive(Clone, Default)]
135pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
136
137impl FinalClientIface {
138 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
144 self.0
145 .get()
146 .expect("client must be already set")
147 .upgrade()
148 .expect("client module context must not be use past client shutdown")
149 }
150
151 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
152 self.0.set(client).expect("FinalLazyClient already set");
153 }
154}
155
156impl fmt::Debug for FinalClientIface {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.write_str("FinalClientIface")
159 }
160}
161pub struct ClientContext<M> {
166 client: FinalClientIface,
167 module_instance_id: ModuleInstanceId,
168 global_dbtx_access_token: GlobalDBTxAccessToken,
169 module_db: Database,
170 _marker: marker::PhantomData<M>,
171}
172
173impl<M> Clone for ClientContext<M> {
174 fn clone(&self) -> Self {
175 Self {
176 client: self.client.clone(),
177 module_db: self.module_db.clone(),
178 module_instance_id: self.module_instance_id,
179 _marker: marker::PhantomData,
180 global_dbtx_access_token: self.global_dbtx_access_token,
181 }
182 }
183}
184
185pub struct ClientContextSelfRef<'s, M> {
188 client: Arc<dyn ClientContextIface>,
191 module_instance_id: ModuleInstanceId,
192 _marker: marker::PhantomData<&'s M>,
193}
194
195impl<M> ops::Deref for ClientContextSelfRef<'_, M>
196where
197 M: ClientModule,
198{
199 type Target = M;
200
201 fn deref(&self) -> &Self::Target {
202 self.client
203 .get_module(self.module_instance_id)
204 .as_any()
205 .downcast_ref::<M>()
206 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
207 }
208}
209
210impl<M> fmt::Debug for ClientContext<M> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.write_str("ClientContext")
213 }
214}
215
216impl<M> ClientContext<M>
217where
218 M: ClientModule,
219{
220 pub fn new(
221 client: FinalClientIface,
222 module_instance_id: ModuleInstanceId,
223 global_dbtx_access_token: GlobalDBTxAccessToken,
224 module_db: Database,
225 ) -> Self {
226 Self {
227 client,
228 module_instance_id,
229 global_dbtx_access_token,
230 module_db,
231 _marker: marker::PhantomData,
232 }
233 }
234
235 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
247 ClientContextSelfRef {
248 client: self.client.get(),
249 module_instance_id: self.module_instance_id,
250 _marker: marker::PhantomData,
251 }
252 }
253
254 pub fn global_api(&self) -> DynGlobalApi {
256 self.client.get().api_clone()
257 }
258
259 pub fn module_api(&self) -> DynModuleApi {
261 self.global_api().with_module(self.module_instance_id)
262 }
263
264 pub fn decoders(&self) -> ModuleDecoderRegistry {
266 Clone::clone(self.client.get().decoders())
267 }
268
269 pub fn input_from_dyn<'i>(
270 &self,
271 input: &'i DynInput,
272 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
273 (input.module_instance_id() == self.module_instance_id).then(|| {
274 input
275 .as_any()
276 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
277 .unwrap_or_else(|| {
278 panic!("instance_id {} just checked", input.module_instance_id())
279 })
280 })
281 }
282
283 pub fn output_from_dyn<'o>(
284 &self,
285 output: &'o DynOutput,
286 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
287 (output.module_instance_id() == self.module_instance_id).then(|| {
288 output
289 .as_any()
290 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
291 .unwrap_or_else(|| {
292 panic!("instance_id {} just checked", output.module_instance_id())
293 })
294 })
295 }
296
297 pub fn map_dyn<'s, 'i, 'o, I>(
298 &'s self,
299 typed: impl IntoIterator<Item = I> + 'i,
300 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
301 where
302 I: IntoDynInstance,
303 'i: 'o,
304 's: 'o,
305 {
306 typed.into_iter().map(|i| self.make_dyn(i))
307 }
308
309 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
311 self.make_dyn(output)
312 }
313
314 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
316 self.make_dyn(input)
317 }
318
319 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
321 where
322 I: IntoDynInstance,
323 {
324 typed.into_dyn(self.module_instance_id)
325 }
326
327 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
329 where
330 O: IntoDynInstance<DynType = DynOutput> + 'static,
331 S: IntoDynInstance<DynType = DynState> + 'static,
332 {
333 self.make_dyn(output)
334 }
335
336 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
338 where
339 I: IntoDynInstance<DynType = DynInput> + 'static,
340 S: IntoDynInstance<DynType = DynState> + 'static,
341 {
342 self.make_dyn(inputs)
343 }
344
345 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
346 where
347 S: sm::IState + 'static,
348 {
349 DynState::from_typed(self.module_instance_id, sm)
350 }
351
352 pub async fn finalize_and_submit_transaction<F, Meta>(
353 &self,
354 operation_id: OperationId,
355 operation_type: &str,
356 operation_meta_gen: F,
357 tx_builder: TransactionBuilder,
358 ) -> anyhow::Result<OutPointRange>
359 where
360 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
361 Meta: serde::Serialize + MaybeSend,
362 {
363 self.client
364 .get()
365 .finalize_and_submit_transaction(
366 operation_id,
367 operation_type,
368 Box::new(move |out_point_range| {
369 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
370 }),
371 tx_builder,
372 )
373 .await
374 }
375
376 pub async fn finalize_and_submit_transaction_dbtx(
377 &self,
378 dbtx: &mut DatabaseTransaction<'_>,
379 operation_id: OperationId,
380 tx_builder: TransactionBuilder,
381 ) -> anyhow::Result<OutPointRange> {
382 self.client
383 .get()
384 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
385 .await
386 }
387
388 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
389 self.client.get().transaction_updates(operation_id).await
390 }
391
392 pub async fn await_primary_module_outputs(
393 &self,
394 operation_id: OperationId,
395 outputs: Vec<OutPoint>,
397 ) -> anyhow::Result<()> {
398 self.client
399 .get()
400 .await_primary_module_outputs(operation_id, outputs)
401 .await
402 }
403
404 pub async fn get_operation(
406 &self,
407 operation_id: OperationId,
408 ) -> anyhow::Result<oplog::OperationLogEntry> {
409 let operation = self
410 .client
411 .get()
412 .operation_log()
413 .get_operation(operation_id)
414 .await
415 .ok_or(anyhow::anyhow!("Operation not found"))?;
416
417 if operation.operation_module_kind() != M::kind().as_str() {
418 bail!("Operation is not a lightning operation");
419 }
420
421 Ok(operation)
422 }
423
424 fn global_db(&self) -> fedimint_core::db::Database {
428 let db = Clone::clone(self.client.get().db());
429
430 db.ensure_global()
431 .expect("global_db must always return a global db");
432
433 db
434 }
435
436 pub fn module_db(&self) -> &Database {
437 self.module_db
438 .ensure_isolated()
439 .expect("module_db must always return isolated db");
440 &self.module_db
441 }
442
443 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
444 self.client.get().has_active_states(op_id).await
445 }
446
447 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
448 self.client.get().operation_exists(op_id).await
449 }
450
451 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
452 self.client
453 .get()
454 .executor()
455 .get_active_states()
456 .await
457 .into_iter()
458 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
459 .map(|s| {
460 (
461 Clone::clone(
462 s.0.as_any()
463 .downcast_ref::<M::States>()
464 .expect("incorrect output type passed to module plugin"),
465 ),
466 s.1,
467 )
468 })
469 .collect()
470 }
471
472 pub async fn get_config(&self) -> ClientConfig {
473 self.client.get().config().await
474 }
475
476 pub async fn get_invite_code(&self) -> InviteCode {
479 let cfg = self.get_config().await.global;
480 self.client
481 .get()
482 .invite_code(
483 *cfg.api_endpoints
484 .keys()
485 .next()
486 .expect("A federation always has at least one guardian"),
487 )
488 .await
489 .expect("The guardian we requested an invite code for exists")
490 }
491
492 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
493 self.client.get().get_internal_payment_markers()
494 }
495
496 pub async fn manual_operation_start(
499 &self,
500 operation_id: OperationId,
501 op_type: &str,
502 operation_meta: impl serde::Serialize + Debug,
503 sms: Vec<DynState>,
504 ) -> anyhow::Result<()> {
505 let db = self.module_db();
506 let mut dbtx = db.begin_transaction().await;
507 {
508 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
509
510 self.manual_operation_start_inner(
511 &mut dbtx.to_ref_nc(),
512 operation_id,
513 op_type,
514 operation_meta,
515 sms,
516 )
517 .await?;
518 }
519
520 dbtx.commit_tx_result().await.map_err(|_| {
521 anyhow!(
522 "Operation with id {} already exists",
523 operation_id.fmt_short()
524 )
525 })?;
526
527 Ok(())
528 }
529
530 pub async fn manual_operation_start_dbtx(
531 &self,
532 dbtx: &mut DatabaseTransaction<'_>,
533 operation_id: OperationId,
534 op_type: &str,
535 operation_meta: impl serde::Serialize + Debug,
536 sms: Vec<DynState>,
537 ) -> anyhow::Result<()> {
538 self.manual_operation_start_inner(
539 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
540 operation_id,
541 op_type,
542 operation_meta,
543 sms,
544 )
545 .await
546 }
547
548 async fn manual_operation_start_inner(
551 &self,
552 dbtx: &mut DatabaseTransaction<'_>,
553 operation_id: OperationId,
554 op_type: &str,
555 operation_meta: impl serde::Serialize + Debug,
556 sms: Vec<DynState>,
557 ) -> anyhow::Result<()> {
558 dbtx.ensure_global()
559 .expect("Must deal with global dbtx here");
560
561 if self
562 .client
563 .get()
564 .operation_log()
565 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
566 .await
567 .is_some()
568 {
569 bail!(
570 "Operation with id {} already exists",
571 operation_id.fmt_short()
572 );
573 }
574
575 self.client
576 .get()
577 .operation_log()
578 .add_operation_log_entry_dbtx(
579 &mut dbtx.to_ref_nc(),
580 operation_id,
581 op_type,
582 serde_json::to_value(operation_meta).expect("Can't fail"),
583 )
584 .await;
585
586 self.client
587 .get()
588 .executor()
589 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
590 .await
591 .expect("State machine is valid");
592
593 Ok(())
594 }
595
596 pub fn outcome_or_updates<U, S>(
597 &self,
598 operation: OperationLogEntry,
599 operation_id: OperationId,
600 stream_gen: impl FnOnce() -> S + 'static,
601 ) -> UpdateStreamOrOutcome<U>
602 where
603 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
604 S: Stream<Item = U> + MaybeSend + 'static,
605 {
606 use futures::StreamExt;
607 match self.client.get().operation_log().outcome_or_updates(
608 &self.global_db(),
609 operation_id,
610 operation,
611 Box::new(move || {
612 let stream_gen = stream_gen();
613 Box::pin(
614 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
615 )
616 }),
617 ) {
618 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
619 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
620 ),
621 UpdateStreamOrOutcome::Outcome(o) => {
622 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
623 }
624 }
625 }
626
627 pub async fn claim_inputs<I, S>(
628 &self,
629 dbtx: &mut DatabaseTransaction<'_>,
630 inputs: ClientInputBundle<I, S>,
631 operation_id: OperationId,
632 ) -> anyhow::Result<OutPointRange>
633 where
634 I: IInput + MaybeSend + MaybeSync + 'static,
635 S: sm::IState + MaybeSend + MaybeSync + 'static,
636 {
637 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
638 .await
639 }
640
641 async fn claim_inputs_dyn(
642 &self,
643 dbtx: &mut DatabaseTransaction<'_>,
644 inputs: InstancelessDynClientInputBundle,
645 operation_id: OperationId,
646 ) -> anyhow::Result<OutPointRange> {
647 let tx_builder =
648 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
649
650 self.client
651 .get()
652 .finalize_and_submit_transaction_inner(
653 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
654 operation_id,
655 tx_builder,
656 )
657 .await
658 }
659
660 pub async fn add_state_machines_dbtx(
661 &self,
662 dbtx: &mut DatabaseTransaction<'_>,
663 states: Vec<DynState>,
664 ) -> AddStateMachinesResult {
665 self.client
666 .get()
667 .executor()
668 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
669 .await
670 }
671
672 pub async fn add_operation_log_entry_dbtx(
673 &self,
674 dbtx: &mut DatabaseTransaction<'_>,
675 operation_id: OperationId,
676 operation_type: &str,
677 operation_meta: impl serde::Serialize,
678 ) {
679 self.client
680 .get()
681 .operation_log()
682 .add_operation_log_entry_dbtx(
683 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
684 operation_id,
685 operation_type,
686 serde_json::to_value(operation_meta).expect("Can't fail"),
687 )
688 .await;
689 }
690
691 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
692 where
693 E: Event + Send,
694 Cap: Send,
695 {
696 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
697 warn!(
698 target: LOG_CLIENT,
699 module_kind = %<M as ClientModule>::kind(),
700 event_module = ?<E as Event>::MODULE,
701 "Client module logging events of different module than its own. This might become an error in the future."
702 );
703 }
704 self.client
705 .get()
706 .log_event_json(
707 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
708 <E as Event>::MODULE,
709 self.module_instance_id,
710 <E as Event>::KIND,
711 serde_json::to_value(event).expect("Can't fail"),
712 <E as Event>::PERSISTENCE,
713 )
714 .await;
715 }
716}
717
718#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
720pub struct PrimaryModulePriority(u64);
721
722impl PrimaryModulePriority {
723 pub const HIGH: Self = Self(100);
724 pub const LOW: Self = Self(10000);
725
726 pub fn custom(prio: u64) -> Self {
727 Self(prio)
728 }
729}
730pub enum PrimaryModuleSupport {
732 Any { priority: PrimaryModulePriority },
734 Selected {
736 priority: PrimaryModulePriority,
737 units: BTreeSet<AmountUnit>,
738 },
739 None,
741}
742
743impl PrimaryModuleSupport {
744 pub fn selected<const N: usize>(
745 priority: PrimaryModulePriority,
746 units: [AmountUnit; N],
747 ) -> Self {
748 Self::Selected {
749 priority,
750 units: BTreeSet::from(units),
751 }
752 }
753}
754
755#[apply(async_trait_maybe_send!)]
757pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
758 type Init: ClientModuleInit;
759
760 type Common: ModuleCommon;
762
763 type Backup: ModuleBackup;
766
767 type ModuleStateMachineContext: Context;
770
771 type States: State<ModuleContext = Self::ModuleStateMachineContext>
773 + IntoDynInstance<DynType = DynState>;
774
775 fn decoder() -> Decoder {
776 let mut decoder_builder = Self::Common::decoder_builder();
777 decoder_builder.with_decodable_type::<Self::States>();
778 decoder_builder.with_decodable_type::<Self::Backup>();
779 decoder_builder.build()
780 }
781
782 fn kind() -> ModuleKind {
783 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
784 }
785
786 fn context(&self) -> Self::ModuleStateMachineContext;
787
788 async fn start(&self) {}
794
795 async fn handle_cli_command(
796 &self,
797 _args: &[ffi::OsString],
798 ) -> anyhow::Result<serde_json::Value> {
799 Err(anyhow::format_err!(
800 "This module does not implement cli commands"
801 ))
802 }
803
804 async fn handle_rpc(
805 &self,
806 _method: String,
807 _request: serde_json::Value,
808 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
809 Box::pin(futures::stream::once(std::future::ready(Err(
810 anyhow::format_err!("This module does not implement rpc"),
811 ))))
812 }
813
814 fn input_fee(
823 &self,
824 amount: &Amounts,
825 input: &<Self::Common as ModuleCommon>::Input,
826 ) -> Option<Amounts>;
827
828 fn output_fee(
837 &self,
838 amount: &Amounts,
839 output: &<Self::Common as ModuleCommon>::Output,
840 ) -> Option<Amounts>;
841
842 fn supports_backup(&self) -> bool {
843 false
844 }
845
846 async fn backup(&self) -> anyhow::Result<Self::Backup> {
847 anyhow::bail!("Backup not supported");
848 }
849
850 fn supports_being_primary(&self) -> PrimaryModuleSupport {
859 PrimaryModuleSupport::None
860 }
861
862 async fn create_final_inputs_and_outputs(
880 &self,
881 _dbtx: &mut DatabaseTransaction<'_>,
882 _operation_id: OperationId,
883 _unit: AmountUnit,
884 _input_amount: Amount,
885 _output_amount: Amount,
886 ) -> anyhow::Result<(
887 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
888 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
889 )> {
890 unimplemented!()
891 }
892
893 async fn await_primary_module_output(
898 &self,
899 _operation_id: OperationId,
900 _out_point: OutPoint,
901 ) -> anyhow::Result<()> {
902 unimplemented!()
903 }
904
905 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
908 unimplemented!()
909 }
910
911 async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
914 unimplemented!()
915 }
916
917 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
920 unimplemented!()
921 }
922
923 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
979 bail!("Unable to determine if safe to leave the federation: Not implemented")
980 }
981}
982
983#[apply(async_trait_maybe_send!)]
985pub trait IClientModule: Debug {
986 fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
987
988 fn decoder(&self) -> Decoder;
989
990 fn context(&self, instance: ModuleInstanceId) -> DynContext;
991
992 async fn start(&self);
993
994 async fn handle_cli_command(&self, args: &[ffi::OsString])
995 -> anyhow::Result<serde_json::Value>;
996
997 async fn handle_rpc(
998 &self,
999 method: String,
1000 request: serde_json::Value,
1001 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
1002
1003 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;
1004
1005 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;
1006
1007 fn supports_backup(&self) -> bool;
1008
1009 async fn backup(&self, module_instance_id: ModuleInstanceId)
1010 -> anyhow::Result<DynModuleBackup>;
1011
1012 fn supports_being_primary(&self) -> PrimaryModuleSupport;
1013
1014 async fn create_final_inputs_and_outputs(
1015 &self,
1016 module_instance: ModuleInstanceId,
1017 dbtx: &mut DatabaseTransaction<'_>,
1018 operation_id: OperationId,
1019 unit: AmountUnit,
1020 input_amount: Amount,
1021 output_amount: Amount,
1022 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
1023
1024 async fn await_primary_module_output(
1025 &self,
1026 operation_id: OperationId,
1027 out_point: OutPoint,
1028 ) -> anyhow::Result<()>;
1029
1030 async fn get_balance(
1031 &self,
1032 module_instance: ModuleInstanceId,
1033 dbtx: &mut DatabaseTransaction<'_>,
1034 unit: AmountUnit,
1035 ) -> Amount;
1036
1037 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
1038}
1039
1040#[apply(async_trait_maybe_send!)]
1041impl<T> IClientModule for T
1042where
1043 T: ClientModule,
1044{
1045 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1046 self
1047 }
1048
1049 fn decoder(&self) -> Decoder {
1050 T::decoder()
1051 }
1052
1053 fn context(&self, instance: ModuleInstanceId) -> DynContext {
1054 DynContext::from_typed(instance, <T as ClientModule>::context(self))
1055 }
1056
1057 async fn start(&self) {
1058 <T as ClientModule>::start(self).await;
1059 }
1060
1061 async fn handle_cli_command(
1062 &self,
1063 args: &[ffi::OsString],
1064 ) -> anyhow::Result<serde_json::Value> {
1065 <T as ClientModule>::handle_cli_command(self, args).await
1066 }
1067
1068 async fn handle_rpc(
1069 &self,
1070 method: String,
1071 request: serde_json::Value,
1072 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1073 <T as ClientModule>::handle_rpc(self, method, request).await
1074 }
1075
1076 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1077 <T as ClientModule>::input_fee(
1078 self,
1079 amount,
1080 input
1081 .as_any()
1082 .downcast_ref()
1083 .expect("Dispatched to correct module"),
1084 )
1085 }
1086
1087 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1088 <T as ClientModule>::output_fee(
1089 self,
1090 amount,
1091 output
1092 .as_any()
1093 .downcast_ref()
1094 .expect("Dispatched to correct module"),
1095 )
1096 }
1097
1098 fn supports_backup(&self) -> bool {
1099 <T as ClientModule>::supports_backup(self)
1100 }
1101
1102 async fn backup(
1103 &self,
1104 module_instance_id: ModuleInstanceId,
1105 ) -> anyhow::Result<DynModuleBackup> {
1106 Ok(DynModuleBackup::from_typed(
1107 module_instance_id,
1108 <T as ClientModule>::backup(self).await?,
1109 ))
1110 }
1111
1112 fn supports_being_primary(&self) -> PrimaryModuleSupport {
1113 <T as ClientModule>::supports_being_primary(self)
1114 }
1115
1116 async fn create_final_inputs_and_outputs(
1117 &self,
1118 module_instance: ModuleInstanceId,
1119 dbtx: &mut DatabaseTransaction<'_>,
1120 operation_id: OperationId,
1121 unit: AmountUnit,
1122 input_amount: Amount,
1123 output_amount: Amount,
1124 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1125 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1126 self,
1127 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1128 operation_id,
1129 unit,
1130 input_amount,
1131 output_amount,
1132 )
1133 .await?;
1134
1135 let inputs = inputs.into_dyn(module_instance);
1136
1137 let outputs = outputs.into_dyn(module_instance);
1138
1139 Ok((inputs, outputs))
1140 }
1141
1142 async fn await_primary_module_output(
1143 &self,
1144 operation_id: OperationId,
1145 out_point: OutPoint,
1146 ) -> anyhow::Result<()> {
1147 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1148 }
1149
1150 async fn get_balance(
1151 &self,
1152 module_instance: ModuleInstanceId,
1153 dbtx: &mut DatabaseTransaction<'_>,
1154 unit: AmountUnit,
1155 ) -> Amount {
1156 <T as ClientModule>::get_balance(
1157 self,
1158 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1159 unit,
1160 )
1161 .await
1162 }
1163
1164 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1165 <T as ClientModule>::subscribe_balance_changes(self).await
1166 }
1167}
1168
1169dyn_newtype_define!(
1170 #[derive(Clone)]
1171 pub DynClientModule(Arc<IClientModule>)
1172);
1173
1174impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1175 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1176 self.inner.as_ref()
1177 }
1178}
1179
1180#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1182pub struct IdxRange {
1183 start: u64,
1184 end: u64,
1185}
1186
1187impl IdxRange {
1188 pub fn new_single(start: u64) -> Option<Self> {
1189 start.checked_add(1).map(|end| Self { start, end })
1190 }
1191
1192 pub fn start(self) -> u64 {
1193 self.start
1194 }
1195
1196 pub fn count(self) -> usize {
1197 self.into_iter().count()
1198 }
1199
1200 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1201 range.end().checked_add(1).map(|end| Self {
1202 start: *range.start(),
1203 end,
1204 })
1205 }
1206}
1207
1208impl From<Range<u64>> for IdxRange {
1209 fn from(Range { start, end }: Range<u64>) -> Self {
1210 Self { start, end }
1211 }
1212}
1213
1214impl IntoIterator for IdxRange {
1215 type Item = u64;
1216
1217 type IntoIter = ops::Range<u64>;
1218
1219 fn into_iter(self) -> Self::IntoIter {
1220 ops::Range {
1221 start: self.start,
1222 end: self.end,
1223 }
1224 }
1225}
1226
1227#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1228pub struct OutPointRange {
1229 pub txid: TransactionId,
1230 idx_range: IdxRange,
1231}
1232
1233impl OutPointRange {
1234 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1235 Self { txid, idx_range }
1236 }
1237
1238 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1239 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1240 }
1241
1242 pub fn start_idx(self) -> u64 {
1243 self.idx_range.start()
1244 }
1245
1246 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1247 self.idx_range.into_iter()
1248 }
1249
1250 pub fn count(self) -> usize {
1251 self.idx_range.count()
1252 }
1253}
1254
1255impl IntoIterator for OutPointRange {
1256 type Item = OutPoint;
1257
1258 type IntoIter = OutPointRangeIter;
1259
1260 fn into_iter(self) -> Self::IntoIter {
1261 OutPointRangeIter {
1262 txid: self.txid,
1263 inner: self.idx_range.into_iter(),
1264 }
1265 }
1266}
1267
1268pub struct OutPointRangeIter {
1269 txid: TransactionId,
1270
1271 inner: ops::Range<u64>,
1272}
1273
1274impl OutPointRange {
1275 pub fn txid(&self) -> TransactionId {
1276 self.txid
1277 }
1278}
1279
1280impl Iterator for OutPointRangeIter {
1281 type Item = OutPoint;
1282
1283 fn next(&mut self) -> Option<Self::Item> {
1284 self.inner.next().map(|idx| OutPoint {
1285 txid: self.txid,
1286 out_idx: idx,
1287 })
1288 }
1289}
1290
1291pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;