1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14 Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15 OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
21use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::util::BoxStream;
24use fedimint_core::{
25 Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
26 maybe_add_send, maybe_add_send_sync,
27};
28use fedimint_eventlog::{Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT;
30use futures::Stream;
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33use tracing::warn;
34
35use self::init::ClientModuleInit;
36use crate::module::recovery::{DynModuleBackup, ModuleBackup};
37use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
38use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
39use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
40use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
41use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
42
43pub mod init;
44pub mod recovery;
45
/// Registry of type-erased client modules ([`DynClientModule`]).
pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// Interface of the client as seen from a module's [`ClientContext`].
///
/// It is used as `dyn ClientContextIface` (see [`FinalClientIface`]); the
/// methods of [`ClientContext`] that need the client forward to the methods
/// declared here.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the type-erased client module registered under `instance`.
    // NOTE(review): behavior for an unknown `instance` is not visible from
    // this file — confirm against the implementor.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global federation API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the decoder registry of the client.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalizes the transaction described by `tx_builder` and submits it as
    /// part of operation `operation_id`.
    ///
    /// `operation_meta_gen` receives the resulting [`OutPointRange`] and
    /// produces the JSON metadata of the operation.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works inside
    /// the caller-provided `dbtx` and takes no operation type or metadata.
    // NOTE(review): presumably the caller is responsible for committing
    // `dbtx` — confirm against the implementor.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] of operation `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits for the given `outputs` of operation `operation_id`.
    // NOTE(review): presumably resolved by the primary module (see
    // `IClientModule::await_primary_module_output`) — confirm.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the operation log of the client.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Tells whether operation `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Tells whether operation `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Returns the client database.
    ///
    /// [`ClientContext`] expects this to be the global (non-isolated)
    /// database and asserts so with `ensure_global`.
    fn db(&self) -> &Database;

    /// Returns the state machine executor of the client.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code for guardian `peer`, if one is available.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the public key / numeric marker pair for internal payments.
    // NOTE(review): the meaning of the two values is not visible from this
    // file — confirm against the implementor.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event whose payload is already serialized to JSON.
    ///
    /// `module_kind` and `module_id` identify the event's module, `kind` its
    /// event kind and `persist` its [`EventPersistence`].
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Streams the active states of operation `operation_id` for module
    /// `module_id`, read through `dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of operation `operation_id` for module
    /// `module_id`, read through `dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
127
128#[derive(Clone, Default)]
134pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
135
136impl FinalClientIface {
137 pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
143 self.0
144 .get()
145 .expect("client must be already set")
146 .upgrade()
147 .expect("client module context must not be use past client shutdown")
148 }
149
150 pub fn set(&self, client: Weak<dyn ClientContextIface>) {
151 self.0.set(client).expect("FinalLazyClient already set");
152 }
153}
154
155impl fmt::Debug for FinalClientIface {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.write_str("FinalClientIface")
158 }
159}
/// Handle through which a client module of type `M` talks to the client it is
/// part of.
///
/// `M` is only a marker (`PhantomData`); no value of type `M` is stored.
pub struct ClientContext<M> {
    // Set-once handle to the client; resolved on every use via `get()`.
    client: FinalClientIface,
    // Instance id of the module this context belongs to.
    module_instance_id: ModuleInstanceId,
    // Token passed to `global_dbtx(..)` to turn a module dbtx into a global one.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database of the module; `module_db()` asserts it is isolated.
    module_db: Database,
    _marker: marker::PhantomData<M>,
}
171
172impl<M> Clone for ClientContext<M> {
173 fn clone(&self) -> Self {
174 Self {
175 client: self.client.clone(),
176 module_db: self.module_db.clone(),
177 module_instance_id: self.module_instance_id,
178 _marker: marker::PhantomData,
179 global_dbtx_access_token: self.global_dbtx_access_token,
180 }
181 }
182}
183
/// Smart reference to the module instance `M` owned by the client.
///
/// Holds a strong reference to the client and dereferences to `M` by looking
/// the module up under `module_instance_id` (see the `Deref` impl).
pub struct ClientContextSelfRef<'s, M> {
    // Strong reference that keeps the client alive while this value exists.
    client: Arc<dyn ClientContextIface>,
    // Instance id used to look the module up in the client.
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
193
194impl<M> ops::Deref for ClientContextSelfRef<'_, M>
195where
196 M: ClientModule,
197{
198 type Target = M;
199
200 fn deref(&self) -> &Self::Target {
201 self.client
202 .get_module(self.module_instance_id)
203 .as_any()
204 .downcast_ref::<M>()
205 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
206 }
207}
208
209impl<M> fmt::Debug for ClientContext<M> {
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 f.write_str("ClientContext")
212 }
213}
214
215impl<M> ClientContext<M>
216where
217 M: ClientModule,
218{
219 pub fn new(
220 client: FinalClientIface,
221 module_instance_id: ModuleInstanceId,
222 global_dbtx_access_token: GlobalDBTxAccessToken,
223 module_db: Database,
224 ) -> Self {
225 Self {
226 client,
227 module_instance_id,
228 global_dbtx_access_token,
229 module_db,
230 _marker: marker::PhantomData,
231 }
232 }
233
234 #[allow(clippy::needless_lifetimes)] pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
246 ClientContextSelfRef {
247 client: self.client.get(),
248 module_instance_id: self.module_instance_id,
249 _marker: marker::PhantomData,
250 }
251 }
252
253 pub fn global_api(&self) -> DynGlobalApi {
255 self.client.get().api_clone()
256 }
257
258 pub fn module_api(&self) -> DynModuleApi {
260 self.global_api().with_module(self.module_instance_id)
261 }
262
263 pub fn decoders(&self) -> ModuleDecoderRegistry {
265 Clone::clone(self.client.get().decoders())
266 }
267
268 pub fn input_from_dyn<'i>(
269 &self,
270 input: &'i DynInput,
271 ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
272 (input.module_instance_id() == self.module_instance_id).then(|| {
273 input
274 .as_any()
275 .downcast_ref::<<M::Common as ModuleCommon>::Input>()
276 .unwrap_or_else(|| {
277 panic!("instance_id {} just checked", input.module_instance_id())
278 })
279 })
280 }
281
282 pub fn output_from_dyn<'o>(
283 &self,
284 output: &'o DynOutput,
285 ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
286 (output.module_instance_id() == self.module_instance_id).then(|| {
287 output
288 .as_any()
289 .downcast_ref::<<M::Common as ModuleCommon>::Output>()
290 .unwrap_or_else(|| {
291 panic!("instance_id {} just checked", output.module_instance_id())
292 })
293 })
294 }
295
296 pub fn map_dyn<'s, 'i, 'o, I>(
297 &'s self,
298 typed: impl IntoIterator<Item = I> + 'i,
299 ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
300 where
301 I: IntoDynInstance,
302 'i: 'o,
303 's: 'o,
304 {
305 typed.into_iter().map(|i| self.make_dyn(i))
306 }
307
308 pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
310 self.make_dyn(output)
311 }
312
313 pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
315 self.make_dyn(input)
316 }
317
318 pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
320 where
321 I: IntoDynInstance,
322 {
323 typed.into_dyn(self.module_instance_id)
324 }
325
326 pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
328 where
329 O: IntoDynInstance<DynType = DynOutput> + 'static,
330 S: IntoDynInstance<DynType = DynState> + 'static,
331 {
332 self.make_dyn(output)
333 }
334
335 pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
337 where
338 I: IntoDynInstance<DynType = DynInput> + 'static,
339 S: IntoDynInstance<DynType = DynState> + 'static,
340 {
341 self.make_dyn(inputs)
342 }
343
344 pub fn make_dyn_state<S>(&self, sm: S) -> DynState
345 where
346 S: sm::IState + 'static,
347 {
348 DynState::from_typed(self.module_instance_id, sm)
349 }
350
351 pub async fn finalize_and_submit_transaction<F, Meta>(
352 &self,
353 operation_id: OperationId,
354 operation_type: &str,
355 operation_meta_gen: F,
356 tx_builder: TransactionBuilder,
357 ) -> anyhow::Result<OutPointRange>
358 where
359 F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
360 Meta: serde::Serialize + MaybeSend,
361 {
362 self.client
363 .get()
364 .finalize_and_submit_transaction(
365 operation_id,
366 operation_type,
367 Box::new(move |out_point_range| {
368 serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
369 }),
370 tx_builder,
371 )
372 .await
373 }
374
375 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
376 self.client.get().transaction_updates(operation_id).await
377 }
378
379 pub async fn await_primary_module_outputs(
380 &self,
381 operation_id: OperationId,
382 outputs: Vec<OutPoint>,
384 ) -> anyhow::Result<()> {
385 self.client
386 .get()
387 .await_primary_module_outputs(operation_id, outputs)
388 .await
389 }
390
391 pub async fn get_operation(
393 &self,
394 operation_id: OperationId,
395 ) -> anyhow::Result<oplog::OperationLogEntry> {
396 let operation = self
397 .client
398 .get()
399 .operation_log()
400 .get_operation(operation_id)
401 .await
402 .ok_or(anyhow::anyhow!("Operation not found"))?;
403
404 if operation.operation_module_kind() != M::kind().as_str() {
405 bail!("Operation is not a lightning operation");
406 }
407
408 Ok(operation)
409 }
410
411 fn global_db(&self) -> fedimint_core::db::Database {
415 let db = Clone::clone(self.client.get().db());
416
417 db.ensure_global()
418 .expect("global_db must always return a global db");
419
420 db
421 }
422
423 pub fn module_db(&self) -> &Database {
424 self.module_db
425 .ensure_isolated()
426 .expect("module_db must always return isolated db");
427 &self.module_db
428 }
429
430 pub async fn has_active_states(&self, op_id: OperationId) -> bool {
431 self.client.get().has_active_states(op_id).await
432 }
433
434 pub async fn operation_exists(&self, op_id: OperationId) -> bool {
435 self.client.get().operation_exists(op_id).await
436 }
437
438 pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
439 self.client
440 .get()
441 .executor()
442 .get_active_states()
443 .await
444 .into_iter()
445 .filter(|s| s.0.module_instance_id() == self.module_instance_id)
446 .map(|s| {
447 (
448 Clone::clone(
449 s.0.as_any()
450 .downcast_ref::<M::States>()
451 .expect("incorrect output type passed to module plugin"),
452 ),
453 s.1,
454 )
455 })
456 .collect()
457 }
458
459 pub async fn get_config(&self) -> ClientConfig {
460 self.client.get().config().await
461 }
462
463 pub async fn get_invite_code(&self) -> InviteCode {
466 let cfg = self.get_config().await.global;
467 self.client
468 .get()
469 .invite_code(
470 *cfg.api_endpoints
471 .keys()
472 .next()
473 .expect("A federation always has at least one guardian"),
474 )
475 .await
476 .expect("The guardian we requested an invite code for exists")
477 }
478
479 pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
480 self.client.get().get_internal_payment_markers()
481 }
482
483 pub async fn manual_operation_start(
486 &self,
487 operation_id: OperationId,
488 op_type: &str,
489 operation_meta: impl serde::Serialize + Debug,
490 sms: Vec<DynState>,
491 ) -> anyhow::Result<()> {
492 let db = self.module_db();
493 let mut dbtx = db.begin_transaction().await;
494 {
495 let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
496
497 self.manual_operation_start_inner(
498 &mut dbtx.to_ref_nc(),
499 operation_id,
500 op_type,
501 operation_meta,
502 sms,
503 )
504 .await?;
505 }
506
507 dbtx.commit_tx_result().await.map_err(|_| {
508 anyhow!(
509 "Operation with id {} already exists",
510 operation_id.fmt_short()
511 )
512 })?;
513
514 Ok(())
515 }
516
517 pub async fn manual_operation_start_dbtx(
518 &self,
519 dbtx: &mut DatabaseTransaction<'_>,
520 operation_id: OperationId,
521 op_type: &str,
522 operation_meta: impl serde::Serialize + Debug,
523 sms: Vec<DynState>,
524 ) -> anyhow::Result<()> {
525 self.manual_operation_start_inner(
526 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
527 operation_id,
528 op_type,
529 operation_meta,
530 sms,
531 )
532 .await
533 }
534
535 async fn manual_operation_start_inner(
538 &self,
539 dbtx: &mut DatabaseTransaction<'_>,
540 operation_id: OperationId,
541 op_type: &str,
542 operation_meta: impl serde::Serialize + Debug,
543 sms: Vec<DynState>,
544 ) -> anyhow::Result<()> {
545 dbtx.ensure_global()
546 .expect("Must deal with global dbtx here");
547
548 if self
549 .client
550 .get()
551 .operation_log()
552 .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
553 .await
554 .is_some()
555 {
556 bail!(
557 "Operation with id {} already exists",
558 operation_id.fmt_short()
559 );
560 }
561
562 self.client
563 .get()
564 .operation_log()
565 .add_operation_log_entry_dbtx(
566 &mut dbtx.to_ref_nc(),
567 operation_id,
568 op_type,
569 serde_json::to_value(operation_meta).expect("Can't fail"),
570 )
571 .await;
572
573 self.client
574 .get()
575 .executor()
576 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
577 .await
578 .expect("State machine is valid");
579
580 Ok(())
581 }
582
583 pub fn outcome_or_updates<U, S>(
584 &self,
585 operation: OperationLogEntry,
586 operation_id: OperationId,
587 stream_gen: impl FnOnce() -> S + 'static,
588 ) -> UpdateStreamOrOutcome<U>
589 where
590 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
591 S: Stream<Item = U> + MaybeSend + 'static,
592 {
593 use futures::StreamExt;
594 match self.client.get().operation_log().outcome_or_updates(
595 &self.global_db(),
596 operation_id,
597 operation,
598 Box::new(move || {
599 let stream_gen = stream_gen();
600 Box::pin(
601 stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
602 )
603 }),
604 ) {
605 UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
606 Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
607 ),
608 UpdateStreamOrOutcome::Outcome(o) => {
609 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
610 }
611 }
612 }
613
614 pub async fn claim_inputs<I, S>(
615 &self,
616 dbtx: &mut DatabaseTransaction<'_>,
617 inputs: ClientInputBundle<I, S>,
618 operation_id: OperationId,
619 ) -> anyhow::Result<OutPointRange>
620 where
621 I: IInput + MaybeSend + MaybeSync + 'static,
622 S: sm::IState + MaybeSend + MaybeSync + 'static,
623 {
624 self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
625 .await
626 }
627
628 async fn claim_inputs_dyn(
629 &self,
630 dbtx: &mut DatabaseTransaction<'_>,
631 inputs: InstancelessDynClientInputBundle,
632 operation_id: OperationId,
633 ) -> anyhow::Result<OutPointRange> {
634 let tx_builder =
635 TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
636
637 self.client
638 .get()
639 .finalize_and_submit_transaction_inner(
640 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
641 operation_id,
642 tx_builder,
643 )
644 .await
645 }
646
647 pub async fn add_state_machines_dbtx(
648 &self,
649 dbtx: &mut DatabaseTransaction<'_>,
650 states: Vec<DynState>,
651 ) -> AddStateMachinesResult {
652 self.client
653 .get()
654 .executor()
655 .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
656 .await
657 }
658
659 pub async fn add_operation_log_entry_dbtx(
660 &self,
661 dbtx: &mut DatabaseTransaction<'_>,
662 operation_id: OperationId,
663 operation_type: &str,
664 operation_meta: impl serde::Serialize,
665 ) {
666 self.client
667 .get()
668 .operation_log()
669 .add_operation_log_entry_dbtx(
670 &mut dbtx.global_dbtx(self.global_dbtx_access_token),
671 operation_id,
672 operation_type,
673 serde_json::to_value(operation_meta).expect("Can't fail"),
674 )
675 .await;
676 }
677
678 pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
679 where
680 E: Event + Send,
681 Cap: Send,
682 {
683 if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
684 warn!(
685 target: LOG_CLIENT,
686 module_kind = %<M as ClientModule>::kind(),
687 event_module = ?<E as Event>::MODULE,
688 "Client module logging events of different module than its own. This might become an error in the future."
689 );
690 }
691 self.client
692 .get()
693 .log_event_json(
694 &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
695 <E as Event>::MODULE,
696 self.module_instance_id,
697 <E as Event>::KIND,
698 serde_json::to_value(event).expect("Can't fail"),
699 <E as Event>::PERSISTENCE,
700 )
701 .await;
702 }
703}
704
705#[apply(async_trait_maybe_send!)]
707pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
708 type Init: ClientModuleInit;
709
710 type Common: ModuleCommon;
712
713 type Backup: ModuleBackup;
716
717 type ModuleStateMachineContext: Context;
720
721 type States: State<ModuleContext = Self::ModuleStateMachineContext>
723 + IntoDynInstance<DynType = DynState>;
724
725 fn decoder() -> Decoder {
726 let mut decoder_builder = Self::Common::decoder_builder();
727 decoder_builder.with_decodable_type::<Self::States>();
728 decoder_builder.with_decodable_type::<Self::Backup>();
729 decoder_builder.build()
730 }
731
732 fn kind() -> ModuleKind {
733 <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
734 }
735
736 fn context(&self) -> Self::ModuleStateMachineContext;
737
738 async fn start(&self) {}
744
745 async fn handle_cli_command(
746 &self,
747 _args: &[ffi::OsString],
748 ) -> anyhow::Result<serde_json::Value> {
749 Err(anyhow::format_err!(
750 "This module does not implement cli commands"
751 ))
752 }
753
754 async fn handle_rpc(
755 &self,
756 _method: String,
757 _request: serde_json::Value,
758 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
759 Box::pin(futures::stream::once(std::future::ready(Err(
760 anyhow::format_err!("This module does not implement rpc"),
761 ))))
762 }
763
764 fn input_fee(
773 &self,
774 amount: Amount,
775 input: &<Self::Common as ModuleCommon>::Input,
776 ) -> Option<Amount>;
777
778 fn output_fee(
787 &self,
788 amount: Amount,
789 output: &<Self::Common as ModuleCommon>::Output,
790 ) -> Option<Amount>;
791
792 fn supports_backup(&self) -> bool {
793 false
794 }
795
796 async fn backup(&self) -> anyhow::Result<Self::Backup> {
797 anyhow::bail!("Backup not supported");
798 }
799
800 fn supports_being_primary(&self) -> bool {
809 false
810 }
811
812 async fn create_final_inputs_and_outputs(
830 &self,
831 _dbtx: &mut DatabaseTransaction<'_>,
832 _operation_id: OperationId,
833 _input_amount: Amount,
834 _output_amount: Amount,
835 ) -> anyhow::Result<(
836 ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
837 ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
838 )> {
839 unimplemented!()
840 }
841
842 async fn await_primary_module_output(
847 &self,
848 _operation_id: OperationId,
849 _out_point: OutPoint,
850 ) -> anyhow::Result<()> {
851 unimplemented!()
852 }
853
854 async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
857 unimplemented!()
858 }
859
860 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
863 unimplemented!()
864 }
865
866 async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
922 bail!("Unable to determine if safe to leave the federation: Not implemented")
923 }
924}
925
/// Type-erased counterpart of [`ClientModule`].
///
/// Implemented for every `T: ClientModule` by a blanket impl; typed inputs,
/// outputs, contexts and backups are replaced by their `Dyn*` equivalents.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns `self` as `Any`, so callers can downcast to the concrete
    /// module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// See [`ClientModule::decoder`].
    fn decoder(&self) -> Decoder;

    /// See [`ClientModule::context`]; the result is tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// See [`ClientModule::start`].
    async fn start(&self);

    /// See [`ClientModule::handle_cli_command`].
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// See [`ClientModule::handle_rpc`].
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// See [`ClientModule::input_fee`]; `input` must belong to this module.
    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;

    /// See [`ClientModule::output_fee`]; `output` must belong to this module.
    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// See [`ClientModule::backup`]; the result is tagged with
    /// `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> bool;

    /// See [`ClientModule::create_final_inputs_and_outputs`]; the returned
    /// bundles are tagged with `module_instance`.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`].
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
980
981#[apply(async_trait_maybe_send!)]
982impl<T> IClientModule for T
983where
984 T: ClientModule,
985{
986 fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
987 self
988 }
989
990 fn decoder(&self) -> Decoder {
991 T::decoder()
992 }
993
994 fn context(&self, instance: ModuleInstanceId) -> DynContext {
995 DynContext::from_typed(instance, <T as ClientModule>::context(self))
996 }
997
998 async fn start(&self) {
999 <T as ClientModule>::start(self).await;
1000 }
1001
1002 async fn handle_cli_command(
1003 &self,
1004 args: &[ffi::OsString],
1005 ) -> anyhow::Result<serde_json::Value> {
1006 <T as ClientModule>::handle_cli_command(self, args).await
1007 }
1008
1009 async fn handle_rpc(
1010 &self,
1011 method: String,
1012 request: serde_json::Value,
1013 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1014 <T as ClientModule>::handle_rpc(self, method, request).await
1015 }
1016
1017 fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
1018 <T as ClientModule>::input_fee(
1019 self,
1020 amount,
1021 input
1022 .as_any()
1023 .downcast_ref()
1024 .expect("Dispatched to correct module"),
1025 )
1026 }
1027
1028 fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
1029 <T as ClientModule>::output_fee(
1030 self,
1031 amount,
1032 output
1033 .as_any()
1034 .downcast_ref()
1035 .expect("Dispatched to correct module"),
1036 )
1037 }
1038
1039 fn supports_backup(&self) -> bool {
1040 <T as ClientModule>::supports_backup(self)
1041 }
1042
1043 async fn backup(
1044 &self,
1045 module_instance_id: ModuleInstanceId,
1046 ) -> anyhow::Result<DynModuleBackup> {
1047 Ok(DynModuleBackup::from_typed(
1048 module_instance_id,
1049 <T as ClientModule>::backup(self).await?,
1050 ))
1051 }
1052
1053 fn supports_being_primary(&self) -> bool {
1054 <T as ClientModule>::supports_being_primary(self)
1055 }
1056
1057 async fn create_final_inputs_and_outputs(
1058 &self,
1059 module_instance: ModuleInstanceId,
1060 dbtx: &mut DatabaseTransaction<'_>,
1061 operation_id: OperationId,
1062 input_amount: Amount,
1063 output_amount: Amount,
1064 ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1065 let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1066 self,
1067 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1068 operation_id,
1069 input_amount,
1070 output_amount,
1071 )
1072 .await?;
1073
1074 let inputs = inputs.into_dyn(module_instance);
1075
1076 let outputs = outputs.into_dyn(module_instance);
1077
1078 Ok((inputs, outputs))
1079 }
1080
1081 async fn await_primary_module_output(
1082 &self,
1083 operation_id: OperationId,
1084 out_point: OutPoint,
1085 ) -> anyhow::Result<()> {
1086 <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1087 }
1088
1089 async fn get_balance(
1090 &self,
1091 module_instance: ModuleInstanceId,
1092 dbtx: &mut DatabaseTransaction<'_>,
1093 ) -> Amount {
1094 <T as ClientModule>::get_balance(
1095 self,
1096 &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1097 )
1098 .await
1099 }
1100
1101 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1102 <T as ClientModule>::subscribe_balance_changes(self).await
1103 }
1104}
1105
// Defines `DynClientModule`, a clonable newtype around an `Arc` of a
// type-erased `IClientModule`.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1110
1111impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1112 fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1113 self.inner.as_ref()
1114 }
1115}
1116
/// Half-open range of `u64` indices: `start` is included, `end` is not
/// (it iterates like `start..end`).
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    // First index of the range.
    start: u64,
    // One past the last index of the range.
    end: u64,
}
1123
1124impl IdxRange {
1125 pub fn new_single(start: u64) -> Option<Self> {
1126 start.checked_add(1).map(|end| Self { start, end })
1127 }
1128
1129 pub fn start(self) -> u64 {
1130 self.start
1131 }
1132
1133 pub fn count(self) -> usize {
1134 self.into_iter().count()
1135 }
1136
1137 pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1138 range.end().checked_add(1).map(|end| Self {
1139 start: *range.start(),
1140 end,
1141 })
1142 }
1143}
1144
1145impl From<Range<u64>> for IdxRange {
1146 fn from(Range { start, end }: Range<u64>) -> Self {
1147 Self { start, end }
1148 }
1149}
1150
1151impl IntoIterator for IdxRange {
1152 type Item = u64;
1153
1154 type IntoIter = ops::Range<u64>;
1155
1156 fn into_iter(self) -> Self::IntoIter {
1157 ops::Range {
1158 start: self.start,
1159 end: self.end,
1160 }
1161 }
1162}
1163
/// A range of out points of a single transaction: the transaction id plus an
/// [`IdxRange`] of output indices.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Id of the transaction all out points of the range belong to.
    pub txid: TransactionId,
    // Output indices covered by the range.
    idx_range: IdxRange,
}
1169
1170impl OutPointRange {
1171 pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1172 Self { txid, idx_range }
1173 }
1174
1175 pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1176 IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1177 }
1178
1179 pub fn start_idx(self) -> u64 {
1180 self.idx_range.start()
1181 }
1182
1183 pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1184 self.idx_range.into_iter()
1185 }
1186
1187 pub fn count(self) -> usize {
1188 self.idx_range.count()
1189 }
1190}
1191
1192impl IntoIterator for OutPointRange {
1193 type Item = OutPoint;
1194
1195 type IntoIter = OutPointRangeIter;
1196
1197 fn into_iter(self) -> Self::IntoIter {
1198 OutPointRangeIter {
1199 txid: self.txid,
1200 inner: self.idx_range.into_iter(),
1201 }
1202 }
1203}
1204
1205pub struct OutPointRangeIter {
1206 txid: TransactionId,
1207
1208 inner: ops::Range<u64>,
1209}
1210
1211impl OutPointRange {
1212 pub fn txid(&self) -> TransactionId {
1213 self.txid
1214 }
1215}
1216
1217impl Iterator for OutPointRangeIter {
1218 type Item = OutPoint;
1219
1220 fn next(&mut self) -> Option<Self::Item> {
1221 self.inner.next().map(|idx| OutPoint {
1222 txid: self.txid,
1223 out_idx: idx,
1224 })
1225 }
1226}
1227
/// Shared closure that produces state machines of type `S` from an
/// [`OutPointRange`].
// NOTE(review): presumably the range is the one assigned to the inputs or
// outputs the states belong to — confirm against the transaction builder.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;