1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::DynGlobalApi;
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
21use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::util::BoxStream;
24use fedimint_core::{
25 Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
26 maybe_add_send, maybe_add_send_sync,
27};
28use fedimint_eventlog::{Event, EventKind};
29use fedimint_logging::LOG_CLIENT;
30use futures::Stream;
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33use tracing::warn;
34
35use self::init::ClientModuleInit;
36use crate::module::recovery::{DynModuleBackup, ModuleBackup};
37use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
38use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
39use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
40use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
41use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
42
43pub mod init;
44pub mod recovery;
45
/// Module registry whose entries are type-erased [`DynClientModule`]s.
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// Type-erased interface through which [`ClientContext`] talks to the client.
///
/// [`FinalClientIface`] stores a `Weak<dyn ClientContextIface>` and most
/// [`ClientContext`] methods simply forward to the methods declared here.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased client module for `instance`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned [`DynGlobalApi`] handle.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Type-erased counterpart of [`ClientContext::finalize_and_submit_transaction`].
    ///
    /// `operation_meta_gen` receives an [`OutPointRange`] and returns the
    /// operation metadata as JSON.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of transaction submission that works on a caller-supplied
    /// `dbtx`; used by [`ClientContext`] when claiming inputs.
    // NOTE(review): presumably does not create an operation log entry, since it
    // takes neither an operation type nor metadata — confirm with the implementor.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits on the given `outputs` of `operation_id`, returning an error on failure.
    // NOTE(review): exact success/failure semantics are defined by the
    // implementor and not visible here.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the client's operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client's database handle.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for `peer`, or `None` if none is available.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event that has already been serialized to JSON.
    ///
    /// [`ClientContext::log_event`] fills the arguments from the [`Event`]
    /// implementation: `module_kind` from `Event::MODULE`, `kind` from
    /// `Event::KIND` and `persist` from `Event::PERSIST`.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    );

    /// Streams the active states of `operation_id` for module `module_id`.
    ///
    /// The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of `operation_id` for module `module_id`.
    ///
    /// The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
126
/// Late-initialized, shareable handle to the client's [`ClientContextIface`].
///
/// Wraps an `Arc<OnceLock<Weak<dyn ClientContextIface>>>`: the slot starts out
/// empty (via the `Default` derive), can be filled exactly once and only holds
/// a weak reference to the client. Clones share the same slot.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142 self.0
143 .get()
144 .expect("client must be already set")
145 .upgrade()
146 .expect("client module context must not be use past client shutdown")
147 }
148
149 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150 self.0.set(client).expect("FinalLazyClient already set");
151 }
152}
153
154impl fmt::Debug for FinalClientIface {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.write_str("FinalClientIface")
157 }
158}
/// Per-module handle to client functionality, typed by the client module `M`.
pub struct ClientContext<M> {
    /// Late-bound handle to the client.
    client: FinalClientIface,
    /// Instance id used to tag and filter this module's type-erased values.
    module_instance_id: ModuleInstanceId,
    /// Token passed to `global_dbtx` when a module transaction needs global access.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    /// The module's database; `module_db()` checks that it is isolated.
    module_db: Database,
    /// Ties the context to `M` without storing a value of that type.
    _marker: marker::PhantomData<M>,
}
170
171impl<M> Clone for ClientContext<M> {
172 fn clone(&self) -> Self {
173 Self {
174 client: self.client.clone(),
175 module_db: self.module_db.clone(),
176 module_instance_id: self.module_instance_id,
177 _marker: marker::PhantomData,
178 global_dbtx_access_token: self.global_dbtx_access_token,
179 }
180 }
181}
182
/// Guard returned by `ClientContext::self_ref` that dereferences to the module
/// instance `M`.
pub struct ClientContextSelfRef<'s, M> {
    /// Strong reference to the client, held for the lifetime of the guard.
    client: Arc<dyn ClientContextIface>,
    /// Instance id of the module to look up on dereference.
    module_instance_id: ModuleInstanceId,
    /// Ties the guard to the `'s` borrow and to `M` without storing either.
    _marker: marker::PhantomData<&'s M>,
}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195 M: ClientModule,
196{
197 type Target = M;
198
199 fn deref(&self) -> &Self::Target {
200 self.client
201 .get_module(self.module_instance_id)
202 .as_any()
203 .downcast_ref::<M>()
204 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205 }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 f.write_str("ClientContext")
211 }
212}
213
214impl<M> ClientContext<M>
215where
216 M: ClientModule,
217{
218 pub fn new(
219 client: FinalClientIface,
220 module_instance_id: ModuleInstanceId,
221 global_dbtx_access_token: GlobalDBTxAccessToken,
222 module_db: Database,
223 ) -> Self {
224 Self {
225 client,
226 module_instance_id,
227 global_dbtx_access_token,
228 module_db,
229 _marker: marker::PhantomData,
230 }
231 }
232
233 #[allow(clippy::needless_lifetimes)] pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
245 ClientContextSelfRef {
246 client: self.client.get(),
247 module_instance_id: self.module_instance_id,
248 _marker: marker::PhantomData,
249 }
250 }
251
252 pub fn global_api(&self) -> DynGlobalApi {
254 self.client.get().api_clone()
255 }
256
257 pub fn decoders(&self) -> ModuleDecoderRegistry {
259 Clone::clone(self.client.get().decoders())
260 }
261
262 pub fn input_from_dyn<'i>(
263 &self,
264 input: &'i DynInput,
265 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
266 (input.module_instance_id() == self.module_instance_id).then(|| {
267 input
268 .as_any()
269 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
270 .unwrap_or_else(|| {
271 panic!("instance_id {} just checked", input.module_instance_id())
272 })
273 })
274 }
275
276 pub fn output_from_dyn<'o>(
277 &self,
278 output: &'o DynOutput,
279 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
280 (output.module_instance_id() == self.module_instance_id).then(|| {
281 output
282 .as_any()
283 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
284 .unwrap_or_else(|| {
285 panic!("instance_id {} just checked", output.module_instance_id())
286 })
287 })
288 }
289
290 pub fn map_dyn<'s, 'i, 'o, I>(
291 &'s self,
292 typed: impl IntoIterator<Item = I> + 'i,
293 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
294 where
295 I: IntoDynInstance,
296 'i: 'o,
297 's: 'o,
298 {
299 typed.into_iter().map(|i| self.make_dyn(i))
300 }
301
302 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
304 self.make_dyn(output)
305 }
306
307 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
309 self.make_dyn(input)
310 }
311
312 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
314 where
315 I: IntoDynInstance,
316 {
317 typed.into_dyn(self.module_instance_id)
318 }
319
320 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
322 where
323 O: IntoDynInstance<DynType = DynOutput> + 'static,
324 S: IntoDynInstance<DynType = DynState> + 'static,
325 {
326 self.make_dyn(output)
327 }
328
329 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
331 where
332 I: IntoDynInstance<DynType = DynInput> + 'static,
333 S: IntoDynInstance<DynType = DynState> + 'static,
334 {
335 self.make_dyn(inputs)
336 }
337
338 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
339 where
340 S: sm::IState + 'static,
341 {
342 DynState::from_typed(self.module_instance_id, sm)
343 }
344
345 pub async fn finalize_and_submit_transaction<F, Meta>(
346 &self,
347 operation_id: OperationId,
348 operation_type: &str,
349 operation_meta_gen: F,
350 tx_builder: TransactionBuilder,
351 ) -> anyhow::Result<OutPointRange>
352 where
353 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
354 Meta: serde::Serialize + MaybeSend,
355 {
356 self.client
357 .get()
358 .finalize_and_submit_transaction(
359 operation_id,
360 operation_type,
361 Box::new(move |out_point_range| {
362 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
363 }),
364 tx_builder,
365 )
366 .await
367 }
368
369 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
370 self.client.get().transaction_updates(operation_id).await
371 }
372
373 pub async fn await_primary_module_outputs(
374 &self,
375 operation_id: OperationId,
376 outputs: Vec<OutPoint>,
378 ) -> anyhow::Result<()> {
379 self.client
380 .get()
381 .await_primary_module_outputs(operation_id, outputs)
382 .await
383 }
384
385 pub async fn get_operation(
387 &self,
388 operation_id: OperationId,
389 ) -> anyhow::Result<oplog::OperationLogEntry> {
390 let operation = self
391 .client
392 .get()
393 .operation_log()
394 .get_operation(operation_id)
395 .await
396 .ok_or(anyhow::anyhow!("Operation not found"))?;
397
398 if operation.operation_module_kind() != M::kind().as_str() {
399 bail!("Operation is not a lightning operation");
400 }
401
402 Ok(operation)
403 }
404
405 fn global_db(&self) -> fedimint_core::db::Database {
409 let db = Clone::clone(self.client.get().db());
410
411 db.ensure_global()
412 .expect("global_db must always return a global db");
413
414 db
415 }
416
417 pub fn module_db(&self) -> &Database {
418 self.module_db
419 .ensure_isolated()
420 .expect("module_db must always return isolated db");
421 &self.module_db
422 }
423
424 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
425 self.client.get().has_active_states(op_id).await
426 }
427
428 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
429 self.client.get().operation_exists(op_id).await
430 }
431
432 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
433 self.client
434 .get()
435 .executor()
436 .get_active_states()
437 .await
438 .into_iter()
439 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
440 .map(|s| {
441 (
442 Clone::clone(
443 s.0.as_any()
444 .downcast_ref::<M::States>()
445 .expect("incorrect output type passed to module plugin"),
446 ),
447 s.1,
448 )
449 })
450 .collect()
451 }
452
453 pub async fn get_config(&self) -> ClientConfig {
454 self.client.get().config().await
455 }
456
457 pub async fn get_invite_code(&self) -> InviteCode {
460 let cfg = self.get_config().await.global;
461 self.client
462 .get()
463 .invite_code(
464 *cfg.api_endpoints
465 .keys()
466 .next()
467 .expect("A federation always has at least one guardian"),
468 )
469 .await
470 .expect("The guardian we requested an invite code for exists")
471 }
472
473 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
474 self.client.get().get_internal_payment_markers()
475 }
476
477 pub async fn manual_operation_start(
480 &self,
481 operation_id: OperationId,
482 op_type: &str,
483 operation_meta: impl serde::Serialize + Debug,
484 sms: Vec<DynState>,
485 ) -> anyhow::Result<()> {
486 let db = self.module_db();
487 let mut dbtx = db.begin_transaction().await;
488 {
489 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
490
491 self.manual_operation_start_inner(
492 &mut dbtx.to_ref_nc(),
493 operation_id,
494 op_type,
495 operation_meta,
496 sms,
497 )
498 .await?;
499 }
500
501 dbtx.commit_tx_result().await.map_err(|_| {
502 anyhow!(
503 "Operation with id {} already exists",
504 operation_id.fmt_short()
505 )
506 })?;
507
508 Ok(())
509 }
510
511 pub async fn manual_operation_start_dbtx(
512 &self,
513 dbtx: &mut DatabaseTransaction<'_>,
514 operation_id: OperationId,
515 op_type: &str,
516 operation_meta: impl serde::Serialize + Debug,
517 sms: Vec<DynState>,
518 ) -> anyhow::Result<()> {
519 self.manual_operation_start_inner(
520 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
521 operation_id,
522 op_type,
523 operation_meta,
524 sms,
525 )
526 .await
527 }
528
529 async fn manual_operation_start_inner(
532 &self,
533 dbtx: &mut DatabaseTransaction<'_>,
534 operation_id: OperationId,
535 op_type: &str,
536 operation_meta: impl serde::Serialize + Debug,
537 sms: Vec<DynState>,
538 ) -> anyhow::Result<()> {
539 dbtx.ensure_global()
540 .expect("Must deal with global dbtx here");
541
542 if self
543 .client
544 .get()
545 .operation_log()
546 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
547 .await
548 .is_some()
549 {
550 bail!(
551 "Operation with id {} already exists",
552 operation_id.fmt_short()
553 );
554 }
555
556 self.client
557 .get()
558 .operation_log()
559 .add_operation_log_entry_dbtx(
560 &mut dbtx.to_ref_nc(),
561 operation_id,
562 op_type,
563 serde_json::to_value(operation_meta).expect("Can't fail"),
564 )
565 .await;
566
567 self.client
568 .get()
569 .executor()
570 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
571 .await
572 .expect("State machine is valid");
573
574 Ok(())
575 }
576
577 pub fn outcome_or_updates<U, S>(
578 &self,
579 operation: OperationLogEntry,
580 operation_id: OperationId,
581 stream_gen: impl FnOnce() -> S + 'static,
582 ) -> UpdateStreamOrOutcome<U>
583 where
584 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
585 S: Stream<Item = U> + MaybeSend + 'static,
586 {
587 use futures::StreamExt;
588 match self.client.get().operation_log().outcome_or_updates(
589 &self.global_db(),
590 operation_id,
591 operation,
592 Box::new(move || {
593 let stream_gen = stream_gen();
594 Box::pin(
595 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
596 )
597 }),
598 ) {
599 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
600 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
601 ),
602 UpdateStreamOrOutcome::Outcome(o) => {
603 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
604 }
605 }
606 }
607
608 pub async fn claim_inputs<I, S>(
609 &self,
610 dbtx: &mut DatabaseTransaction<'_>,
611 inputs: ClientInputBundle<I, S>,
612 operation_id: OperationId,
613 ) -> anyhow::Result<OutPointRange>
614 where
615 I: IInput + MaybeSend + MaybeSync + 'static,
616 S: sm::IState + MaybeSend + MaybeSync + 'static,
617 {
618 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
619 .await
620 }
621
622 async fn claim_inputs_dyn(
623 &self,
624 dbtx: &mut DatabaseTransaction<'_>,
625 inputs: InstancelessDynClientInputBundle,
626 operation_id: OperationId,
627 ) -> anyhow::Result<OutPointRange> {
628 let tx_builder =
629 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
630
631 self.client
632 .get()
633 .finalize_and_submit_transaction_inner(
634 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
635 operation_id,
636 tx_builder,
637 )
638 .await
639 }
640
641 pub async fn add_state_machines_dbtx(
642 &self,
643 dbtx: &mut DatabaseTransaction<'_>,
644 states: Vec<DynState>,
645 ) -> AddStateMachinesResult {
646 self.client
647 .get()
648 .executor()
649 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
650 .await
651 }
652
653 pub async fn add_operation_log_entry_dbtx(
654 &self,
655 dbtx: &mut DatabaseTransaction<'_>,
656 operation_id: OperationId,
657 operation_type: &str,
658 operation_meta: impl serde::Serialize,
659 ) {
660 self.client
661 .get()
662 .operation_log()
663 .add_operation_log_entry_dbtx(
664 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
665 operation_id,
666 operation_type,
667 serde_json::to_value(operation_meta).expect("Can't fail"),
668 )
669 .await;
670 }
671
672 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
673 where
674 E: Event + Send,
675 Cap: Send,
676 {
677 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
678 warn!(
679 target: LOG_CLIENT,
680 module_kind = %<M as ClientModule>::kind(),
681 event_module = ?<E as Event>::MODULE,
682 "Client module logging events of different module than its own. This might become an error in the future."
683 );
684 }
685 self.client
686 .get()
687 .log_event_json(
688 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
689 <E as Event>::MODULE,
690 self.module_instance_id,
691 <E as Event>::KIND,
692 serde_json::to_value(event).expect("Can't fail"),
693 <E as Event>::PERSIST,
694 )
695 .await;
696 }
697}
698
/// Typed interface implemented by every client module.
///
/// Any `T: ClientModule` also gets the type-erased [`IClientModule`] through a
/// blanket implementation.
#[apply(async_trait_maybe_send!)]
pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
    /// Initializer type of the module; [`ClientModule::kind`] is derived from it.
    type Init: ClientModuleInit;

    /// Common module types (inputs, outputs, decoder builder).
    type Common: ModuleCommon;

    /// Backup type returned by [`ClientModule::backup`].
    type Backup: ModuleBackup;

    /// Context type shared with the module's state machines.
    type ModuleStateMachineContext: Context;

    /// State machine type of the module.
    type States: State<ModuleContext = Self::ModuleStateMachineContext>
        + IntoDynInstance<DynType = DynState>;

    /// Builds the module decoder: the common decoder extended with the
    /// module's `States` and `Backup` types.
    fn decoder() -> Decoder {
        let mut decoder_builder = Self::Common::decoder_builder();
        decoder_builder.with_decodable_type::<Self::States>();
        decoder_builder.with_decodable_type::<Self::Backup>();
        decoder_builder.build()
    }

    /// Returns the module kind declared by the common part of `Self::Init`.
    fn kind() -> ModuleKind {
        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
    }

    /// Returns the context handed to the module's state machines.
    fn context(&self) -> Self::ModuleStateMachineContext;

    /// Start-up hook; the default implementation does nothing.
    async fn start(&self) {}

    /// Handles a CLI command addressed to this module.
    ///
    /// The default implementation returns an error stating that CLI commands
    /// are not implemented.
    async fn handle_cli_command(
        &self,
        _args: &[ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        Err(anyhow::format_err!(
            "This module does not implement cli commands"
        ))
    }

    /// Handles an RPC call addressed to this module.
    ///
    /// The default implementation returns a stream that yields a single error
    /// stating that RPC is not implemented.
    async fn handle_rpc(
        &self,
        _method: String,
        _request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(futures::stream::once(std::future::ready(Err(
            anyhow::format_err!("This module does not implement rpc"),
        ))))
    }

    /// Returns the fee for `input` of value `amount`.
    // NOTE(review): the meaning of `None` is not visible here — presumably the
    // fee cannot be determined for this input; confirm against callers.
    fn input_fee(
        &self,
        amount: Amount,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amount>;

    /// Returns the fee for `output` of value `amount`.
    // NOTE(review): the meaning of `None` is not visible here — presumably the
    // fee cannot be determined for this output; confirm against callers.
    fn output_fee(
        &self,
        amount: Amount,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amount>;

    /// Reports whether the module supports backups; `false` by default.
    fn supports_backup(&self) -> bool {
        false
    }

    /// Produces a backup of the module.
    ///
    /// The default implementation always fails with "Backup not supported".
    async fn backup(&self) -> anyhow::Result<Self::Backup> {
        anyhow::bail!("Backup not supported");
    }

    /// Reports whether the module can act as the primary module; `false` by
    /// default.
    fn supports_being_primary(&self) -> bool {
        false
    }

    /// Creates the final inputs and outputs for a transaction with the given
    /// input and output amounts.
    ///
    /// The default implementation panics with `unimplemented!()`.
    // NOTE(review): presumably only called on modules that return `true` from
    // `supports_being_primary` — confirm against callers.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }

    /// Waits on the output at `_out_point` of the given operation.
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }

    /// Returns the module's balance.
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
        unimplemented!()
    }

    /// Returns a stream of balance-change notifications.
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }

    /// Hook invoked when leaving the federation.
    ///
    /// The default implementation always returns an error, because it cannot
    /// determine whether leaving is safe.
    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
        bail!("Unable to determine if safe to leave the federation: Not implemented")
    }
}
919
/// Type-erased counterpart of [`ClientModule`].
///
/// Inputs, outputs, contexts and backups appear in their dyn forms, and methods
/// that need it take the module instance id explicitly. Implemented for every
/// `T: ClientModule` by a blanket implementation.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any`, so callers can downcast to the concrete module.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's decoder.
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, wrapped for `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Start-up hook of the module.
    async fn start(&self);

    /// Handles a CLI command addressed to the module.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// Handles an RPC call addressed to the module.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Returns the fee for the type-erased `input` of value `amount`.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// Returns the fee for the type-erased `output` of value `amount`.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// Reports whether the module supports backups.
    fn supports_backup(&self) -> bool;

    /// Produces the module's backup, wrapped for `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// Reports whether the module can act as the primary module.
    fn supports_being_primary(&self) -> bool;

    /// Type-erased version of [`ClientModule::create_final_inputs_and_outputs`];
    /// the returned bundles are tagged with `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// Waits on the output at `out_point` of the given operation.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// Returns the balance of the module instance `module_instance`.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// Returns a stream of balance-change notifications.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
974
975#[apply(async_trait_maybe_send!)]
976impl<T> IClientModule for T
977where
978 T: ClientModule,
979{
980 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
981 self
982 }
983
984 fn decoder(&self) -> Decoder {
985 T::decoder()
986 }
987
988 fn context(&self, instance: ModuleInstanceId) -> DynContext {
989 DynContext::from_typed(instance, <T as ClientModule>::context(self))
990 }
991
992 async fn start(&self) {
993 <T as ClientModule>::start(self).await;
994 }
995
996 async fn handle_cli_command(
997 &self,
998 args: &[ffi::OsString],
999 ) -> anyhow::Result<serde_json::Value> {
1000 <T as ClientModule>::handle_cli_command(self, args).await
1001 }
1002
1003 async fn handle_rpc(
1004 &self,
1005 method: String,
1006 request: serde_json::Value,
1007 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1008 <T as ClientModule>::handle_rpc(self, method, request).await
1009 }
1010
1011 fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
1012 <T as ClientModule>::input_fee(
1013 self,
1014 amount,
1015 input
1016 .as_any()
1017 .downcast_ref()
1018 .expect("Dispatched to correct module"),
1019 )
1020 }
1021
1022 fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
1023 <T as ClientModule>::output_fee(
1024 self,
1025 amount,
1026 output
1027 .as_any()
1028 .downcast_ref()
1029 .expect("Dispatched to correct module"),
1030 )
1031 }
1032
1033 fn supports_backup(&self) -> bool {
1034 <T as ClientModule>::supports_backup(self)
1035 }
1036
1037 async fn backup(
1038 &self,
1039 module_instance_id: ModuleInstanceId,
1040 ) -> anyhow::Result<DynModuleBackup> {
1041 Ok(DynModuleBackup::from_typed(
1042 module_instance_id,
1043 <T as ClientModule>::backup(self).await?,
1044 ))
1045 }
1046
1047 fn supports_being_primary(&self) -> bool {
1048 <T as ClientModule>::supports_being_primary(self)
1049 }
1050
1051 async fn create_final_inputs_and_outputs(
1052 &self,
1053 module_instance: ModuleInstanceId,
1054 dbtx: &mut DatabaseTransaction<'_>,
1055 operation_id: OperationId,
1056 input_amount: Amount,
1057 output_amount: Amount,
1058 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1059 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1060 self,
1061 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1062 operation_id,
1063 input_amount,
1064 output_amount,
1065 )
1066 .await?;
1067
1068 let inputs = inputs.into_dyn(module_instance);
1069
1070 let outputs = outputs.into_dyn(module_instance);
1071
1072 Ok((inputs, outputs))
1073 }
1074
1075 async fn await_primary_module_output(
1076 &self,
1077 operation_id: OperationId,
1078 out_point: OutPoint,
1079 ) -> anyhow::Result<()> {
1080 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1081 }
1082
1083 async fn get_balance(
1084 &self,
1085 module_instance: ModuleInstanceId,
1086 dbtx: &mut DatabaseTransaction<'_>,
1087 ) -> Amount {
1088 <T as ClientModule>::get_balance(
1089 self,
1090 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1091 )
1092 .await
1093 }
1094
1095 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1096 <T as ClientModule>::subscribe_balance_changes(self).await
1097 }
1098}
1099
// Defines `DynClientModule`, a cloneable newtype declared over
// `Arc<IClientModule>`; it is the entry type of `ClientModuleRegistry`.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1104
1105impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1106 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1107 self.inner.as_ref()
1108 }
1109}
1110
/// Range of `u64` indices from `start` (inclusive) to `end` (exclusive).
///
/// Iterating the range yields the same values as `start..end`.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    /// First index of the range.
    start: u64,
    /// One past the last index of the range.
    end: u64,
}
1117
1118impl IdxRange {
1119 pub fn new_single(start: u64) -> Option<Self> {
1120 start.checked_add(1).map(|end| Self { start, end })
1121 }
1122
1123 pub fn start(self) -> u64 {
1124 self.start
1125 }
1126
1127 pub fn count(self) -> usize {
1128 self.into_iter().count()
1129 }
1130
1131 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1132 range.end().checked_add(1).map(|end| Self {
1133 start: *range.start(),
1134 end,
1135 })
1136 }
1137}
1138
1139impl From<Range<u64>> for IdxRange {
1140 fn from(Range { start, end }: Range<u64>) -> Self {
1141 Self { start, end }
1142 }
1143}
1144
1145impl IntoIterator for IdxRange {
1146 type Item = u64;
1147
1148 type IntoIter = ops::Range<u64>;
1149
1150 fn into_iter(self) -> Self::IntoIter {
1151 ops::Range {
1152 start: self.start,
1153 end: self.end,
1154 }
1155 }
1156}
1157
/// Range of out points that share one transaction id.
///
/// Iterating it yields one `OutPoint` per index in `idx_range`.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Transaction id shared by every out point in the range.
    pub txid: TransactionId,
    /// Output indices covered by the range.
    idx_range: IdxRange,
}
1163
1164impl OutPointRange {
1165 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1166 Self { txid, idx_range }
1167 }
1168
1169 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1170 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1171 }
1172
1173 pub fn start_idx(self) -> u64 {
1174 self.idx_range.start()
1175 }
1176
1177 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1178 self.idx_range.into_iter()
1179 }
1180
1181 pub fn count(self) -> usize {
1182 self.idx_range.count()
1183 }
1184}
1185
1186impl IntoIterator for OutPointRange {
1187 type Item = OutPoint;
1188
1189 type IntoIter = OutPointRangeIter;
1190
1191 fn into_iter(self) -> Self::IntoIter {
1192 OutPointRangeIter {
1193 txid: self.txid,
1194 inner: self.idx_range.into_iter(),
1195 }
1196 }
1197}
1198
/// Iterator over the `OutPoint`s of an [`OutPointRange`].
pub struct OutPointRangeIter {
    /// Transaction id attached to every yielded out point.
    txid: TransactionId,

    /// Remaining output indices.
    inner: ops::Range<u64>,
}
1204
1205impl OutPointRange {
1206 pub fn txid(&self) -> TransactionId {
1207 self.txid
1208 }
1209}
1210
1211impl Iterator for OutPointRangeIter {
1212 type Item = OutPoint;
1213
1214 fn next(&mut self) -> Option<Self::Item> {
1215 self.inner.next().map(|idx| OutPoint {
1216 txid: self.txid,
1217 out_idx: idx,
1218 })
1219 }
1220}
1221
/// Shared closure that produces state machines of type `S` from an
/// [`OutPointRange`].
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;