fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::DynGlobalApi;
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15    OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
21use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::util::BoxStream;
24use fedimint_core::{
25    Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
26    maybe_add_send, maybe_add_send_sync,
27};
28use fedimint_eventlog::{Event, EventKind};
29use fedimint_logging::LOG_CLIENT;
30use futures::Stream;
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33use tracing::warn;
34
35use self::init::ClientModuleInit;
36use crate::module::recovery::{DynModuleBackup, ModuleBackup};
37use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
38use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
39use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
40use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
41use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
42
43pub mod init;
44pub mod recovery;
45
46pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
/// A fedimint-client interface exposed to client modules
///
/// To break the dependency of the client modules on the whole fedimint client
/// and in particular the `fedimint-client` crate, the module gets access to an
/// interface, that is implemented by the `Client`.
///
/// This allows loose coupling, less recompilation and better control and
/// understanding of what functionality of the Client the modules get access to.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Returns the (type-erased) client module for the given instance id.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Returns an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Returns the decoder registry of the client.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalizes the transaction described by `tx_builder` and submits it
    /// under `operation_id`.
    ///
    /// `operation_meta_gen` produces the JSON operation metadata from the
    /// [`OutPointRange`] it is handed.
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works on a
    /// caller-provided database transaction and takes no operation
    /// type/metadata.
    // TODO: unify
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Returns the [`TransactionUpdates`] for the given operation.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Waits for the given `outputs` of `operation_id`.
    // NOTE(review): presumably delegates to the primary module's
    // `await_primary_module_output` - confirm against the implementor.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Returns the client's operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Reports whether `operation_id` has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Reports whether an operation with `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Returns the client config.
    async fn config(&self) -> ClientConfig;

    /// Returns the client's database handle.
    fn db(&self) -> &Database;

    /// Returns the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Returns an invite code pointing at `peer`, or `None` if one cannot be
    /// produced for that peer.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Returns the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Logs an event given as an already JSON-serialized `payload`, inside the
    /// provided non-committable database transaction.
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: bool,
    );

    /// Streams the active states of `operation_id` for module `module_id`,
    /// read through `dbtx`. The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Streams the inactive states of `operation_id` for module `module_id`,
    /// read through `dbtx`. The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
126
/// A final, fully initialized client
///
/// Client modules need to be able to access a `Client` they are a part
/// of. To break the circular dependency, the final `Client` is passed
/// after `Client` was built via a shared state.
///
/// The shared state is a write-once cell holding a [`Weak`] reference, so
/// holding a `FinalClientIface` does not by itself keep the client alive.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136    /// Get a temporary strong reference to [`ClientContextIface`]
137    ///
138    /// Care must be taken to not let the user take ownership of this value,
139    /// and not store it elsewhere permanently either, as it could prevent
140    /// the cleanup of the Client.
141    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142        self.0
143            .get()
144            .expect("client must be already set")
145            .upgrade()
146            .expect("client module context must not be use past client shutdown")
147    }
148
149    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150        self.0.set(client).expect("FinalLazyClient already set");
151    }
152}
153
154impl fmt::Debug for FinalClientIface {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.write_str("FinalClientIface")
157    }
158}
/// A Client context for a [`ClientModule`] `M`
///
/// Client modules can interact with the whole
/// client through this struct.
pub struct ClientContext<M> {
    // Late-bound handle to the fully built client.
    client: FinalClientIface,
    // Instance id used when turning typed values into their dyn versions.
    module_instance_id: ModuleInstanceId,
    // Token passed to `global_dbtx` to obtain a global database transaction.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database handed out by `module_db` (checked there to be isolated).
    module_db: Database,
    // Ties the context to the module type `M` without storing one.
    _marker: marker::PhantomData<M>,
}
170
171impl<M> Clone for ClientContext<M> {
172    fn clone(&self) -> Self {
173        Self {
174            client: self.client.clone(),
175            module_db: self.module_db.clone(),
176            module_instance_id: self.module_instance_id,
177            _marker: marker::PhantomData,
178            global_dbtx_access_token: self.global_dbtx_access_token,
179        }
180    }
181}
182
/// A reference back to itself that the module can get from the
/// [`ClientContext`]
pub struct ClientContextSelfRef<'s, M> {
    // we are OK storing a strong reference here, because of the `'s` preventing `Self` from
    // being stored permanently somewhere
    client: Arc<dyn ClientContextIface>,
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195    M: ClientModule,
196{
197    type Target = M;
198
199    fn deref(&self) -> &Self::Target {
200        self.client
201            .get_module(self.module_instance_id)
202            .as_any()
203            .downcast_ref::<M>()
204            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205    }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.write_str("ClientContext")
211    }
212}
213
214impl<M> ClientContext<M>
215where
216    M: ClientModule,
217{
218    pub fn new(
219        client: FinalClientIface,
220        module_instance_id: ModuleInstanceId,
221        global_dbtx_access_token: GlobalDBTxAccessToken,
222        module_db: Database,
223    ) -> Self {
224        Self {
225            client,
226            module_instance_id,
227            global_dbtx_access_token,
228            module_db,
229            _marker: marker::PhantomData,
230        }
231    }
232
233    /// Get a reference back to client module from the [`Self`]
234    ///
235    /// It's often necessary for a client module to "move self"
236    /// by-value, especially due to async lifetimes issues.
237    /// Clients usually work with `&mut self`, which can't really
238    /// work in such context.
239    ///
240    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
241    /// can be used to recover the reference to the module at later
242    /// time.
243    #[allow(clippy::needless_lifetimes)] // just for explicitiness
244    pub fn self_ref<'s>(&'s self) -> ClientContextSelfRef<'s, M> {
245        ClientContextSelfRef {
246            client: self.client.get(),
247            module_instance_id: self.module_instance_id,
248            _marker: marker::PhantomData,
249        }
250    }
251
252    /// Get a reference to a global Api handle
253    pub fn global_api(&self) -> DynGlobalApi {
254        self.client.get().api_clone()
255    }
256
257    /// A set of all decoders of all modules of the client
258    pub fn decoders(&self) -> ModuleDecoderRegistry {
259        Clone::clone(self.client.get().decoders())
260    }
261
262    pub fn input_from_dyn<'i>(
263        &self,
264        input: &'i DynInput,
265    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
266        (input.module_instance_id() == self.module_instance_id).then(|| {
267            input
268                .as_any()
269                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
270                .unwrap_or_else(|| {
271                    panic!("instance_id {} just checked", input.module_instance_id())
272                })
273        })
274    }
275
276    pub fn output_from_dyn<'o>(
277        &self,
278        output: &'o DynOutput,
279    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
280        (output.module_instance_id() == self.module_instance_id).then(|| {
281            output
282                .as_any()
283                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
284                .unwrap_or_else(|| {
285                    panic!("instance_id {} just checked", output.module_instance_id())
286                })
287        })
288    }
289
290    pub fn map_dyn<'s, 'i, 'o, I>(
291        &'s self,
292        typed: impl IntoIterator<Item = I> + 'i,
293    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
294    where
295        I: IntoDynInstance,
296        'i: 'o,
297        's: 'o,
298    {
299        typed.into_iter().map(|i| self.make_dyn(i))
300    }
301
302    /// Turn a typed output into a dyn version
303    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
304        self.make_dyn(output)
305    }
306
307    /// Turn a typed input into a dyn version
308    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
309        self.make_dyn(input)
310    }
311
312    /// Turn a `typed` into a dyn version
313    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
314    where
315        I: IntoDynInstance,
316    {
317        typed.into_dyn(self.module_instance_id)
318    }
319
320    /// Turn a typed [`ClientOutputBundle`] into a dyn version
321    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
322    where
323        O: IntoDynInstance<DynType = DynOutput> + 'static,
324        S: IntoDynInstance<DynType = DynState> + 'static,
325    {
326        self.make_dyn(output)
327    }
328
329    /// Turn a typed [`ClientInputBundle`] into a dyn version
330    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
331    where
332        I: IntoDynInstance<DynType = DynInput> + 'static,
333        S: IntoDynInstance<DynType = DynState> + 'static,
334    {
335        self.make_dyn(inputs)
336    }
337
338    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
339    where
340        S: sm::IState + 'static,
341    {
342        DynState::from_typed(self.module_instance_id, sm)
343    }
344
345    pub async fn finalize_and_submit_transaction<F, Meta>(
346        &self,
347        operation_id: OperationId,
348        operation_type: &str,
349        operation_meta_gen: F,
350        tx_builder: TransactionBuilder,
351    ) -> anyhow::Result<OutPointRange>
352    where
353        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
354        Meta: serde::Serialize + MaybeSend,
355    {
356        self.client
357            .get()
358            .finalize_and_submit_transaction(
359                operation_id,
360                operation_type,
361                Box::new(move |out_point_range| {
362                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
363                }),
364                tx_builder,
365            )
366            .await
367    }
368
369    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
370        self.client.get().transaction_updates(operation_id).await
371    }
372
373    pub async fn await_primary_module_outputs(
374        &self,
375        operation_id: OperationId,
376        // TODO: make `impl Iterator<Item = ...>`
377        outputs: Vec<OutPoint>,
378    ) -> anyhow::Result<()> {
379        self.client
380            .get()
381            .await_primary_module_outputs(operation_id, outputs)
382            .await
383    }
384
385    // TODO: unify with `Self::get_operation`
386    pub async fn get_operation(
387        &self,
388        operation_id: OperationId,
389    ) -> anyhow::Result<oplog::OperationLogEntry> {
390        let operation = self
391            .client
392            .get()
393            .operation_log()
394            .get_operation(operation_id)
395            .await
396            .ok_or(anyhow::anyhow!("Operation not found"))?;
397
398        if operation.operation_module_kind() != M::kind().as_str() {
399            bail!("Operation is not a lightning operation");
400        }
401
402        Ok(operation)
403    }
404
405    /// Get global db.
406    ///
407    /// Only intended for internal use (private).
408    fn global_db(&self) -> fedimint_core::db::Database {
409        let db = Clone::clone(self.client.get().db());
410
411        db.ensure_global()
412            .expect("global_db must always return a global db");
413
414        db
415    }
416
417    pub fn module_db(&self) -> &Database {
418        self.module_db
419            .ensure_isolated()
420            .expect("module_db must always return isolated db");
421        &self.module_db
422    }
423
424    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
425        self.client.get().has_active_states(op_id).await
426    }
427
428    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
429        self.client.get().operation_exists(op_id).await
430    }
431
432    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
433        self.client
434            .get()
435            .executor()
436            .get_active_states()
437            .await
438            .into_iter()
439            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
440            .map(|s| {
441                (
442                    Clone::clone(
443                        s.0.as_any()
444                            .downcast_ref::<M::States>()
445                            .expect("incorrect output type passed to module plugin"),
446                    ),
447                    s.1,
448                )
449            })
450            .collect()
451    }
452
453    pub async fn get_config(&self) -> ClientConfig {
454        self.client.get().config().await
455    }
456
457    /// Returns an invite code for the federation that points to an arbitrary
458    /// guardian server for fetching the config
459    pub async fn get_invite_code(&self) -> InviteCode {
460        let cfg = self.get_config().await.global;
461        self.client
462            .get()
463            .invite_code(
464                *cfg.api_endpoints
465                    .keys()
466                    .next()
467                    .expect("A federation always has at least one guardian"),
468            )
469            .await
470            .expect("The guardian we requested an invite code for exists")
471    }
472
473    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
474        self.client.get().get_internal_payment_markers()
475    }
476
477    /// This method starts n state machines with given operation id without a
478    /// corresponding transaction
479    pub async fn manual_operation_start(
480        &self,
481        operation_id: OperationId,
482        op_type: &str,
483        operation_meta: impl serde::Serialize + Debug,
484        sms: Vec<DynState>,
485    ) -> anyhow::Result<()> {
486        let db = self.module_db();
487        let mut dbtx = db.begin_transaction().await;
488        {
489            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
490
491            self.manual_operation_start_inner(
492                &mut dbtx.to_ref_nc(),
493                operation_id,
494                op_type,
495                operation_meta,
496                sms,
497            )
498            .await?;
499        }
500
501        dbtx.commit_tx_result().await.map_err(|_| {
502            anyhow!(
503                "Operation with id {} already exists",
504                operation_id.fmt_short()
505            )
506        })?;
507
508        Ok(())
509    }
510
511    pub async fn manual_operation_start_dbtx(
512        &self,
513        dbtx: &mut DatabaseTransaction<'_>,
514        operation_id: OperationId,
515        op_type: &str,
516        operation_meta: impl serde::Serialize + Debug,
517        sms: Vec<DynState>,
518    ) -> anyhow::Result<()> {
519        self.manual_operation_start_inner(
520            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
521            operation_id,
522            op_type,
523            operation_meta,
524            sms,
525        )
526        .await
527    }
528
529    /// See [`Self::manual_operation_start`], just inside a database
530    /// transaction.
531    async fn manual_operation_start_inner(
532        &self,
533        dbtx: &mut DatabaseTransaction<'_>,
534        operation_id: OperationId,
535        op_type: &str,
536        operation_meta: impl serde::Serialize + Debug,
537        sms: Vec<DynState>,
538    ) -> anyhow::Result<()> {
539        dbtx.ensure_global()
540            .expect("Must deal with global dbtx here");
541
542        if self
543            .client
544            .get()
545            .operation_log()
546            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
547            .await
548            .is_some()
549        {
550            bail!(
551                "Operation with id {} already exists",
552                operation_id.fmt_short()
553            );
554        }
555
556        self.client
557            .get()
558            .operation_log()
559            .add_operation_log_entry_dbtx(
560                &mut dbtx.to_ref_nc(),
561                operation_id,
562                op_type,
563                serde_json::to_value(operation_meta).expect("Can't fail"),
564            )
565            .await;
566
567        self.client
568            .get()
569            .executor()
570            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
571            .await
572            .expect("State machine is valid");
573
574        Ok(())
575    }
576
577    pub fn outcome_or_updates<U, S>(
578        &self,
579        operation: OperationLogEntry,
580        operation_id: OperationId,
581        stream_gen: impl FnOnce() -> S + 'static,
582    ) -> UpdateStreamOrOutcome<U>
583    where
584        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
585        S: Stream<Item = U> + MaybeSend + 'static,
586    {
587        use futures::StreamExt;
588        match self.client.get().operation_log().outcome_or_updates(
589            &self.global_db(),
590            operation_id,
591            operation,
592            Box::new(move || {
593                let stream_gen = stream_gen();
594                Box::pin(
595                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
596                )
597            }),
598        ) {
599            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
600                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
601            ),
602            UpdateStreamOrOutcome::Outcome(o) => {
603                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
604            }
605        }
606    }
607
608    pub async fn claim_inputs<I, S>(
609        &self,
610        dbtx: &mut DatabaseTransaction<'_>,
611        inputs: ClientInputBundle<I, S>,
612        operation_id: OperationId,
613    ) -> anyhow::Result<OutPointRange>
614    where
615        I: IInput + MaybeSend + MaybeSync + 'static,
616        S: sm::IState + MaybeSend + MaybeSync + 'static,
617    {
618        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
619            .await
620    }
621
622    async fn claim_inputs_dyn(
623        &self,
624        dbtx: &mut DatabaseTransaction<'_>,
625        inputs: InstancelessDynClientInputBundle,
626        operation_id: OperationId,
627    ) -> anyhow::Result<OutPointRange> {
628        let tx_builder =
629            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
630
631        self.client
632            .get()
633            .finalize_and_submit_transaction_inner(
634                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
635                operation_id,
636                tx_builder,
637            )
638            .await
639    }
640
641    pub async fn add_state_machines_dbtx(
642        &self,
643        dbtx: &mut DatabaseTransaction<'_>,
644        states: Vec<DynState>,
645    ) -> AddStateMachinesResult {
646        self.client
647            .get()
648            .executor()
649            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
650            .await
651    }
652
653    pub async fn add_operation_log_entry_dbtx(
654        &self,
655        dbtx: &mut DatabaseTransaction<'_>,
656        operation_id: OperationId,
657        operation_type: &str,
658        operation_meta: impl serde::Serialize,
659    ) {
660        self.client
661            .get()
662            .operation_log()
663            .add_operation_log_entry_dbtx(
664                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
665                operation_id,
666                operation_type,
667                serde_json::to_value(operation_meta).expect("Can't fail"),
668            )
669            .await;
670    }
671
672    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
673    where
674        E: Event + Send,
675        Cap: Send,
676    {
677        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
678            warn!(
679                target: LOG_CLIENT,
680                module_kind = %<M as ClientModule>::kind(),
681                event_module = ?<E as Event>::MODULE,
682                "Client module logging events of different module than its own. This might become an error in the future."
683            );
684        }
685        self.client
686            .get()
687            .log_event_json(
688                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
689                <E as Event>::MODULE,
690                self.module_instance_id,
691                <E as Event>::KIND,
692                serde_json::to_value(event).expect("Can't fail"),
693                <E as Event>::PERSIST,
694            )
695            .await;
696    }
697}
698
699/// Fedimint module client
700#[apply(async_trait_maybe_send!)]
701pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
702    type Init: ClientModuleInit;
703
704    /// Common module types shared between client and server
705    type Common: ModuleCommon;
706
707    /// Data stored in regular backups so that restoring doesn't have to start
708    /// from epoch 0
709    type Backup: ModuleBackup;
710
711    /// Data and API clients available to state machine transitions of this
712    /// module
713    type ModuleStateMachineContext: Context;
714
715    /// All possible states this client can submit to the executor
716    type States: State<ModuleContext = Self::ModuleStateMachineContext>
717        + IntoDynInstance<DynType = DynState>;
718
719    fn decoder() -> Decoder {
720        let mut decoder_builder = Self::Common::decoder_builder();
721        decoder_builder.with_decodable_type::<Self::States>();
722        decoder_builder.with_decodable_type::<Self::Backup>();
723        decoder_builder.build()
724    }
725
726    fn kind() -> ModuleKind {
727        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
728    }
729
    /// Returns the context handed to this module's state machine
    /// transitions (see [`Self::ModuleStateMachineContext`]).
    fn context(&self) -> Self::ModuleStateMachineContext;
731
    /// Initialize client.
    ///
    /// Called by the core client code on start, after [`ClientContext`] is
    /// fully initialized, so unlike during [`ClientModuleInit::init`],
    /// access to global client is allowed.
    ///
    /// The default implementation does nothing.
    async fn start(&self) {}
738
739    async fn handle_cli_command(
740        &self,
741        _args: &[ffi::OsString],
742    ) -> anyhow::Result<serde_json::Value> {
743        Err(anyhow::format_err!(
744            "This module does not implement cli commands"
745        ))
746    }
747
748    async fn handle_rpc(
749        &self,
750        _method: String,
751        _request: serde_json::Value,
752    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
753        Box::pin(futures::stream::once(std::future::ready(Err(
754            anyhow::format_err!("This module does not implement rpc"),
755        ))))
756    }
757
    /// Returns the fee the processing of this input requires.
    ///
    /// If the semantics of a given input aren't known this function returns
    /// `None`. This only happens if a future version of Fedimint introduces a
    /// new input variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with inputs
    /// generated by ourselves.
    fn input_fee(
        &self,
        amount: Amount,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amount>;
771
    /// Returns the fee the processing of this output requires.
    ///
    /// If the semantics of a given output aren't known this function returns
    /// `None`. This only happens if a future version of Fedimint introduces a
    /// new output variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with outputs
    /// generated by ourselves.
    fn output_fee(
        &self,
        amount: Amount,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amount>;
785
    /// Does this module support backups
    ///
    /// Defaults to `false`.
    // NOTE(review): presumably a module returning `true` must also override
    // `Self::backup` - confirm against the callers.
    fn supports_backup(&self) -> bool {
        false
    }
789
790    async fn backup(&self) -> anyhow::Result<Self::Backup> {
791        anyhow::bail!("Backup not supported");
792    }
793
    /// Does this module support being a primary module
    ///
    /// Defaults to `false`. If it does it must implement:
    ///
    /// * [`Self::create_final_inputs_and_outputs`]
    /// * [`Self::await_primary_module_output`]
    /// * [`Self::get_balance`]
    /// * [`Self::subscribe_balance_changes`]
    ///
    /// The default implementations of those methods panic.
    fn supports_being_primary(&self) -> bool {
        false
    }
805
    /// Creates all inputs and outputs necessary to balance the transaction.
    /// The function returns an error if and only if the client's funds are not
    /// sufficient to create the inputs necessary to fully fund the transaction.
    ///
    /// A returned input also contains:
    /// * A set of private keys belonging to the input for signing the
    ///   transaction
    /// * A closure that generates states belonging to the input. This closure
    ///   takes the transaction id of the transaction in which the input was
    ///   used and the input index as input since these cannot be known at time
    ///   of calling `create_funding_input` and have to be injected later.
    ///
    /// A returned output also contains:
    /// * A closure that generates states belonging to the output. This closure
    ///   takes the transaction id of the transaction in which the output was
    ///   used and the output index as input since these cannot be known at time
    ///   of calling `create_change_output` and have to be injected later.
    ///
    /// # Panics
    ///
    /// The default implementation is `unimplemented!()` and always panics.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }
836
    /// Waits for the funds from an output created by
    /// [`Self::create_final_inputs_and_outputs`] to become available. This
    /// function returning typically implies a change in the output of
    /// [`Self::get_balance`].
    ///
    /// # Panics
    ///
    /// The default implementation is `unimplemented!()` and always panics.
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }
848
    /// Returns the balance held by this module and available for funding
    /// transactions.
    ///
    /// # Panics
    ///
    /// The default implementation is `unimplemented!()` and always panics.
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
        unimplemented!()
    }
854
    /// Returns a stream that will output an item each time the module
    /// balance changes.
    ///
    /// # Panics
    ///
    /// The default implementation is `unimplemented!()` and always panics.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }
860
861    /// Leave the federation
862    ///
863    /// While technically there's nothing stopping the client from just
864    /// abandoning Federation at any point by deleting all the related
865    /// local data, it is useful to make sure it's safe beforehand.
866    ///
867    /// This call indicates the desire of the caller client code
868    /// to orderly and safely leave the Federation by this module instance.
869    /// The goal of the implementations is to fulfil that wish,
870    /// giving prompt and informative feedback if it's not yet possible.
871    ///
872    /// The client module implementation should handle the request
873    /// and return as fast as possible avoiding blocking for longer than
874    /// necessary. This would usually involve some combination of:
875    ///
876    /// * recording the state of being in process of leaving the Federation to
877    ///   prevent initiating new conditions that could delay its completion;
878    /// * performing any fast to complete cleanup/exit logic;
879    /// * initiating any time-consuming logic (e.g. canceling outstanding
880    ///   contracts), as background jobs, tasks machines, etc.
881    /// * checking for any conditions indicating it might not be safe to leave
882    ///   at the moment.
883    ///
884    /// This function should return `Ok` only if from the perspective
885    /// of this module instance, it is safe to delete client data and
886    /// stop using it, with no further actions (like background jobs) required
887    /// to complete.
888    ///
889    /// This function should return an error if it's not currently possible
890    /// to safely (e.g. without loosing funds) leave the Federation.
891    /// It should avoid running indefinitely trying to complete any cleanup
892    /// actions necessary to reach a clean state, preferring spawning new
893    /// state machines and returning an informative error about cleanup
894    /// still in progress.
895    ///
896    /// If any internal task needs to complete, any user action is required,
897    /// or even external condition needs to be met this function
898    /// should return a `Err`.
899    ///
900    /// Notably modules should not disable interaction that might be necessary
901    /// for the user (possibly through other modules) to leave the Federation.
902    /// In particular a Mint module should retain ability to create new notes,
903    /// and LN module should retain ability to send funds out.
904    ///
905    /// Calling code must NOT assume that a module that once returned `Ok`,
906    /// will not return `Err` at later point. E.g. a Mint module might have
907    /// no outstanding balance at first, but other modules winding down
908    /// might "cash-out" to Ecash.
909    ///
910    /// Before leaving the Federation and deleting any state the calling code
911    /// must collect a full round of `Ok` from all the modules.
912    ///
913    /// Calling code should allow the user to override and ignore any
914    /// outstanding errors, after sufficient amount of warnings. Ideally,
915    /// this should be done on per-module basis, to avoid mistakes.
916    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
917        bail!("Unable to determine if safe to leave the federation: Not implemented")
918    }
919}
920
921/// Type-erased version of [`ClientModule`]
922#[apply(async_trait_maybe_send!)]
923pub trait IClientModule: Debug {
924    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
925
926    fn decoder(&self) -> Decoder;
927
928    fn context(&self, instance: ModuleInstanceId) -> DynContext;
929
930    async fn start(&self);
931
932    async fn handle_cli_command(&self, args: &[ffi::OsString])
933    -> anyhow::Result<serde_json::Value>;
934
935    async fn handle_rpc(
936        &self,
937        method: String,
938        request: serde_json::Value,
939    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
940
941    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;
942
943    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;
944
945    fn supports_backup(&self) -> bool;
946
947    async fn backup(&self, module_instance_id: ModuleInstanceId)
948    -> anyhow::Result<DynModuleBackup>;
949
950    fn supports_being_primary(&self) -> bool;
951
952    async fn create_final_inputs_and_outputs(
953        &self,
954        module_instance: ModuleInstanceId,
955        dbtx: &mut DatabaseTransaction<'_>,
956        operation_id: OperationId,
957        input_amount: Amount,
958        output_amount: Amount,
959    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
960
961    async fn await_primary_module_output(
962        &self,
963        operation_id: OperationId,
964        out_point: OutPoint,
965    ) -> anyhow::Result<()>;
966
967    async fn get_balance(
968        &self,
969        module_instance: ModuleInstanceId,
970        dbtx: &mut DatabaseTransaction<'_>,
971    ) -> Amount;
972
973    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
974}
975
976#[apply(async_trait_maybe_send!)]
977impl<T> IClientModule for T
978where
979    T: ClientModule,
980{
981    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
982        self
983    }
984
985    fn decoder(&self) -> Decoder {
986        T::decoder()
987    }
988
989    fn context(&self, instance: ModuleInstanceId) -> DynContext {
990        DynContext::from_typed(instance, <T as ClientModule>::context(self))
991    }
992
993    async fn start(&self) {
994        <T as ClientModule>::start(self).await;
995    }
996
997    async fn handle_cli_command(
998        &self,
999        args: &[ffi::OsString],
1000    ) -> anyhow::Result<serde_json::Value> {
1001        <T as ClientModule>::handle_cli_command(self, args).await
1002    }
1003
1004    async fn handle_rpc(
1005        &self,
1006        method: String,
1007        request: serde_json::Value,
1008    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1009        <T as ClientModule>::handle_rpc(self, method, request).await
1010    }
1011
1012    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
1013        <T as ClientModule>::input_fee(
1014            self,
1015            amount,
1016            input
1017                .as_any()
1018                .downcast_ref()
1019                .expect("Dispatched to correct module"),
1020        )
1021    }
1022
1023    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
1024        <T as ClientModule>::output_fee(
1025            self,
1026            amount,
1027            output
1028                .as_any()
1029                .downcast_ref()
1030                .expect("Dispatched to correct module"),
1031        )
1032    }
1033
1034    fn supports_backup(&self) -> bool {
1035        <T as ClientModule>::supports_backup(self)
1036    }
1037
1038    async fn backup(
1039        &self,
1040        module_instance_id: ModuleInstanceId,
1041    ) -> anyhow::Result<DynModuleBackup> {
1042        Ok(DynModuleBackup::from_typed(
1043            module_instance_id,
1044            <T as ClientModule>::backup(self).await?,
1045        ))
1046    }
1047
1048    fn supports_being_primary(&self) -> bool {
1049        <T as ClientModule>::supports_being_primary(self)
1050    }
1051
1052    async fn create_final_inputs_and_outputs(
1053        &self,
1054        module_instance: ModuleInstanceId,
1055        dbtx: &mut DatabaseTransaction<'_>,
1056        operation_id: OperationId,
1057        input_amount: Amount,
1058        output_amount: Amount,
1059    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1060        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1061            self,
1062            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1063            operation_id,
1064            input_amount,
1065            output_amount,
1066        )
1067        .await?;
1068
1069        let inputs = inputs.into_dyn(module_instance);
1070
1071        let outputs = outputs.into_dyn(module_instance);
1072
1073        Ok((inputs, outputs))
1074    }
1075
1076    async fn await_primary_module_output(
1077        &self,
1078        operation_id: OperationId,
1079        out_point: OutPoint,
1080    ) -> anyhow::Result<()> {
1081        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1082    }
1083
1084    async fn get_balance(
1085        &self,
1086        module_instance: ModuleInstanceId,
1087        dbtx: &mut DatabaseTransaction<'_>,
1088    ) -> Amount {
1089        <T as ClientModule>::get_balance(
1090            self,
1091            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1092        )
1093        .await
1094    }
1095
1096    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1097        <T as ClientModule>::subscribe_balance_changes(self).await
1098    }
1099}
1100
1101dyn_newtype_define!(
1102    #[derive(Clone)]
1103    pub DynClientModule(Arc<IClientModule>)
1104);
1105
1106impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1107    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1108        self.inner.as_ref()
1109    }
1110}
1111
1112/// A contiguous range of input/output indexes
1113#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1114pub struct IdxRange {
1115    start: u64,
1116    end: u64,
1117}
1118
1119impl IdxRange {
1120    pub fn new_single(start: u64) -> Option<Self> {
1121        start.checked_add(1).map(|end| Self { start, end })
1122    }
1123
1124    pub fn start(self) -> u64 {
1125        self.start
1126    }
1127
1128    pub fn count(self) -> usize {
1129        self.into_iter().count()
1130    }
1131
1132    pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1133        range.end().checked_add(1).map(|end| Self {
1134            start: *range.start(),
1135            end,
1136        })
1137    }
1138}
1139
1140impl From<Range<u64>> for IdxRange {
1141    fn from(Range { start, end }: Range<u64>) -> Self {
1142        Self { start, end }
1143    }
1144}
1145
1146impl IntoIterator for IdxRange {
1147    type Item = u64;
1148
1149    type IntoIter = ops::Range<u64>;
1150
1151    fn into_iter(self) -> Self::IntoIter {
1152        ops::Range {
1153            start: self.start,
1154            end: self.end,
1155        }
1156    }
1157}
1158
1159#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1160pub struct OutPointRange {
1161    pub txid: TransactionId,
1162    idx_range: IdxRange,
1163}
1164
1165impl OutPointRange {
1166    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1167        Self { txid, idx_range }
1168    }
1169
1170    pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1171        IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1172    }
1173
1174    pub fn start_idx(self) -> u64 {
1175        self.idx_range.start()
1176    }
1177
1178    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1179        self.idx_range.into_iter()
1180    }
1181
1182    pub fn count(self) -> usize {
1183        self.idx_range.count()
1184    }
1185}
1186
1187impl IntoIterator for OutPointRange {
1188    type Item = OutPoint;
1189
1190    type IntoIter = OutPointRangeIter;
1191
1192    fn into_iter(self) -> Self::IntoIter {
1193        OutPointRangeIter {
1194            txid: self.txid,
1195            inner: self.idx_range.into_iter(),
1196        }
1197    }
1198}
1199
1200pub struct OutPointRangeIter {
1201    txid: TransactionId,
1202
1203    inner: ops::Range<u64>,
1204}
1205
1206impl OutPointRange {
1207    pub fn txid(&self) -> TransactionId {
1208        self.txid
1209    }
1210}
1211
1212impl Iterator for OutPointRangeIter {
1213    type Item = OutPoint;
1214
1215    fn next(&mut self) -> Option<Self::Item> {
1216        self.inner.next().map(|idx| OutPoint {
1217            txid: self.txid,
1218            out_idx: idx,
1219        })
1220    }
1221}
1222
1223pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;