fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::fmt::Debug;
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15    OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::encoding::{Decodable, Encodable};
19use fedimint_core::invite_code::InviteCode;
20use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
21use fedimint_core::module::{CommonModuleInit, ModuleCommon, ModuleInit};
22use fedimint_core::task::{MaybeSend, MaybeSync};
23use fedimint_core::util::BoxStream;
24use fedimint_core::{
25    Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
26    maybe_add_send, maybe_add_send_sync,
27};
28use fedimint_eventlog::{Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT;
30use futures::Stream;
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33use tracing::warn;
34
35use self::init::ClientModuleInit;
36use crate::module::recovery::{DynModuleBackup, ModuleBackup};
37use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
38use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
39use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
40use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
41use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
42
43pub mod init;
44pub mod recovery;
45
46pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
47
48/// A fedimint-client interface exposed to client modules
49///
50/// To break the dependency of the client modules on the whole fedimint client
51/// and in particular the `fedimint-client` crate, the module gets access to an
52/// interface, that is implemented by the `Client`.
53///
54/// This allows lose coupling, less recompilation and better control and
55/// understanding of what functionality of the Client the modules get access to.
56#[apply(async_trait_maybe_send!)]
57pub trait ClientContextIface: MaybeSend + MaybeSync {
58    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
59    fn api_clone(&self) -> DynGlobalApi;
60    fn decoders(&self) -> &ModuleDecoderRegistry;
61    async fn finalize_and_submit_transaction(
62        &self,
63        operation_id: OperationId,
64        operation_type: &str,
65        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
66        tx_builder: TransactionBuilder,
67    ) -> anyhow::Result<OutPointRange>;
68
69    // TODO: unify
70    async fn finalize_and_submit_transaction_inner(
71        &self,
72        dbtx: &mut DatabaseTransaction<'_>,
73        operation_id: OperationId,
74        tx_builder: TransactionBuilder,
75    ) -> anyhow::Result<OutPointRange>;
76
77    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
78
79    async fn await_primary_module_outputs(
80        &self,
81        operation_id: OperationId,
82        // TODO: make `impl Iterator<Item = ...>`
83        outputs: Vec<OutPoint>,
84    ) -> anyhow::Result<()>;
85
86    fn operation_log(&self) -> &dyn IOperationLog;
87
88    async fn has_active_states(&self, operation_id: OperationId) -> bool;
89
90    async fn operation_exists(&self, operation_id: OperationId) -> bool;
91
92    async fn config(&self) -> ClientConfig;
93
94    fn db(&self) -> &Database;
95
96    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
97
98    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
99
100    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
101
102    #[allow(clippy::too_many_arguments)]
103    async fn log_event_json(
104        &self,
105        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
106        module_kind: Option<ModuleKind>,
107        module_id: ModuleInstanceId,
108        kind: EventKind,
109        payload: serde_json::Value,
110        persist: EventPersistence,
111    );
112
113    async fn read_operation_active_states<'dbtx>(
114        &self,
115        operation_id: OperationId,
116        module_id: ModuleInstanceId,
117        dbtx: &'dbtx mut DatabaseTransaction<'_>,
118    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
119
120    async fn read_operation_inactive_states<'dbtx>(
121        &self,
122        operation_id: OperationId,
123        module_id: ModuleInstanceId,
124        dbtx: &'dbtx mut DatabaseTransaction<'_>,
125    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
126}
127
128/// A final, fully initialized client
129///
130/// Client modules need to be able to access a `Client` they are a part
131/// of. To break the circular dependency, the final `Client` is passed
132/// after `Client` was built via a shared state.
133#[derive(Clone, Default)]
134pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
135
136impl FinalClientIface {
137    /// Get a temporary strong reference to [`ClientContextIface`]
138    ///
139    /// Care must be taken to not let the user take ownership of this value,
140    /// and not store it elsewhere permanently either, as it could prevent
141    /// the cleanup of the Client.
142    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
143        self.0
144            .get()
145            .expect("client must be already set")
146            .upgrade()
147            .expect("client module context must not be use past client shutdown")
148    }
149
150    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
151        self.0.set(client).expect("FinalLazyClient already set");
152    }
153}
154
155impl fmt::Debug for FinalClientIface {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        f.write_str("FinalClientIface")
158    }
159}
160/// A Client context for a [`ClientModule`] `M`
161///
162/// Client modules can interact with the whole
163/// client through this struct.
164pub struct ClientContext<M> {
165    client: FinalClientIface,
166    module_instance_id: ModuleInstanceId,
167    global_dbtx_access_token: GlobalDBTxAccessToken,
168    module_db: Database,
169    _marker: marker::PhantomData<M>,
170}
171
172impl<M> Clone for ClientContext<M> {
173    fn clone(&self) -> Self {
174        Self {
175            client: self.client.clone(),
176            module_db: self.module_db.clone(),
177            module_instance_id: self.module_instance_id,
178            _marker: marker::PhantomData,
179            global_dbtx_access_token: self.global_dbtx_access_token,
180        }
181    }
182}
183
184/// A reference back to itself that the module cacn get from the
185/// [`ClientContext`]
186pub struct ClientContextSelfRef<'s, M> {
187    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
188    // stored permanently somewhere
189    client: Arc<dyn ClientContextIface>,
190    module_instance_id: ModuleInstanceId,
191    _marker: marker::PhantomData<&'s M>,
192}
193
194impl<M> ops::Deref for ClientContextSelfRef<'_, M>
195where
196    M: ClientModule,
197{
198    type Target = M;
199
200    fn deref(&self) -> &Self::Target {
201        self.client
202            .get_module(self.module_instance_id)
203            .as_any()
204            .downcast_ref::<M>()
205            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
206    }
207}
208
209impl<M> fmt::Debug for ClientContext<M> {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        f.write_str("ClientContext")
212    }
213}
214
215impl<M> ClientContext<M>
216where
217    M: ClientModule,
218{
219    pub fn new(
220        client: FinalClientIface,
221        module_instance_id: ModuleInstanceId,
222        global_dbtx_access_token: GlobalDBTxAccessToken,
223        module_db: Database,
224    ) -> Self {
225        Self {
226            client,
227            module_instance_id,
228            global_dbtx_access_token,
229            module_db,
230            _marker: marker::PhantomData,
231        }
232    }
233
234    /// Get a reference back to client module from the [`Self`]
235    ///
236    /// It's often necessary for a client module to "move self"
237    /// by-value, especially due to async lifetimes issues.
238    /// Clients usually work with `&mut self`, which can't really
239    /// work in such context.
240    ///
241    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
242    /// can be used to recover the reference to the module at later
243    /// time.
244    #[allow(clippy::needless_lifetimes)] // just for explicitiness
245    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
246        ClientContextSelfRef {
247            client: self.client.get(),
248            module_instance_id: self.module_instance_id,
249            _marker: marker::PhantomData,
250        }
251    }
252
253    /// Get a reference to a global Api handle
254    pub fn global_api(&self) -> DynGlobalApi {
255        self.client.get().api_clone()
256    }
257
258    /// Get a reference to a module Api handle
259    pub fn module_api(&self) -> DynModuleApi {
260        self.global_api().with_module(self.module_instance_id)
261    }
262
263    /// A set of all decoders of all modules of the client
264    pub fn decoders(&self) -> ModuleDecoderRegistry {
265        Clone::clone(self.client.get().decoders())
266    }
267
268    pub fn input_from_dyn<'i>(
269        &self,
270        input: &'i DynInput,
271    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
272        (input.module_instance_id() == self.module_instance_id).then(|| {
273            input
274                .as_any()
275                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
276                .unwrap_or_else(|| {
277                    panic!("instance_id {} just checked", input.module_instance_id())
278                })
279        })
280    }
281
282    pub fn output_from_dyn<'o>(
283        &self,
284        output: &'o DynOutput,
285    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
286        (output.module_instance_id() == self.module_instance_id).then(|| {
287            output
288                .as_any()
289                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
290                .unwrap_or_else(|| {
291                    panic!("instance_id {} just checked", output.module_instance_id())
292                })
293        })
294    }
295
296    pub fn map_dyn<'s, 'i, 'o, I>(
297        &'s self,
298        typed: impl IntoIterator<Item = I> + 'i,
299    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
300    where
301        I: IntoDynInstance,
302        'i: 'o,
303        's: 'o,
304    {
305        typed.into_iter().map(|i| self.make_dyn(i))
306    }
307
308    /// Turn a typed output into a dyn version
309    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
310        self.make_dyn(output)
311    }
312
313    /// Turn a typed input into a dyn version
314    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
315        self.make_dyn(input)
316    }
317
318    /// Turn a `typed` into a dyn version
319    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
320    where
321        I: IntoDynInstance,
322    {
323        typed.into_dyn(self.module_instance_id)
324    }
325
326    /// Turn a typed [`ClientOutputBundle`] into a dyn version
327    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
328    where
329        O: IntoDynInstance<DynType = DynOutput> + 'static,
330        S: IntoDynInstance<DynType = DynState> + 'static,
331    {
332        self.make_dyn(output)
333    }
334
335    /// Turn a typed [`ClientInputBundle`] into a dyn version
336    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
337    where
338        I: IntoDynInstance<DynType = DynInput> + 'static,
339        S: IntoDynInstance<DynType = DynState> + 'static,
340    {
341        self.make_dyn(inputs)
342    }
343
344    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
345    where
346        S: sm::IState + 'static,
347    {
348        DynState::from_typed(self.module_instance_id, sm)
349    }
350
351    pub async fn finalize_and_submit_transaction<F, Meta>(
352        &self,
353        operation_id: OperationId,
354        operation_type: &str,
355        operation_meta_gen: F,
356        tx_builder: TransactionBuilder,
357    ) -> anyhow::Result<OutPointRange>
358    where
359        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
360        Meta: serde::Serialize + MaybeSend,
361    {
362        self.client
363            .get()
364            .finalize_and_submit_transaction(
365                operation_id,
366                operation_type,
367                Box::new(move |out_point_range| {
368                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
369                }),
370                tx_builder,
371            )
372            .await
373    }
374
375    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
376        self.client.get().transaction_updates(operation_id).await
377    }
378
379    pub async fn await_primary_module_outputs(
380        &self,
381        operation_id: OperationId,
382        // TODO: make `impl Iterator<Item = ...>`
383        outputs: Vec<OutPoint>,
384    ) -> anyhow::Result<()> {
385        self.client
386            .get()
387            .await_primary_module_outputs(operation_id, outputs)
388            .await
389    }
390
391    // TODO: unify with `Self::get_operation`
392    pub async fn get_operation(
393        &self,
394        operation_id: OperationId,
395    ) -> anyhow::Result<oplog::OperationLogEntry> {
396        let operation = self
397            .client
398            .get()
399            .operation_log()
400            .get_operation(operation_id)
401            .await
402            .ok_or(anyhow::anyhow!("Operation not found"))?;
403
404        if operation.operation_module_kind() != M::kind().as_str() {
405            bail!("Operation is not a lightning operation");
406        }
407
408        Ok(operation)
409    }
410
411    /// Get global db.
412    ///
413    /// Only intended for internal use (private).
414    fn global_db(&self) -> fedimint_core::db::Database {
415        let db = Clone::clone(self.client.get().db());
416
417        db.ensure_global()
418            .expect("global_db must always return a global db");
419
420        db
421    }
422
423    pub fn module_db(&self) -> &Database {
424        self.module_db
425            .ensure_isolated()
426            .expect("module_db must always return isolated db");
427        &self.module_db
428    }
429
430    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
431        self.client.get().has_active_states(op_id).await
432    }
433
434    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
435        self.client.get().operation_exists(op_id).await
436    }
437
438    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
439        self.client
440            .get()
441            .executor()
442            .get_active_states()
443            .await
444            .into_iter()
445            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
446            .map(|s| {
447                (
448                    Clone::clone(
449                        s.0.as_any()
450                            .downcast_ref::<M::States>()
451                            .expect("incorrect output type passed to module plugin"),
452                    ),
453                    s.1,
454                )
455            })
456            .collect()
457    }
458
459    pub async fn get_config(&self) -> ClientConfig {
460        self.client.get().config().await
461    }
462
463    /// Returns an invite code for the federation that points to an arbitrary
464    /// guardian server for fetching the config
465    pub async fn get_invite_code(&self) -> InviteCode {
466        let cfg = self.get_config().await.global;
467        self.client
468            .get()
469            .invite_code(
470                *cfg.api_endpoints
471                    .keys()
472                    .next()
473                    .expect("A federation always has at least one guardian"),
474            )
475            .await
476            .expect("The guardian we requested an invite code for exists")
477    }
478
479    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
480        self.client.get().get_internal_payment_markers()
481    }
482
483    /// This method starts n state machines with given operation id without a
484    /// corresponding transaction
485    pub async fn manual_operation_start(
486        &self,
487        operation_id: OperationId,
488        op_type: &str,
489        operation_meta: impl serde::Serialize + Debug,
490        sms: Vec<DynState>,
491    ) -> anyhow::Result<()> {
492        let db = self.module_db();
493        let mut dbtx = db.begin_transaction().await;
494        {
495            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
496
497            self.manual_operation_start_inner(
498                &mut dbtx.to_ref_nc(),
499                operation_id,
500                op_type,
501                operation_meta,
502                sms,
503            )
504            .await?;
505        }
506
507        dbtx.commit_tx_result().await.map_err(|_| {
508            anyhow!(
509                "Operation with id {} already exists",
510                operation_id.fmt_short()
511            )
512        })?;
513
514        Ok(())
515    }
516
517    pub async fn manual_operation_start_dbtx(
518        &self,
519        dbtx: &mut DatabaseTransaction<'_>,
520        operation_id: OperationId,
521        op_type: &str,
522        operation_meta: impl serde::Serialize + Debug,
523        sms: Vec<DynState>,
524    ) -> anyhow::Result<()> {
525        self.manual_operation_start_inner(
526            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
527            operation_id,
528            op_type,
529            operation_meta,
530            sms,
531        )
532        .await
533    }
534
535    /// See [`Self::manual_operation_start`], just inside a database
536    /// transaction.
537    async fn manual_operation_start_inner(
538        &self,
539        dbtx: &mut DatabaseTransaction<'_>,
540        operation_id: OperationId,
541        op_type: &str,
542        operation_meta: impl serde::Serialize + Debug,
543        sms: Vec<DynState>,
544    ) -> anyhow::Result<()> {
545        dbtx.ensure_global()
546            .expect("Must deal with global dbtx here");
547
548        if self
549            .client
550            .get()
551            .operation_log()
552            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
553            .await
554            .is_some()
555        {
556            bail!(
557                "Operation with id {} already exists",
558                operation_id.fmt_short()
559            );
560        }
561
562        self.client
563            .get()
564            .operation_log()
565            .add_operation_log_entry_dbtx(
566                &mut dbtx.to_ref_nc(),
567                operation_id,
568                op_type,
569                serde_json::to_value(operation_meta).expect("Can't fail"),
570            )
571            .await;
572
573        self.client
574            .get()
575            .executor()
576            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
577            .await
578            .expect("State machine is valid");
579
580        Ok(())
581    }
582
583    pub fn outcome_or_updates<U, S>(
584        &self,
585        operation: OperationLogEntry,
586        operation_id: OperationId,
587        stream_gen: impl FnOnce() -> S + 'static,
588    ) -> UpdateStreamOrOutcome<U>
589    where
590        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
591        S: Stream<Item = U> + MaybeSend + 'static,
592    {
593        use futures::StreamExt;
594        match self.client.get().operation_log().outcome_or_updates(
595            &self.global_db(),
596            operation_id,
597            operation,
598            Box::new(move || {
599                let stream_gen = stream_gen();
600                Box::pin(
601                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
602                )
603            }),
604        ) {
605            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
606                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
607            ),
608            UpdateStreamOrOutcome::Outcome(o) => {
609                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
610            }
611        }
612    }
613
614    pub async fn claim_inputs<I, S>(
615        &self,
616        dbtx: &mut DatabaseTransaction<'_>,
617        inputs: ClientInputBundle<I, S>,
618        operation_id: OperationId,
619    ) -> anyhow::Result<OutPointRange>
620    where
621        I: IInput + MaybeSend + MaybeSync + 'static,
622        S: sm::IState + MaybeSend + MaybeSync + 'static,
623    {
624        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
625            .await
626    }
627
628    async fn claim_inputs_dyn(
629        &self,
630        dbtx: &mut DatabaseTransaction<'_>,
631        inputs: InstancelessDynClientInputBundle,
632        operation_id: OperationId,
633    ) -> anyhow::Result<OutPointRange> {
634        let tx_builder =
635            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
636
637        self.client
638            .get()
639            .finalize_and_submit_transaction_inner(
640                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
641                operation_id,
642                tx_builder,
643            )
644            .await
645    }
646
647    pub async fn add_state_machines_dbtx(
648        &self,
649        dbtx: &mut DatabaseTransaction<'_>,
650        states: Vec<DynState>,
651    ) -> AddStateMachinesResult {
652        self.client
653            .get()
654            .executor()
655            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
656            .await
657    }
658
659    pub async fn add_operation_log_entry_dbtx(
660        &self,
661        dbtx: &mut DatabaseTransaction<'_>,
662        operation_id: OperationId,
663        operation_type: &str,
664        operation_meta: impl serde::Serialize,
665    ) {
666        self.client
667            .get()
668            .operation_log()
669            .add_operation_log_entry_dbtx(
670                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
671                operation_id,
672                operation_type,
673                serde_json::to_value(operation_meta).expect("Can't fail"),
674            )
675            .await;
676    }
677
678    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
679    where
680        E: Event + Send,
681        Cap: Send,
682    {
683        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
684            warn!(
685                target: LOG_CLIENT,
686                module_kind = %<M as ClientModule>::kind(),
687                event_module = ?<E as Event>::MODULE,
688                "Client module logging events of different module than its own. This might become an error in the future."
689            );
690        }
691        self.client
692            .get()
693            .log_event_json(
694                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
695                <E as Event>::MODULE,
696                self.module_instance_id,
697                <E as Event>::KIND,
698                serde_json::to_value(event).expect("Can't fail"),
699                <E as Event>::PERSISTENCE,
700            )
701            .await;
702    }
703}
704
705/// Fedimint module client
706#[apply(async_trait_maybe_send!)]
707pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
708    type Init: ClientModuleInit;
709
710    /// Common module types shared between client and server
711    type Common: ModuleCommon;
712
713    /// Data stored in regular backups so that restoring doesn't have to start
714    /// from epoch 0
715    type Backup: ModuleBackup;
716
717    /// Data and API clients available to state machine transitions of this
718    /// module
719    type ModuleStateMachineContext: Context;
720
721    /// All possible states this client can submit to the executor
722    type States: State<ModuleContext = Self::ModuleStateMachineContext>
723        + IntoDynInstance<DynType = DynState>;
724
725    fn decoder() -> Decoder {
726        let mut decoder_builder = Self::Common::decoder_builder();
727        decoder_builder.with_decodable_type::<Self::States>();
728        decoder_builder.with_decodable_type::<Self::Backup>();
729        decoder_builder.build()
730    }
731
732    fn kind() -> ModuleKind {
733        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
734    }
735
736    fn context(&self) -> Self::ModuleStateMachineContext;
737
738    /// Initialize client.
739    ///
740    /// Called by the core client code on start, after [`ClientContext`] is
741    /// fully initialized, so unlike during [`ClientModuleInit::init`],
742    /// access to global client is allowed.
743    async fn start(&self) {}
744
745    async fn handle_cli_command(
746        &self,
747        _args: &[ffi::OsString],
748    ) -> anyhow::Result<serde_json::Value> {
749        Err(anyhow::format_err!(
750            "This module does not implement cli commands"
751        ))
752    }
753
754    async fn handle_rpc(
755        &self,
756        _method: String,
757        _request: serde_json::Value,
758    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
759        Box::pin(futures::stream::once(std::future::ready(Err(
760            anyhow::format_err!("This module does not implement rpc"),
761        ))))
762    }
763
764    /// Returns the fee the processing of this input requires.
765    ///
766    /// If the semantics of a given input aren't known this function returns
767    /// `None`, this only happens if a future version of Fedimint introduces a
768    /// new input variant. For clients this should only be the case when
769    /// processing transactions created by other users, so the result of
770    /// this function can be `unwrap`ped whenever dealing with inputs
771    /// generated by ourselves.
772    fn input_fee(
773        &self,
774        amount: Amount,
775        input: &<Self::Common as ModuleCommon>::Input,
776    ) -> Option<Amount>;
777
778    /// Returns the fee the processing of this output requires.
779    ///
780    /// If the semantics of a given output aren't known this function returns
781    /// `None`, this only happens if a future version of Fedimint introduces a
782    /// new output variant. For clients this should only be the case when
783    /// processing transactions created by other users, so the result of
784    /// this function can be `unwrap`ped whenever dealing with inputs
785    /// generated by ourselves.
786    fn output_fee(
787        &self,
788        amount: Amount,
789        output: &<Self::Common as ModuleCommon>::Output,
790    ) -> Option<Amount>;
791
792    fn supports_backup(&self) -> bool {
793        false
794    }
795
796    async fn backup(&self) -> anyhow::Result<Self::Backup> {
797        anyhow::bail!("Backup not supported");
798    }
799
800    /// Does this module support being a primary module
801    ///
802    /// If it does it must implement:
803    ///
804    /// * [`Self::create_final_inputs_and_outputs`]
805    /// * [`Self::await_primary_module_output`]
806    /// * [`Self::get_balance`]
807    /// * [`Self::subscribe_balance_changes`]
808    fn supports_being_primary(&self) -> bool {
809        false
810    }
811
812    /// Creates all inputs and outputs necessary to balance the transaction.
813    /// The function returns an error if and only if the client's funds are not
814    /// sufficient to create the inputs necessary to fully fund the transaction.
815    ///
816    /// A returned input also contains:
817    /// * A set of private keys belonging to the input for signing the
818    ///   transaction
819    /// * A closure that generates states belonging to the input. This closure
820    ///   takes the transaction id of the transaction in which the input was
821    ///   used and the input index as input since these cannot be known at time
822    ///   of calling `create_funding_input` and have to be injected later.
823    ///
824    /// A returned output also contains:
825    /// * A closure that generates states belonging to the output. This closure
826    ///   takes the transaction id of the transaction in which the output was
827    ///   used and the output index as input since these cannot be known at time
828    ///   of calling `create_change_output` and have to be injected later.
829    async fn create_final_inputs_and_outputs(
830        &self,
831        _dbtx: &mut DatabaseTransaction<'_>,
832        _operation_id: OperationId,
833        _input_amount: Amount,
834        _output_amount: Amount,
835    ) -> anyhow::Result<(
836        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
837        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
838    )> {
839        unimplemented!()
840    }
841
842    /// Waits for the funds from an output created by
843    /// [`Self::create_final_inputs_and_outputs`] to become available. This
844    /// function returning typically implies a change in the output of
845    /// [`Self::get_balance`].
846    async fn await_primary_module_output(
847        &self,
848        _operation_id: OperationId,
849        _out_point: OutPoint,
850    ) -> anyhow::Result<()> {
851        unimplemented!()
852    }
853
854    /// Returns the balance held by this module and available for funding
855    /// transactions.
856    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amount {
857        unimplemented!()
858    }
859
860    /// Returns a stream that will output the updated module balance each time
861    /// it changes.
862    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
863        unimplemented!()
864    }
865
866    /// Leave the federation
867    ///
868    /// While technically there's nothing stopping the client from just
869    /// abandoning Federation at any point by deleting all the related
870    /// local data, it is useful to make sure it's safe beforehand.
871    ///
872    /// This call indicates the desire of the caller client code
873    /// to orderly and safely leave the Federation by this module instance.
874    /// The goal of the implementations is to fulfil that wish,
875    /// giving prompt and informative feedback if it's not yet possible.
876    ///
877    /// The client module implementation should handle the request
878    /// and return as fast as possible avoiding blocking for longer than
879    /// necessary. This would usually involve some combination of:
880    ///
881    /// * recording the state of being in process of leaving the Federation to
882    ///   prevent initiating new conditions that could delay its completion;
883    /// * performing any fast to complete cleanup/exit logic;
884    /// * initiating any time-consuming logic (e.g. canceling outstanding
885    ///   contracts), as background jobs, tasks machines, etc.
886    /// * checking for any conditions indicating it might not be safe to leave
887    ///   at the moment.
888    ///
889    /// This function should return `Ok` only if from the perspective
890    /// of this module instance, it is safe to delete client data and
891    /// stop using it, with no further actions (like background jobs) required
892    /// to complete.
893    ///
894    /// This function should return an error if it's not currently possible
895    /// to safely (e.g. without loosing funds) leave the Federation.
896    /// It should avoid running indefinitely trying to complete any cleanup
897    /// actions necessary to reach a clean state, preferring spawning new
898    /// state machines and returning an informative error about cleanup
899    /// still in progress.
900    ///
901    /// If any internal task needs to complete, any user action is required,
902    /// or even external condition needs to be met this function
903    /// should return a `Err`.
904    ///
905    /// Notably modules should not disable interaction that might be necessary
906    /// for the user (possibly through other modules) to leave the Federation.
907    /// In particular a Mint module should retain ability to create new notes,
908    /// and LN module should retain ability to send funds out.
909    ///
910    /// Calling code must NOT assume that a module that once returned `Ok`,
911    /// will not return `Err` at later point. E.g. a Mint module might have
912    /// no outstanding balance at first, but other modules winding down
913    /// might "cash-out" to Ecash.
914    ///
915    /// Before leaving the Federation and deleting any state the calling code
916    /// must collect a full round of `Ok` from all the modules.
917    ///
918    /// Calling code should allow the user to override and ignore any
919    /// outstanding errors, after sufficient amount of warnings. Ideally,
920    /// this should be done on per-module basis, to avoid mistakes.
921    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
922        bail!("Unable to determine if safe to leave the federation: Not implemented")
923    }
924}
925
926/// Type-erased version of [`ClientModule`]
927#[apply(async_trait_maybe_send!)]
928pub trait IClientModule: Debug {
929    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
930
931    fn decoder(&self) -> Decoder;
932
933    fn context(&self, instance: ModuleInstanceId) -> DynContext;
934
935    async fn start(&self);
936
937    async fn handle_cli_command(&self, args: &[ffi::OsString])
938    -> anyhow::Result<serde_json::Value>;
939
940    async fn handle_rpc(
941        &self,
942        method: String,
943        request: serde_json::Value,
944    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
945
946    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount>;
947
948    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount>;
949
950    fn supports_backup(&self) -> bool;
951
952    async fn backup(&self, module_instance_id: ModuleInstanceId)
953    -> anyhow::Result<DynModuleBackup>;
954
955    fn supports_being_primary(&self) -> bool;
956
957    async fn create_final_inputs_and_outputs(
958        &self,
959        module_instance: ModuleInstanceId,
960        dbtx: &mut DatabaseTransaction<'_>,
961        operation_id: OperationId,
962        input_amount: Amount,
963        output_amount: Amount,
964    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
965
966    async fn await_primary_module_output(
967        &self,
968        operation_id: OperationId,
969        out_point: OutPoint,
970    ) -> anyhow::Result<()>;
971
972    async fn get_balance(
973        &self,
974        module_instance: ModuleInstanceId,
975        dbtx: &mut DatabaseTransaction<'_>,
976    ) -> Amount;
977
978    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
979}
980
981#[apply(async_trait_maybe_send!)]
982impl<T> IClientModule for T
983where
984    T: ClientModule,
985{
986    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
987        self
988    }
989
990    fn decoder(&self) -> Decoder {
991        T::decoder()
992    }
993
994    fn context(&self, instance: ModuleInstanceId) -> DynContext {
995        DynContext::from_typed(instance, <T as ClientModule>::context(self))
996    }
997
998    async fn start(&self) {
999        <T as ClientModule>::start(self).await;
1000    }
1001
1002    async fn handle_cli_command(
1003        &self,
1004        args: &[ffi::OsString],
1005    ) -> anyhow::Result<serde_json::Value> {
1006        <T as ClientModule>::handle_cli_command(self, args).await
1007    }
1008
1009    async fn handle_rpc(
1010        &self,
1011        method: String,
1012        request: serde_json::Value,
1013    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1014        <T as ClientModule>::handle_rpc(self, method, request).await
1015    }
1016
1017    fn input_fee(&self, amount: Amount, input: &DynInput) -> Option<Amount> {
1018        <T as ClientModule>::input_fee(
1019            self,
1020            amount,
1021            input
1022                .as_any()
1023                .downcast_ref()
1024                .expect("Dispatched to correct module"),
1025        )
1026    }
1027
1028    fn output_fee(&self, amount: Amount, output: &DynOutput) -> Option<Amount> {
1029        <T as ClientModule>::output_fee(
1030            self,
1031            amount,
1032            output
1033                .as_any()
1034                .downcast_ref()
1035                .expect("Dispatched to correct module"),
1036        )
1037    }
1038
1039    fn supports_backup(&self) -> bool {
1040        <T as ClientModule>::supports_backup(self)
1041    }
1042
1043    async fn backup(
1044        &self,
1045        module_instance_id: ModuleInstanceId,
1046    ) -> anyhow::Result<DynModuleBackup> {
1047        Ok(DynModuleBackup::from_typed(
1048            module_instance_id,
1049            <T as ClientModule>::backup(self).await?,
1050        ))
1051    }
1052
1053    fn supports_being_primary(&self) -> bool {
1054        <T as ClientModule>::supports_being_primary(self)
1055    }
1056
1057    async fn create_final_inputs_and_outputs(
1058        &self,
1059        module_instance: ModuleInstanceId,
1060        dbtx: &mut DatabaseTransaction<'_>,
1061        operation_id: OperationId,
1062        input_amount: Amount,
1063        output_amount: Amount,
1064    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1065        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1066            self,
1067            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1068            operation_id,
1069            input_amount,
1070            output_amount,
1071        )
1072        .await?;
1073
1074        let inputs = inputs.into_dyn(module_instance);
1075
1076        let outputs = outputs.into_dyn(module_instance);
1077
1078        Ok((inputs, outputs))
1079    }
1080
1081    async fn await_primary_module_output(
1082        &self,
1083        operation_id: OperationId,
1084        out_point: OutPoint,
1085    ) -> anyhow::Result<()> {
1086        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1087    }
1088
1089    async fn get_balance(
1090        &self,
1091        module_instance: ModuleInstanceId,
1092        dbtx: &mut DatabaseTransaction<'_>,
1093    ) -> Amount {
1094        <T as ClientModule>::get_balance(
1095            self,
1096            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1097        )
1098        .await
1099    }
1100
1101    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1102        <T as ClientModule>::subscribe_balance_changes(self).await
1103    }
1104}
1105
1106dyn_newtype_define!(
1107    #[derive(Clone)]
1108    pub DynClientModule(Arc<IClientModule>)
1109);
1110
1111impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1112    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1113        self.inner.as_ref()
1114    }
1115}
1116
1117/// A contiguous range of input/output indexes
1118#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1119pub struct IdxRange {
1120    start: u64,
1121    end: u64,
1122}
1123
1124impl IdxRange {
1125    pub fn new_single(start: u64) -> Option<Self> {
1126        start.checked_add(1).map(|end| Self { start, end })
1127    }
1128
1129    pub fn start(self) -> u64 {
1130        self.start
1131    }
1132
1133    pub fn count(self) -> usize {
1134        self.into_iter().count()
1135    }
1136
1137    pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1138        range.end().checked_add(1).map(|end| Self {
1139            start: *range.start(),
1140            end,
1141        })
1142    }
1143}
1144
1145impl From<Range<u64>> for IdxRange {
1146    fn from(Range { start, end }: Range<u64>) -> Self {
1147        Self { start, end }
1148    }
1149}
1150
1151impl IntoIterator for IdxRange {
1152    type Item = u64;
1153
1154    type IntoIter = ops::Range<u64>;
1155
1156    fn into_iter(self) -> Self::IntoIter {
1157        ops::Range {
1158            start: self.start,
1159            end: self.end,
1160        }
1161    }
1162}
1163
1164#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1165pub struct OutPointRange {
1166    pub txid: TransactionId,
1167    idx_range: IdxRange,
1168}
1169
1170impl OutPointRange {
1171    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1172        Self { txid, idx_range }
1173    }
1174
1175    pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1176        IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1177    }
1178
1179    pub fn start_idx(self) -> u64 {
1180        self.idx_range.start()
1181    }
1182
1183    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1184        self.idx_range.into_iter()
1185    }
1186
1187    pub fn count(self) -> usize {
1188        self.idx_range.count()
1189    }
1190}
1191
1192impl IntoIterator for OutPointRange {
1193    type Item = OutPoint;
1194
1195    type IntoIter = OutPointRangeIter;
1196
1197    fn into_iter(self) -> Self::IntoIter {
1198        OutPointRangeIter {
1199            txid: self.txid,
1200            inner: self.idx_range.into_iter(),
1201        }
1202    }
1203}
1204
1205pub struct OutPointRangeIter {
1206    txid: TransactionId,
1207
1208    inner: ops::Range<u64>,
1209}
1210
1211impl OutPointRange {
1212    pub fn txid(&self) -> TransactionId {
1213        self.txid
1214    }
1215}
1216
1217impl Iterator for OutPointRangeIter {
1218    type Item = OutPoint;
1219
1220    fn next(&mut self) -> Option<Self::Item> {
1221        self.inner.next().map(|idx| OutPoint {
1222            txid: self.txid,
1223            out_idx: idx,
1224        })
1225    }
1226}
1227
1228pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;