1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::{Arc, Weak};
8use std::{ffi, marker, ops};
9
10use anyhow::{anyhow, bail};
11use bitcoin::secp256k1::PublicKey;
12use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
13use fedimint_core::config::ClientConfig;
14use fedimint_core::core::{
15 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
16 OperationId,
17};
18use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
19use fedimint_core::encoding::{Decodable, Encodable};
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
22use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
23use fedimint_core::task::{MaybeSend, MaybeSync};
24use fedimint_core::util::BoxStream;
25use fedimint_core::{
26 Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
27 maybe_add_send, maybe_add_send_sync,
28};
29use fedimint_eventlog::{Event, EventKind, EventPersistence};
30use fedimint_logging::LOG_CLIENT;
31use futures::Stream;
32use serde::de::DeserializeOwned;
33use serde::{Deserialize, Serialize};
34use tracing::warn;
35
36use self::init::ClientModuleInit;
37use crate::module::recovery::{DynModuleBackup, ModuleBackup};
38use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
39use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
40use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
41use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
42use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
43
44pub mod init;
45pub mod recovery;
46
/// A [`ModuleRegistry`] whose entries are type-erased client modules ([`DynClientModule`]).
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
48
#[apply(async_trait_maybe_send!)]
/// Type-erased interface to the client, used through `dyn ClientContextIface`.
///
/// [`FinalClientIface`] stores a weak reference to an implementor and
/// [`ClientContext`] forwards most of its methods to it.
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased module for the given instance id.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalizes `tx_builder` and submits the resulting transaction.
    ///
    /// `operation_meta_gen` receives the resulting [`OutPointRange`] and produces the JSON
    /// metadata for the operation.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works inside a
    /// caller-provided database transaction and takes no operation metadata.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits for the given outputs of `operation_id`.
    // NOTE(review): presumably resolves once the primary module has processed all of `outputs`
    // — confirm against the implementor.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the database handle.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for `peer`, or `None` if none can be produced.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event whose payload is already serialized to JSON.
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Streams the active states of `operation_id` for `module_id`, read through `dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of `operation_id` for `module_id`, read through `dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
128
#[derive(Clone, Default)]
/// Shared, set-once slot holding a [`Weak`] reference to the client.
///
/// The slot is an `Arc<OnceLock<..>>`, so clones share the same cell: setting the client
/// through one clone makes it visible through all of them.
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
136
137impl FinalClientIface {
138 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
144 self.0
145 .get()
146 .expect("client must be already set")
147 .upgrade()
148 .expect("client module context must not be use past client shutdown")
149 }
150
151 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
152 self.0.set(client).expect("FinalLazyClient already set");
153 }
154}
155
156impl fmt::Debug for FinalClientIface {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.write_str("FinalClientIface")
159 }
160}
/// Per-module handle to the client, typed by the module `M` it belongs to.
///
/// `M` is only a marker ([`marker::PhantomData`]); no value of `M` is stored.
pub struct ClientContext<M> {
    // Set-once handle used to reach the client.
    client: FinalClientIface,
    // Instance id of the module this context belongs to.
    module_instance_id: ModuleInstanceId,
    // Token passed to `global_dbtx(..)` when a global database transaction is needed.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database handle returned by `module_db()`, which asserts that it is isolated.
    module_db: Database,
    _marker: marker::PhantomData<M>,
}
172
173impl<M> Clone for ClientContext<M> {
174 fn clone(&self) -> Self {
175 Self {
176 client: self.client.clone(),
177 module_db: self.module_db.clone(),
178 module_instance_id: self.module_instance_id,
179 _marker: marker::PhantomData,
180 global_dbtx_access_token: self.global_dbtx_access_token,
181 }
182 }
183}
184
/// Guard that dereferences to the module `M` itself.
///
/// It keeps a strong reference to the client alive and looks the module up by instance id
/// on every dereference.
pub struct ClientContextSelfRef<'s, M> {
    // Strong reference keeping the client alive while the guard exists.
    client: Arc<dyn ClientContextIface>,
    // Instance id used to look the module up on dereference.
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
194
195impl<M> ops::Deref for ClientContextSelfRef<'_, M>
196where
197 M: ClientModule,
198{
199 type Target = M;
200
201 fn deref(&self) -> &Self::Target {
202 self.client
203 .get_module(self.module_instance_id)
204 .as_any()
205 .downcast_ref::<M>()
206 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
207 }
208}
209
210impl<M> fmt::Debug for ClientContext<M> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.write_str("ClientContext")
213 }
214}
215
216impl<M> ClientContext<M>
217where
218 M: ClientModule,
219{
220 pub fn new(
221 client: FinalClientIface,
222 module_instance_id: ModuleInstanceId,
223 global_dbtx_access_token: GlobalDBTxAccessToken,
224 module_db: Database,
225 ) -> Self {
226 Self {
227 client,
228 module_instance_id,
229 global_dbtx_access_token,
230 module_db,
231 _marker: marker::PhantomData,
232 }
233 }
234
235 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
247 ClientContextSelfRef {
248 client: self.client.get(),
249 module_instance_id: self.module_instance_id,
250 _marker: marker::PhantomData,
251 }
252 }
253
254 pub fn global_api(&self) -> DynGlobalApi {
256 self.client.get().api_clone()
257 }
258
259 pub fn module_api(&self) -> DynModuleApi {
261 self.global_api().with_module(self.module_instance_id)
262 }
263
264 pub fn decoders(&self) -> ModuleDecoderRegistry {
266 Clone::clone(self.client.get().decoders())
267 }
268
269 pub fn input_from_dyn<'i>(
270 &self,
271 input: &'i DynInput,
272 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
273 (input.module_instance_id() == self.module_instance_id).then(|| {
274 input
275 .as_any()
276 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
277 .unwrap_or_else(|| {
278 panic!("instance_id {} just checked", input.module_instance_id())
279 })
280 })
281 }
282
283 pub fn output_from_dyn<'o>(
284 &self,
285 output: &'o DynOutput,
286 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
287 (output.module_instance_id() == self.module_instance_id).then(|| {
288 output
289 .as_any()
290 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
291 .unwrap_or_else(|| {
292 panic!("instance_id {} just checked", output.module_instance_id())
293 })
294 })
295 }
296
297 pub fn map_dyn<'s, 'i, 'o, I>(
298 &'s self,
299 typed: impl IntoIterator<Item = I> + 'i,
300 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
301 where
302 I: IntoDynInstance,
303 'i: 'o,
304 's: 'o,
305 {
306 typed.into_iter().map(|i| self.make_dyn(i))
307 }
308
309 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
311 self.make_dyn(output)
312 }
313
314 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
316 self.make_dyn(input)
317 }
318
319 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
321 where
322 I: IntoDynInstance,
323 {
324 typed.into_dyn(self.module_instance_id)
325 }
326
327 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
329 where
330 O: IntoDynInstance<DynType = DynOutput> + 'static,
331 S: IntoDynInstance<DynType = DynState> + 'static,
332 {
333 self.make_dyn(output)
334 }
335
336 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
338 where
339 I: IntoDynInstance<DynType = DynInput> + 'static,
340 S: IntoDynInstance<DynType = DynState> + 'static,
341 {
342 self.make_dyn(inputs)
343 }
344
345 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
346 where
347 S: sm::IState + 'static,
348 {
349 DynState::from_typed(self.module_instance_id, sm)
350 }
351
352 pub async fn finalize_and_submit_transaction<F, Meta>(
353 &self,
354 operation_id: OperationId,
355 operation_type: &str,
356 operation_meta_gen: F,
357 tx_builder: TransactionBuilder,
358 ) -> anyhow::Result<OutPointRange>
359 where
360 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
361 Meta: serde::Serialize + MaybeSend,
362 {
363 self.client
364 .get()
365 .finalize_and_submit_transaction(
366 operation_id,
367 operation_type,
368 Box::new(move |out_point_range| {
369 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
370 }),
371 tx_builder,
372 )
373 .await
374 }
375
376 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
377 self.client.get().transaction_updates(operation_id).await
378 }
379
380 pub async fn await_primary_module_outputs(
381 &self,
382 operation_id: OperationId,
383 outputs: Vec<OutPoint>,
385 ) -> anyhow::Result<()> {
386 self.client
387 .get()
388 .await_primary_module_outputs(operation_id, outputs)
389 .await
390 }
391
392 pub async fn get_operation(
394 &self,
395 operation_id: OperationId,
396 ) -> anyhow::Result<oplog::OperationLogEntry> {
397 let operation = self
398 .client
399 .get()
400 .operation_log()
401 .get_operation(operation_id)
402 .await
403 .ok_or(anyhow::anyhow!("Operation not found"))?;
404
405 if operation.operation_module_kind() != M::kind().as_str() {
406 bail!("Operation is not a lightning operation");
407 }
408
409 Ok(operation)
410 }
411
412 fn global_db(&self) -> fedimint_core::db::Database {
416 let db = Clone::clone(self.client.get().db());
417
418 db.ensure_global()
419 .expect("global_db must always return a global db");
420
421 db
422 }
423
424 pub fn module_db(&self) -> &Database {
425 self.module_db
426 .ensure_isolated()
427 .expect("module_db must always return isolated db");
428 &self.module_db
429 }
430
431 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
432 self.client.get().has_active_states(op_id).await
433 }
434
435 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
436 self.client.get().operation_exists(op_id).await
437 }
438
439 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
440 self.client
441 .get()
442 .executor()
443 .get_active_states()
444 .await
445 .into_iter()
446 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
447 .map(|s| {
448 (
449 Clone::clone(
450 s.0.as_any()
451 .downcast_ref::<M::States>()
452 .expect("incorrect output type passed to module plugin"),
453 ),
454 s.1,
455 )
456 })
457 .collect()
458 }
459
460 pub async fn get_config(&self) -> ClientConfig {
461 self.client.get().config().await
462 }
463
464 pub async fn get_invite_code(&self) -> InviteCode {
467 let cfg = self.get_config().await.global;
468 self.client
469 .get()
470 .invite_code(
471 *cfg.api_endpoints
472 .keys()
473 .next()
474 .expect("A federation always has at least one guardian"),
475 )
476 .await
477 .expect("The guardian we requested an invite code for exists")
478 }
479
480 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
481 self.client.get().get_internal_payment_markers()
482 }
483
484 pub async fn manual_operation_start(
487 &self,
488 operation_id: OperationId,
489 op_type: &str,
490 operation_meta: impl serde::Serialize + Debug,
491 sms: Vec<DynState>,
492 ) -> anyhow::Result<()> {
493 let db = self.module_db();
494 let mut dbtx = db.begin_transaction().await;
495 {
496 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
497
498 self.manual_operation_start_inner(
499 &mut dbtx.to_ref_nc(),
500 operation_id,
501 op_type,
502 operation_meta,
503 sms,
504 )
505 .await?;
506 }
507
508 dbtx.commit_tx_result().await.map_err(|_| {
509 anyhow!(
510 "Operation with id {} already exists",
511 operation_id.fmt_short()
512 )
513 })?;
514
515 Ok(())
516 }
517
518 pub async fn manual_operation_start_dbtx(
519 &self,
520 dbtx: &mut DatabaseTransaction<'_>,
521 operation_id: OperationId,
522 op_type: &str,
523 operation_meta: impl serde::Serialize + Debug,
524 sms: Vec<DynState>,
525 ) -> anyhow::Result<()> {
526 self.manual_operation_start_inner(
527 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
528 operation_id,
529 op_type,
530 operation_meta,
531 sms,
532 )
533 .await
534 }
535
536 async fn manual_operation_start_inner(
539 &self,
540 dbtx: &mut DatabaseTransaction<'_>,
541 operation_id: OperationId,
542 op_type: &str,
543 operation_meta: impl serde::Serialize + Debug,
544 sms: Vec<DynState>,
545 ) -> anyhow::Result<()> {
546 dbtx.ensure_global()
547 .expect("Must deal with global dbtx here");
548
549 if self
550 .client
551 .get()
552 .operation_log()
553 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
554 .await
555 .is_some()
556 {
557 bail!(
558 "Operation with id {} already exists",
559 operation_id.fmt_short()
560 );
561 }
562
563 self.client
564 .get()
565 .operation_log()
566 .add_operation_log_entry_dbtx(
567 &mut dbtx.to_ref_nc(),
568 operation_id,
569 op_type,
570 serde_json::to_value(operation_meta).expect("Can't fail"),
571 )
572 .await;
573
574 self.client
575 .get()
576 .executor()
577 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
578 .await
579 .expect("State machine is valid");
580
581 Ok(())
582 }
583
584 pub fn outcome_or_updates<U, S>(
585 &self,
586 operation: OperationLogEntry,
587 operation_id: OperationId,
588 stream_gen: impl FnOnce() -> S + 'static,
589 ) -> UpdateStreamOrOutcome<U>
590 where
591 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
592 S: Stream<Item = U> + MaybeSend + 'static,
593 {
594 use futures::StreamExt;
595 match self.client.get().operation_log().outcome_or_updates(
596 &self.global_db(),
597 operation_id,
598 operation,
599 Box::new(move || {
600 let stream_gen = stream_gen();
601 Box::pin(
602 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
603 )
604 }),
605 ) {
606 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
607 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
608 ),
609 UpdateStreamOrOutcome::Outcome(o) => {
610 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
611 }
612 }
613 }
614
615 pub async fn claim_inputs<I, S>(
616 &self,
617 dbtx: &mut DatabaseTransaction<'_>,
618 inputs: ClientInputBundle<I, S>,
619 operation_id: OperationId,
620 ) -> anyhow::Result<OutPointRange>
621 where
622 I: IInput + MaybeSend + MaybeSync + 'static,
623 S: sm::IState + MaybeSend + MaybeSync + 'static,
624 {
625 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
626 .await
627 }
628
629 async fn claim_inputs_dyn(
630 &self,
631 dbtx: &mut DatabaseTransaction<'_>,
632 inputs: InstancelessDynClientInputBundle,
633 operation_id: OperationId,
634 ) -> anyhow::Result<OutPointRange> {
635 let tx_builder =
636 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
637
638 self.client
639 .get()
640 .finalize_and_submit_transaction_inner(
641 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
642 operation_id,
643 tx_builder,
644 )
645 .await
646 }
647
648 pub async fn add_state_machines_dbtx(
649 &self,
650 dbtx: &mut DatabaseTransaction<'_>,
651 states: Vec<DynState>,
652 ) -> AddStateMachinesResult {
653 self.client
654 .get()
655 .executor()
656 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
657 .await
658 }
659
660 pub async fn add_operation_log_entry_dbtx(
661 &self,
662 dbtx: &mut DatabaseTransaction<'_>,
663 operation_id: OperationId,
664 operation_type: &str,
665 operation_meta: impl serde::Serialize,
666 ) {
667 self.client
668 .get()
669 .operation_log()
670 .add_operation_log_entry_dbtx(
671 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
672 operation_id,
673 operation_type,
674 serde_json::to_value(operation_meta).expect("Can't fail"),
675 )
676 .await;
677 }
678
679 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
680 where
681 E: Event + Send,
682 Cap: Send,
683 {
684 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
685 warn!(
686 target: LOG_CLIENT,
687 module_kind = %<M as ClientModule>::kind(),
688 event_module = ?<E as Event>::MODULE,
689 "Client module logging events of different module than its own. This might become an error in the future."
690 );
691 }
692 self.client
693 .get()
694 .log_event_json(
695 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
696 <E as Event>::MODULE,
697 self.module_instance_id,
698 <E as Event>::KIND,
699 serde_json::to_value(event).expect("Can't fail"),
700 <E as Event>::PERSISTENCE,
701 )
702 .await;
703 }
704}
705
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// Priority of a module as a primary-module candidate, stored as a `u64`.
///
/// Going by the provided constants (`HIGH` = 100, `LOW` = 10000), a smaller number means a
/// higher priority, so the derived ordering sorts higher priorities first.
pub struct PrimaryModulePriority(u64);
709
710impl PrimaryModulePriority {
711 pub const HIGH: Self = Self(100);
712 pub const LOW: Self = Self(10000);
713
714 pub fn custom(prio: u64) -> Self {
715 Self(prio)
716 }
717}
/// What a module reports from `ClientModule::supports_being_primary`.
pub enum PrimaryModuleSupport {
    /// Carries only a priority.
    // NOTE(review): presumably means the module can be primary for any amount unit — confirm.
    Any { priority: PrimaryModulePriority },
    /// Carries a priority and an explicit set of amount units.
    // NOTE(review): presumably means the module can be primary only for `units` — confirm.
    Selected {
        priority: PrimaryModulePriority,
        units: BTreeSet<AmountUnit>,
    },
    /// No support; this is the default returned by `ClientModule::supports_being_primary`.
    None,
}
730
731impl PrimaryModuleSupport {
732 pub fn selected<const N: usize>(
733 priority: PrimaryModulePriority,
734 units: [AmountUnit; N],
735 ) -> Self {
736 Self::Selected {
737 priority,
738 units: BTreeSet::from(units),
739 }
740 }
741}
742
743#[apply(async_trait_maybe_send!)]
745pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
746 type Init: ClientModuleInit;
747
748 type Common: ModuleCommon;
750
751 type Backup: ModuleBackup;
754
755 type ModuleStateMachineContext: Context;
758
759 type States: State<ModuleContext = Self::ModuleStateMachineContext>
761 + IntoDynInstance<DynType = DynState>;
762
763 fn decoder() -> Decoder {
764 let mut decoder_builder = Self::Common::decoder_builder();
765 decoder_builder.with_decodable_type::<Self::States>();
766 decoder_builder.with_decodable_type::<Self::Backup>();
767 decoder_builder.build()
768 }
769
770 fn kind() -> ModuleKind {
771 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
772 }
773
774 fn context(&self) -> Self::ModuleStateMachineContext;
775
776 async fn start(&self) {}
782
783 async fn handle_cli_command(
784 &self,
785 _args: &[ffi::OsString],
786 ) -> anyhow::Result<serde_json::Value> {
787 Err(anyhow::format_err!(
788 "This module does not implement cli commands"
789 ))
790 }
791
792 async fn handle_rpc(
793 &self,
794 _method: String,
795 _request: serde_json::Value,
796 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
797 Box::pin(futures::stream::once(std::future::ready(Err(
798 anyhow::format_err!("This module does not implement rpc"),
799 ))))
800 }
801
802 fn input_fee(
811 &self,
812 amount: &Amounts,
813 input: &<Self::Common as ModuleCommon>::Input,
814 ) -> Option<Amounts>;
815
816 fn output_fee(
825 &self,
826 amount: &Amounts,
827 output: &<Self::Common as ModuleCommon>::Output,
828 ) -> Option<Amounts>;
829
830 fn supports_backup(&self) -> bool {
831 false
832 }
833
834 async fn backup(&self) -> anyhow::Result<Self::Backup> {
835 anyhow::bail!("Backup not supported");
836 }
837
838 fn supports_being_primary(&self) -> PrimaryModuleSupport {
847 PrimaryModuleSupport::None
848 }
849
850 async fn create_final_inputs_and_outputs(
868 &self,
869 _dbtx: &mut DatabaseTransaction<'_>,
870 _operation_id: OperationId,
871 _unit: AmountUnit,
872 _input_amount: Amount,
873 _output_amount: Amount,
874 ) -> anyhow::Result<(
875 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
876 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
877 )> {
878 unimplemented!()
879 }
880
881 async fn await_primary_module_output(
886 &self,
887 _operation_id: OperationId,
888 _out_point: OutPoint,
889 ) -> anyhow::Result<()> {
890 unimplemented!()
891 }
892
893 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
896 unimplemented!()
897 }
898
899 async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
902 unimplemented!()
903 }
904
905 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
908 unimplemented!()
909 }
910
911 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
967 bail!("Unable to determine if safe to leave the federation: Not implemented")
968 }
969}
970
#[apply(async_trait_maybe_send!)]
/// Type-erased counterpart of [`ClientModule`].
///
/// Inputs, outputs, backups and contexts appear in their dynamic forms, and the module
/// instance id is passed explicitly where it is needed. A blanket impl provides this trait
/// for every [`ClientModule`].
pub trait IClientModule: Debug {
    /// Returns `self` as [`std::any::Any`] so it can be downcast to the concrete module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// See [`ClientModule::decoder`].
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// See [`ClientModule::start`].
    async fn start(&self);

    /// See [`ClientModule::handle_cli_command`].
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// See [`ClientModule::handle_rpc`].
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// See [`ClientModule::input_fee`]; `input` must belong to this module.
    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;

    /// See [`ClientModule::output_fee`]; `output` must belong to this module.
    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// See [`ClientModule::backup`]; the result is tagged with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport;

    /// See [`ClientModule::create_final_inputs_and_outputs`]; the resulting bundles are
    /// tagged with `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`].
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        unit: AmountUnit,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
1027
1028#[apply(async_trait_maybe_send!)]
1029impl<T> IClientModule for T
1030where
1031 T: ClientModule,
1032{
1033 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1034 self
1035 }
1036
1037 fn decoder(&self) -> Decoder {
1038 T::decoder()
1039 }
1040
1041 fn context(&self, instance: ModuleInstanceId) -> DynContext {
1042 DynContext::from_typed(instance, <T as ClientModule>::context(self))
1043 }
1044
1045 async fn start(&self) {
1046 <T as ClientModule>::start(self).await;
1047 }
1048
1049 async fn handle_cli_command(
1050 &self,
1051 args: &[ffi::OsString],
1052 ) -> anyhow::Result<serde_json::Value> {
1053 <T as ClientModule>::handle_cli_command(self, args).await
1054 }
1055
1056 async fn handle_rpc(
1057 &self,
1058 method: String,
1059 request: serde_json::Value,
1060 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1061 <T as ClientModule>::handle_rpc(self, method, request).await
1062 }
1063
1064 fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1065 <T as ClientModule>::input_fee(
1066 self,
1067 amount,
1068 input
1069 .as_any()
1070 .downcast_ref()
1071 .expect("Dispatched to correct module"),
1072 )
1073 }
1074
1075 fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1076 <T as ClientModule>::output_fee(
1077 self,
1078 amount,
1079 output
1080 .as_any()
1081 .downcast_ref()
1082 .expect("Dispatched to correct module"),
1083 )
1084 }
1085
1086 fn supports_backup(&self) -> bool {
1087 <T as ClientModule>::supports_backup(self)
1088 }
1089
1090 async fn backup(
1091 &self,
1092 module_instance_id: ModuleInstanceId,
1093 ) -> anyhow::Result<DynModuleBackup> {
1094 Ok(DynModuleBackup::from_typed(
1095 module_instance_id,
1096 <T as ClientModule>::backup(self).await?,
1097 ))
1098 }
1099
1100 fn supports_being_primary(&self) -> PrimaryModuleSupport {
1101 <T as ClientModule>::supports_being_primary(self)
1102 }
1103
1104 async fn create_final_inputs_and_outputs(
1105 &self,
1106 module_instance: ModuleInstanceId,
1107 dbtx: &mut DatabaseTransaction<'_>,
1108 operation_id: OperationId,
1109 unit: AmountUnit,
1110 input_amount: Amount,
1111 output_amount: Amount,
1112 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1113 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1114 self,
1115 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1116 operation_id,
1117 unit,
1118 input_amount,
1119 output_amount,
1120 )
1121 .await?;
1122
1123 let inputs = inputs.into_dyn(module_instance);
1124
1125 let outputs = outputs.into_dyn(module_instance);
1126
1127 Ok((inputs, outputs))
1128 }
1129
1130 async fn await_primary_module_output(
1131 &self,
1132 operation_id: OperationId,
1133 out_point: OutPoint,
1134 ) -> anyhow::Result<()> {
1135 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1136 }
1137
1138 async fn get_balance(
1139 &self,
1140 module_instance: ModuleInstanceId,
1141 dbtx: &mut DatabaseTransaction<'_>,
1142 unit: AmountUnit,
1143 ) -> Amount {
1144 <T as ClientModule>::get_balance(
1145 self,
1146 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1147 unit,
1148 )
1149 .await
1150 }
1151
1152 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1153 <T as ClientModule>::subscribe_balance_changes(self).await
1154 }
1155}
1156
// Defines `DynClientModule`, a clonable newtype for a type-erased `IClientModule` behind an
// `Arc`. NOTE(review): the exact items generated depend on `dyn_newtype_define!` — confirm
// there.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1161
1162impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1163 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1164 self.inner.as_ref()
1165 }
1166}
1167
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
/// Half-open range of `u64` indices, `start..end`.
///
/// Unlike [`Range`], this type is `Copy`; iterating it yields the same values as
/// `start..end`.
pub struct IdxRange {
    // First index in the range (inclusive).
    start: u64,
    // One past the last index (exclusive).
    end: u64,
}
1174
1175impl IdxRange {
1176 pub fn new_single(start: u64) -> Option<Self> {
1177 start.checked_add(1).map(|end| Self { start, end })
1178 }
1179
1180 pub fn start(self) -> u64 {
1181 self.start
1182 }
1183
1184 pub fn count(self) -> usize {
1185 self.into_iter().count()
1186 }
1187
1188 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1189 range.end().checked_add(1).map(|end| Self {
1190 start: *range.start(),
1191 end,
1192 })
1193 }
1194}
1195
1196impl From<Range<u64>> for IdxRange {
1197 fn from(Range { start, end }: Range<u64>) -> Self {
1198 Self { start, end }
1199 }
1200}
1201
1202impl IntoIterator for IdxRange {
1203 type Item = u64;
1204
1205 type IntoIter = ops::Range<u64>;
1206
1207 fn into_iter(self) -> Self::IntoIter {
1208 ops::Range {
1209 start: self.start,
1210 end: self.end,
1211 }
1212 }
1213}
1214
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
/// A contiguous range of output indices within a single transaction.
///
/// Iterating it yields one [`OutPoint`] per index, all sharing `txid`.
pub struct OutPointRange {
    /// Id of the transaction the outputs belong to.
    pub txid: TransactionId,
    // Half-open range of output indices within that transaction.
    idx_range: IdxRange,
}
1220
1221impl OutPointRange {
1222 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1223 Self { txid, idx_range }
1224 }
1225
1226 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1227 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1228 }
1229
1230 pub fn start_idx(self) -> u64 {
1231 self.idx_range.start()
1232 }
1233
1234 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1235 self.idx_range.into_iter()
1236 }
1237
1238 pub fn count(self) -> usize {
1239 self.idx_range.count()
1240 }
1241}
1242
1243impl IntoIterator for OutPointRange {
1244 type Item = OutPoint;
1245
1246 type IntoIter = OutPointRangeIter;
1247
1248 fn into_iter(self) -> Self::IntoIter {
1249 OutPointRangeIter {
1250 txid: self.txid,
1251 inner: self.idx_range.into_iter(),
1252 }
1253 }
1254}
1255
/// Iterator over the [`OutPoint`]s of an [`OutPointRange`].
pub struct OutPointRangeIter {
    // Transaction id shared by every yielded `OutPoint`.
    txid: TransactionId,

    // Remaining output indices.
    inner: ops::Range<u64>,
}
1261
impl OutPointRange {
    /// Returns the id of the transaction this range refers to (same value as the public
    /// `txid` field).
    pub fn txid(&self) -> TransactionId {
        self.txid
    }
}
1267
1268impl Iterator for OutPointRangeIter {
1269 type Item = OutPoint;
1270
1271 fn next(&mut self) -> Option<Self::Item> {
1272 self.inner.next().map(|idx| OutPoint {
1273 txid: self.txid,
1274 out_idx: idx,
1275 })
1276 }
1277}
1278
/// Shared closure that, given an [`OutPointRange`], produces a list of states of type `S`.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;