1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::DynGlobalApi;
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
21use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::util::BoxStream;
24use fedimint_core::{
25 Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
26 maybe_add_send, maybe_add_send_sync,
27};
28use fedimint_eventlog::{Event, EventKind};
29use fedimint_logging::LOG_CLIENT;
30use futures::Stream;
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33use tracing::warn;
34
35use self::init::ClientModuleInit;
36use crate::module::recovery::{DynModuleBackup, ModuleBackup};
37use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
38use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
39use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
40use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
41use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
42
43pub mod init;
44pub mod recovery;
45
/// Module registry whose entries are type-erased client modules ([`DynClientModule`]).
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// Interface through which [`ClientContext`] reaches the client.
///
/// The implementor is not part of this file. The notes on the individual methods restate
/// what their signatures and the call sites in [`ClientContext`] show; anything beyond that
/// is marked as an assumption.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased module for the given module instance id.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Submits the transaction described by `tx_builder` for `operation_id`.
    ///
    /// `operation_meta_gen` maps an [`OutPointRange`] to the JSON metadata of the operation;
    /// on success an [`OutPointRange`] is returned.
    // NOTE(review): presumably this also creates the operation log entry of type
    // `operation_type` — confirm against the implementor.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works inside a
    /// caller-supplied database transaction and takes no operation type or metadata.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits on the given `outputs` of `operation_id`.
    // NOTE(review): presumably resolves once the primary module has processed every listed
    // output — confirm against the implementor.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client's database handle.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for `peer`, or `None` if none is available for that peer.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns a public key / `u64` pair.
    // NOTE(review): the meaning of the two values is not visible in this file.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event given as JSON `payload` using a non-committable database transaction.
    ///
    /// `module_kind`, `module_id`, `kind` and `persist` describe the event; see
    /// [`ClientContext::log_event`] for how they are derived from a typed event.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    );

    /// Streams the active states of `operation_id` for module `module_id`, reading through
    /// `dbtx`; the stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of `operation_id` for module `module_id`, reading through
    /// `dbtx`; the stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
126
/// Cloneable, late-bound handle to the client's [`ClientContextIface`].
///
/// All clones share one `OnceLock` holding a `Weak` reference. The lock starts out empty
/// (`Default`), is filled once through [`FinalClientIface::set`] and is upgraded on every
/// [`FinalClientIface::get`].
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142 self.0
143 .get()
144 .expect("client must be already set")
145 .upgrade()
146 .expect("client module context must not be use past client shutdown")
147 }
148
149 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150 self.0.set(client).expect("FinalLazyClient already set");
151 }
152}
153
154impl fmt::Debug for FinalClientIface {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.write_str("FinalClientIface")
157 }
158}
/// Per-module-instance handle to the client, typed by the module `M` it belongs to.
///
/// Bundles the late-bound client handle with the module's instance id, the module's own
/// database handle and the token used to obtain global database transactions.
pub struct ClientContext<M> {
    // Late-bound handle to the client; upgraded on each use.
    client: FinalClientIface,
    // Instance id attached to every dynamic value created through this context.
    module_instance_id: ModuleInstanceId,
    // Passed to `DatabaseTransaction::global_dbtx` whenever a global transaction is needed.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database handle returned by `module_db`, which asserts that it is isolated.
    module_db: Database,
    // Ties the context to `M` without storing a value of that type.
    _marker: marker::PhantomData<M>,
}
170
171impl<M> Clone for ClientContext<M> {
172 fn clone(&self) -> Self {
173 Self {
174 client: self.client.clone(),
175 module_db: self.module_db.clone(),
176 module_instance_id: self.module_instance_id,
177 _marker: marker::PhantomData,
178 global_dbtx_access_token: self.global_dbtx_access_token,
179 }
180 }
181}
182
/// Handle returned by [`ClientContext::self_ref`]; dereferences to the module instance `M`.
pub struct ClientContextSelfRef<'s, M> {
    // Strong reference to the client; the `&M` handed out by `Deref` borrows from it.
    client: Arc<dyn ClientContextIface>,
    // Instance id used to look the module up in the client.
    module_instance_id: ModuleInstanceId,
    // Carries the module type and the `'s` lifetime without storing a reference.
    _marker: marker::PhantomData<&'s M>,
}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195 M: ClientModule,
196{
197 type Target = M;
198
199 fn deref(&self) -> &Self::Target {
200 self.client
201 .get_module(self.module_instance_id)
202 .as_any()
203 .downcast_ref::<M>()
204 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205 }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.write_str("ClientContext")
211 }
212}
213
214impl<M> ClientContext<M>
215where
216 M: ClientModule,
217{
218 pub fn new(
219 client: FinalClientIface,
220 module_instance_id: ModuleInstanceId,
221 global_dbtx_access_token: GlobalDBTxAccessToken,
222 module_db: Database,
223 ) -> Self {
224 Self {
225 client,
226 module_instance_id,
227 global_dbtx_access_token,
228 module_db,
229 _marker: marker::PhantomData,
230 }
231 }
232
233 #[allow(clippy::needless_lifetimes)] pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
245 ClientContextSelfRef {
246 client: self.client.get(),
247 module_instance_id: self.module_instance_id,
248 _marker: marker::PhantomData,
249 }
250 }
251
252 pub fn global_api(&self) -> DynGlobalApi {
254 self.client.get().api_clone()
255 }
256
257 pub fn decoders(&self) -> ModuleDecoderRegistry {
259 Clone::clone(self.client.get().decoders())
260 }
261
262 pub fn input_from_dyn<'i>(
263 &self,
264 input: &'i DynInput,
265 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
266 (input.module_instance_id() == self.module_instance_id).then(|| {
267 input
268 .as_any()
269 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
270 .unwrap_or_else(|| {
271 panic!("instance_id {} just checked", input.module_instance_id())
272 })
273 })
274 }
275
276 pub fn output_from_dyn<'o>(
277 &self,
278 output: &'o DynOutput,
279 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
280 (output.module_instance_id() == self.module_instance_id).then(|| {
281 output
282 .as_any()
283 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
284 .unwrap_or_else(|| {
285 panic!("instance_id {} just checked", output.module_instance_id())
286 })
287 })
288 }
289
    /// Lazily converts every item of `typed` into its dynamic form via [`Self::make_dyn`].
    ///
    /// The returned iterator borrows `self` and owns the input iterator, hence the `'o`
    /// lifetime that both `'s` and `'i` must outlive.
    pub fn map_dyn<'s, 'i, 'o, I>(
        &'s self,
        typed: impl IntoIterator<Item = I> + 'i,
    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
    where
        I: IntoDynInstance,
        'i: 'o,
        's: 'o,
    {
        typed.into_iter().map(|i| self.make_dyn(i))
    }
301
302 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
304 self.make_dyn(output)
305 }
306
307 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
309 self.make_dyn(input)
310 }
311
312 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
314 where
315 I: IntoDynInstance,
316 {
317 typed.into_dyn(self.module_instance_id)
318 }
319
320 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
322 where
323 O: IntoDynInstance<DynType = DynOutput> + 'static,
324 S: IntoDynInstance<DynType = DynState> + 'static,
325 {
326 self.make_dyn(output)
327 }
328
329 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
331 where
332 I: IntoDynInstance<DynType = DynInput> + 'static,
333 S: IntoDynInstance<DynType = DynState> + 'static,
334 {
335 self.make_dyn(inputs)
336 }
337
338 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
339 where
340 S: sm::IState + 'static,
341 {
342 DynState::from_typed(self.module_instance_id, sm)
343 }
344
345 pub async fn finalize_and_submit_transaction<F, Meta>(
346 &self,
347 operation_id: OperationId,
348 operation_type: &str,
349 operation_meta_gen: F,
350 tx_builder: TransactionBuilder,
351 ) -> anyhow::Result<OutPointRange>
352 where
353 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
354 Meta: serde::Serialize + MaybeSend,
355 {
356 self.client
357 .get()
358 .finalize_and_submit_transaction(
359 operation_id,
360 operation_type,
361 Box::new(move |out_point_range| {
362 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
363 }),
364 tx_builder,
365 )
366 .await
367 }
368
369 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
370 self.client.get().transaction_updates(operation_id).await
371 }
372
373 pub async fn await_primary_module_outputs(
374 &self,
375 operation_id: OperationId,
376 outputs: Vec<OutPoint>,
378 ) -> anyhow::Result<()> {
379 self.client
380 .get()
381 .await_primary_module_outputs(operation_id, outputs)
382 .await
383 }
384
385 pub async fn get_operation(
387 &self,
388 operation_id: OperationId,
389 ) -> anyhow::Result<oplog::OperationLogEntry> {
390 let operation = self
391 .client
392 .get()
393 .operation_log()
394 .get_operation(operation_id)
395 .await
396 .ok_or(anyhow::anyhow!("Operation not found"))?;
397
398 if operation.operation_module_kind() != M::kind().as_str() {
399 bail!("Operation is not a lightning operation");
400 }
401
402 Ok(operation)
403 }
404
405 fn global_db(&self) -> fedimint_core::db::Database {
409 let db = Clone::clone(self.client.get().db());
410
411 db.ensure_global()
412 .expect("global_db must always return a global db");
413
414 db
415 }
416
417 pub fn module_db(&self) -> &Database {
418 self.module_db
419 .ensure_isolated()
420 .expect("module_db must always return isolated db");
421 &self.module_db
422 }
423
424 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
425 self.client.get().has_active_states(op_id).await
426 }
427
428 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
429 self.client.get().operation_exists(op_id).await
430 }
431
432 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
433 self.client
434 .get()
435 .executor()
436 .get_active_states()
437 .await
438 .into_iter()
439 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
440 .map(|s| {
441 (
442 Clone::clone(
443 s.0.as_any()
444 .downcast_ref::<M::States>()
445 .expect("incorrect output type passed to module plugin"),
446 ),
447 s.1,
448 )
449 })
450 .collect()
451 }
452
453 pub async fn get_config(&self) -> ClientConfig {
454 self.client.get().config().await
455 }
456
457 pub async fn get_invite_code(&self) -> InviteCode {
460 let cfg = self.get_config().await.global;
461 self.client
462 .get()
463 .invite_code(
464 *cfg.api_endpoints
465 .keys()
466 .next()
467 .expect("A federation always has at least one guardian"),
468 )
469 .await
470 .expect("The guardian we requested an invite code for exists")
471 }
472
473 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
474 self.client.get().get_internal_payment_markers()
475 }
476
    /// Creates an operation log entry and adds the state machines `sms` in a single
    /// database transaction that is begun and committed here.
    ///
    /// # Errors
    ///
    /// Returns an error if an operation with `operation_id` already exists, or if the
    /// commit fails. Note that every commit failure is reported as "already exists".
    pub async fn manual_operation_start(
        &self,
        operation_id: OperationId,
        op_type: &str,
        operation_meta: impl serde::Serialize + Debug,
        sms: Vec<DynState>,
    ) -> anyhow::Result<()> {
        let db = self.module_db();
        let mut dbtx = db.begin_transaction().await;
        {
            // The inner helper needs a global transaction; the scope ends that borrow
            // before the commit below.
            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);

            self.manual_operation_start_inner(
                &mut dbtx.to_ref_nc(),
                operation_id,
                op_type,
                operation_meta,
                sms,
            )
            .await?;
        }

        // The underlying commit error is discarded and replaced by this message.
        dbtx.commit_tx_result().await.map_err(|_| {
            anyhow!(
                "Operation with id {} already exists",
                operation_id.fmt_short()
            )
        })?;

        Ok(())
    }
510
511 pub async fn manual_operation_start_dbtx(
512 &self,
513 dbtx: &mut DatabaseTransaction<'_>,
514 operation_id: OperationId,
515 op_type: &str,
516 operation_meta: impl serde::Serialize + Debug,
517 sms: Vec<DynState>,
518 ) -> anyhow::Result<()> {
519 self.manual_operation_start_inner(
520 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
521 operation_id,
522 op_type,
523 operation_meta,
524 sms,
525 )
526 .await
527 }
528
    /// Shared implementation of the `manual_operation_start*` methods; `dbtx` must be a
    /// global database transaction.
    ///
    /// Checks that the operation does not exist yet, then writes the operation log entry
    /// and adds the state machines, all through `dbtx`.
    ///
    /// # Errors
    ///
    /// Returns an error if an operation with `operation_id` already exists.
    ///
    /// # Panics
    ///
    /// Panics if `dbtx` is not global, if `operation_meta` cannot be serialized to JSON, or
    /// if the executor rejects the state machines.
    async fn manual_operation_start_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        op_type: &str,
        operation_meta: impl serde::Serialize + Debug,
        sms: Vec<DynState>,
    ) -> anyhow::Result<()> {
        dbtx.ensure_global()
            .expect("Must deal with global dbtx here");

        // Refuse to overwrite an existing operation.
        if self
            .client
            .get()
            .operation_log()
            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
            .await
            .is_some()
        {
            bail!(
                "Operation with id {} already exists",
                operation_id.fmt_short()
            );
        }

        self.client
            .get()
            .operation_log()
            .add_operation_log_entry_dbtx(
                &mut dbtx.to_ref_nc(),
                operation_id,
                op_type,
                serde_json::to_value(operation_meta).expect("Can't fail"),
            )
            .await;

        self.client
            .get()
            .executor()
            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
            .await
            .expect("State machine is valid");

        Ok(())
    }
576
    /// Typed wrapper around the operation log's `outcome_or_updates`.
    ///
    /// Items produced by `stream_gen` are serialized to JSON before being handed to the
    /// operation log, and whatever the log returns (a stream or a single outcome) is
    /// deserialized back into `U`.
    ///
    /// # Panics
    ///
    /// Panics if an item cannot be serialized to JSON or a returned value cannot be
    /// deserialized into `U`.
    pub fn outcome_or_updates<U, S>(
        &self,
        operation: OperationLogEntry,
        operation_id: OperationId,
        stream_gen: impl FnOnce() -> S + 'static,
    ) -> UpdateStreamOrOutcome<U>
    where
        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
        S: Stream<Item = U> + MaybeSend + 'static,
    {
        use futures::StreamExt;
        match self.client.get().operation_log().outcome_or_updates(
            &self.global_db(),
            operation_id,
            operation,
            // Typed stream -> JSON stream, as expected by the operation log.
            Box::new(move || {
                let stream_gen = stream_gen();
                Box::pin(
                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
                )
            }),
        ) {
            // JSON -> typed, for both possible results.
            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
            ),
            UpdateStreamOrOutcome::Outcome(o) => {
                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
            }
        }
    }
607
608 pub async fn claim_inputs<I, S>(
609 &self,
610 dbtx: &mut DatabaseTransaction<'_>,
611 inputs: ClientInputBundle<I, S>,
612 operation_id: OperationId,
613 ) -> anyhow::Result<OutPointRange>
614 where
615 I: IInput + MaybeSend + MaybeSync + 'static,
616 S: sm::IState + MaybeSend + MaybeSync + 'static,
617 {
618 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
619 .await
620 }
621
622 async fn claim_inputs_dyn(
623 &self,
624 dbtx: &mut DatabaseTransaction<'_>,
625 inputs: InstancelessDynClientInputBundle,
626 operation_id: OperationId,
627 ) -> anyhow::Result<OutPointRange> {
628 let tx_builder =
629 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
630
631 self.client
632 .get()
633 .finalize_and_submit_transaction_inner(
634 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
635 operation_id,
636 tx_builder,
637 )
638 .await
639 }
640
641 pub async fn add_state_machines_dbtx(
642 &self,
643 dbtx: &mut DatabaseTransaction<'_>,
644 states: Vec<DynState>,
645 ) -> AddStateMachinesResult {
646 self.client
647 .get()
648 .executor()
649 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
650 .await
651 }
652
653 pub async fn add_operation_log_entry_dbtx(
654 &self,
655 dbtx: &mut DatabaseTransaction<'_>,
656 operation_id: OperationId,
657 operation_type: &str,
658 operation_meta: impl serde::Serialize,
659 ) {
660 self.client
661 .get()
662 .operation_log()
663 .add_operation_log_entry_dbtx(
664 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
665 operation_id,
666 operation_type,
667 serde_json::to_value(operation_meta).expect("Can't fail"),
668 )
669 .await;
670 }
671
672 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
673 where
674 E: Event + Send,
675 Cap: Send,
676 {
677 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
678 warn!(
679 target: LOG_CLIENT,
680 module_kind = %<M as ClientModule>::kind(),
681 event_module = ?<E as Event>::MODULE,
682 "Client module logging events of different module than its own. This might become an error in the future."
683 );
684 }
685 self.client
686 .get()
687 .log_event_json(
688 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
689 <E as Event>::MODULE,
690 self.module_instance_id,
691 <E as Event>::KIND,
692 serde_json::to_value(event).expect("Can't fail"),
693 <E as Event>::PERSIST,
694 )
695 .await;
696 }
697}
698
/// Typed interface a client module implements.
///
/// Every implementor automatically gets the type-erased [`IClientModule`] through the
/// blanket impl in this file. Several methods have defaults, described on each method.
#[apply(async_trait_maybe_send!)]
pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
    /// Initializer type of this module; [`Self::kind`] is derived from it.
    type Init: ClientModuleInit;

    /// Common module types; provides the input/output types and the base decoder.
    type Common: ModuleCommon;

    /// Backup type returned by [`Self::backup`]; registered with the decoder.
    type Backup: ModuleBackup;

    /// Context type returned by [`Self::context`] and used by [`Self::States`].
    type ModuleStateMachineContext: Context;

    /// State machine state type of this module; registered with the decoder.
    type States: State<ModuleContext = Self::ModuleStateMachineContext>
        + IntoDynInstance<DynType = DynState>;

    /// Builds the module decoder: the common decoder extended with [`Self::States`] and
    /// [`Self::Backup`].
    fn decoder() -> Decoder {
        let mut decoder_builder = Self::Common::decoder_builder();
        decoder_builder.with_decodable_type::<Self::States>();
        decoder_builder.with_decodable_type::<Self::Backup>();
        decoder_builder.build()
    }

    /// Returns the module kind declared by the common init of [`Self::Init`].
    fn kind() -> ModuleKind {
        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
    }

    /// Returns the context handed to this module's state machines.
    fn context(&self) -> Self::ModuleStateMachineContext;

    /// Start hook; does nothing by default.
    // NOTE(review): when the client calls this is not visible in this file.
    async fn start(&self) {}

    /// Handles a CLI command; the default returns an error saying CLI commands are not
    /// implemented.
    async fn handle_cli_command(
        &self,
        _args: &[ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        Err(anyhow::format_err!(
            "This module does not implement cli commands"
        ))
    }

    /// Handles an RPC call; the default returns a stream with a single error item saying
    /// RPC is not implemented.
    async fn handle_rpc(
        &self,
        _method: String,
        _request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(futures::stream::once(std::future::ready(Err(
            anyhow::format_err!("This module does not implement rpc"),
        ))))
    }

    /// Returns the fee for `input` of value `amount`.
    // NOTE(review): the meaning of `None` is not visible here — presumably "fee cannot be
    // determined for this input"; confirm with implementors.
    fn input_fee(
        &self,
        amount: Amount,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amount>;

    /// Returns the fee for `output` of value `amount`.
    // NOTE(review): the meaning of `None` is not visible here — presumably "fee cannot be
    // determined for this output"; confirm with implementors.
    fn output_fee(
        &self,
        amount: Amount,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amount>;

    /// Whether this module supports [`Self::backup`]; `false` by default.
    fn supports_backup(&self) -> bool {
        false
    }

    /// Produces the module backup; the default returns a "not supported" error.
    async fn backup(&self) -> anyhow::Result<Self::Backup> {
        anyhow::bail!("Backup not supported");
    }

    /// Whether this module can act as the primary module; `false` by default.
    fn supports_being_primary(&self) -> bool {
        false
    }

    /// Creates the inputs and outputs (with their state machines) that complete a
    /// transaction with the given input and output amounts.
    ///
    /// The default panics with `unimplemented!()`.
    // NOTE(review): presumably only called on modules that return `true` from
    // `supports_being_primary` — confirm against the caller.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }

    /// Waits on `out_point` of `operation_id`; the default panics with `unimplemented!()`.
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }

    /// Returns the module's balance; the default panics with `unimplemented!()`.
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
        unimplemented!()
    }

    /// Returns a stream of `()` items; the default panics with `unimplemented!()`.
    // NOTE(review): presumably one item per balance change — confirm with implementors.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }

    /// Hook for leaving the federation; the default returns an error saying it cannot be
    /// determined whether leaving is safe.
    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
        bail!("Unable to determine if safe to leave the federation: Not implemented")
    }
}
920
/// Type-erased counterpart of [`ClientModule`].
///
/// Typed inputs, outputs, states and backups are replaced by their `Dyn*` forms, and
/// methods that produce dynamic values take the module instance id explicitly. Implemented
/// for every [`ClientModule`] by the blanket impl in this file.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any`, for downcasting to the concrete module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// See [`ClientModule::decoder`].
    fn decoder(&self) -> Decoder;

    /// See [`ClientModule::context`]; the result is tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// See [`ClientModule::start`].
    async fn start(&self);

    /// See [`ClientModule::handle_cli_command`].
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// See [`ClientModule::handle_rpc`].
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// See [`ClientModule::input_fee`]; `input` must belong to this module.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// See [`ClientModule::output_fee`]; `output` must belong to this module.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// See [`ClientModule::backup`]; the result is tagged with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> bool;

    /// See [`ClientModule::create_final_inputs_and_outputs`]; the resulting bundles are
    /// tagged with `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`].
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
975
976#[apply(async_trait_maybe_send!)]
977impl<T> IClientModule for T
978where
979 T: ClientModule,
980{
981 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
982 self
983 }
984
985 fn decoder(&self) -> Decoder {
986 T::decoder()
987 }
988
989 fn context(&self, instance: ModuleInstanceId) -> DynContext {
990 DynContext::from_typed(instance, <T as ClientModule>::context(self))
991 }
992
993 async fn start(&self) {
994 <T as ClientModule>::start(self).await;
995 }
996
997 async fn handle_cli_command(
998 &self,
999 args: &[ffi::OsString],
1000 ) -> anyhow::Result<serde_json::Value> {
1001 <T as ClientModule>::handle_cli_command(self, args).await
1002 }
1003
1004 async fn handle_rpc(
1005 &self,
1006 method: String,
1007 request: serde_json::Value,
1008 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1009 <T as ClientModule>::handle_rpc(self, method, request).await
1010 }
1011
1012 fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
1013 <T as ClientModule>::input_fee(
1014 self,
1015 amount,
1016 input
1017 .as_any()
1018 .downcast_ref()
1019 .expect("Dispatched to correct module"),
1020 )
1021 }
1022
1023 fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
1024 <T as ClientModule>::output_fee(
1025 self,
1026 amount,
1027 output
1028 .as_any()
1029 .downcast_ref()
1030 .expect("Dispatched to correct module"),
1031 )
1032 }
1033
1034 fn supports_backup(&self) -> bool {
1035 <T as ClientModule>::supports_backup(self)
1036 }
1037
1038 async fn backup(
1039 &self,
1040 module_instance_id: ModuleInstanceId,
1041 ) -> anyhow::Result<DynModuleBackup> {
1042 Ok(DynModuleBackup::from_typed(
1043 module_instance_id,
1044 <T as ClientModule>::backup(self).await?,
1045 ))
1046 }
1047
1048 fn supports_being_primary(&self) -> bool {
1049 <T as ClientModule>::supports_being_primary(self)
1050 }
1051
1052 async fn create_final_inputs_and_outputs(
1053 &self,
1054 module_instance: ModuleInstanceId,
1055 dbtx: &mut DatabaseTransaction<'_>,
1056 operation_id: OperationId,
1057 input_amount: Amount,
1058 output_amount: Amount,
1059 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1060 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1061 self,
1062 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1063 operation_id,
1064 input_amount,
1065 output_amount,
1066 )
1067 .await?;
1068
1069 let inputs = inputs.into_dyn(module_instance);
1070
1071 let outputs = outputs.into_dyn(module_instance);
1072
1073 Ok((inputs, outputs))
1074 }
1075
1076 async fn await_primary_module_output(
1077 &self,
1078 operation_id: OperationId,
1079 out_point: OutPoint,
1080 ) -> anyhow::Result<()> {
1081 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1082 }
1083
1084 async fn get_balance(
1085 &self,
1086 module_instance: ModuleInstanceId,
1087 dbtx: &mut DatabaseTransaction<'_>,
1088 ) -> Amount {
1089 <T as ClientModule>::get_balance(
1090 self,
1091 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1092 )
1093 .await
1094 }
1095
1096 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1097 <T as ClientModule>::subscribe_balance_changes(self).await
1098 }
1099}
1100
// Defines `DynClientModule`, a cloneable newtype around an `Arc` of a type-erased
// `IClientModule`.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1105
1106impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1107 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1108 self.inner.as_ref()
1109 }
1110}
1111
/// Half-open range `start..end` of `u64` indices.
///
/// Unlike `std::ops::Range<u64>` this type is `Copy` and derives the encoding and serde
/// traits listed below.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    // First index of the range (inclusive).
    start: u64,
    // One past the last index of the range (exclusive).
    end: u64,
}
1118
1119impl IdxRange {
1120 pub fn new_single(start: u64) -> Option<Self> {
1121 start.checked_add(1).map(|end| Self { start, end })
1122 }
1123
1124 pub fn start(self) -> u64 {
1125 self.start
1126 }
1127
1128 pub fn count(self) -> usize {
1129 self.into_iter().count()
1130 }
1131
1132 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1133 range.end().checked_add(1).map(|end| Self {
1134 start: *range.start(),
1135 end,
1136 })
1137 }
1138}
1139
1140impl From<Range<u64>> for IdxRange {
1141 fn from(Range { start, end }: Range<u64>) -> Self {
1142 Self { start, end }
1143 }
1144}
1145
1146impl IntoIterator for IdxRange {
1147 type Item = u64;
1148
1149 type IntoIter = ops::Range<u64>;
1150
1151 fn into_iter(self) -> Self::IntoIter {
1152 ops::Range {
1153 start: self.start,
1154 end: self.end,
1155 }
1156 }
1157}
1158
/// A range of output indices within a single transaction.
///
/// Iterating yields one [`OutPoint`] per index, all carrying `txid`.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Id of the transaction the outputs belong to.
    pub txid: TransactionId,
    // Output indices covered by this range.
    idx_range: IdxRange,
}
1164
1165impl OutPointRange {
1166 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1167 Self { txid, idx_range }
1168 }
1169
1170 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1171 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1172 }
1173
1174 pub fn start_idx(self) -> u64 {
1175 self.idx_range.start()
1176 }
1177
1178 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1179 self.idx_range.into_iter()
1180 }
1181
1182 pub fn count(self) -> usize {
1183 self.idx_range.count()
1184 }
1185}
1186
1187impl IntoIterator for OutPointRange {
1188 type Item = OutPoint;
1189
1190 type IntoIter = OutPointRangeIter;
1191
1192 fn into_iter(self) -> Self::IntoIter {
1193 OutPointRangeIter {
1194 txid: self.txid,
1195 inner: self.idx_range.into_iter(),
1196 }
1197 }
1198}
1199
1200pub struct OutPointRangeIter {
1201 txid: TransactionId,
1202
1203 inner: ops::Range<u64>,
1204}
1205
1206impl OutPointRange {
1207 pub fn txid(&self) -> TransactionId {
1208 self.txid
1209 }
1210}
1211
1212impl Iterator for OutPointRangeIter {
1213 type Item = OutPoint;
1214
1215 fn next(&mut self) -> Option<Self::Item> {
1216 self.inner.next().map(|idx| OutPoint {
1217 txid: self.txid,
1218 out_idx: idx,
1219 })
1220 }
1221}
1222
/// Shared closure that produces a list of states `S` from an [`OutPointRange`].
// NOTE(review): presumably called with the out point range of the submitted transaction —
// confirm against the transaction builder.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;