fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15    OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24    Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25    maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
45pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
47/// A fedimint-client interface exposed to client modules
48///
49/// To break the dependency of the client modules on the whole fedimint client
50/// and in particular the `fedimint-client` crate, the module gets access to an
51/// interface, that is implemented by the `Client`.
52///
53/// This allows lose coupling, less recompilation and better control and
54/// understanding of what functionality of the Client the modules get access to.
55#[apply(async_trait_maybe_send!)]
56pub trait ClientContextIface: MaybeSend + MaybeSync {
57    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
58    fn api_clone(&self) -> DynGlobalApi;
59    fn decoders(&self) -> &ModuleDecoderRegistry;
60    async fn finalize_and_submit_transaction(
61        &self,
62        operation_id: OperationId,
63        operation_type: &str,
64        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
65        tx_builder: TransactionBuilder,
66    ) -> anyhow::Result<OutPointRange>;
67
68    // TODO: unify
69    async fn finalize_and_submit_transaction_inner(
70        &self,
71        dbtx: &mut DatabaseTransaction<'_>,
72        operation_id: OperationId,
73        tx_builder: TransactionBuilder,
74    ) -> anyhow::Result<OutPointRange>;
75
76    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
77
78    async fn await_primary_module_outputs(
79        &self,
80        operation_id: OperationId,
81        // TODO: make `impl Iterator<Item = ...>`
82        outputs: Vec<OutPoint>,
83    ) -> anyhow::Result<()>;
84
85    fn operation_log(&self) -> &dyn IOperationLog;
86
87    async fn has_active_states(&self, operation_id: OperationId) -> bool;
88
89    async fn operation_exists(&self, operation_id: OperationId) -> bool;
90
91    async fn config(&self) -> ClientConfig;
92
93    fn db(&self) -> &Database;
94
95    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
96
97    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
98
99    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
100
101    #[allow(clippy::too_many_arguments)]
102    async fn log_event_json(
103        &self,
104        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
105        module_kind: Option<ModuleKind>,
106        module_id: ModuleInstanceId,
107        kind: EventKind,
108        payload: serde_json::Value,
109        persist: EventPersistence,
110    );
111
112    async fn read_operation_active_states<'dbtx>(
113        &self,
114        operation_id: OperationId,
115        module_id: ModuleInstanceId,
116        dbtx: &'dbtx mut DatabaseTransaction<'_>,
117    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
118
119    async fn read_operation_inactive_states<'dbtx>(
120        &self,
121        operation_id: OperationId,
122        module_id: ModuleInstanceId,
123        dbtx: &'dbtx mut DatabaseTransaction<'_>,
124    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
125}
126
127/// A final, fully initialized client
128///
129/// Client modules need to be able to access a `Client` they are a part
130/// of. To break the circular dependency, the final `Client` is passed
131/// after `Client` was built via a shared state.
132#[derive(Clone, Default)]
133pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136    /// Get a temporary strong reference to [`ClientContextIface`]
137    ///
138    /// Care must be taken to not let the user take ownership of this value,
139    /// and not store it elsewhere permanently either, as it could prevent
140    /// the cleanup of the Client.
141    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142        self.0
143            .get()
144            .expect("client must be already set")
145            .upgrade()
146            .expect("client module context must not be use past client shutdown")
147    }
148
149    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150        self.0.set(client).expect("FinalLazyClient already set");
151    }
152}
153
154impl fmt::Debug for FinalClientIface {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.write_str("FinalClientIface")
157    }
158}
159/// A Client context for a [`ClientModule`] `M`
160///
161/// Client modules can interact with the whole
162/// client through this struct.
163pub struct ClientContext<M> {
164    client: FinalClientIface,
165    module_instance_id: ModuleInstanceId,
166    global_dbtx_access_token: GlobalDBTxAccessToken,
167    module_db: Database,
168    _marker: marker::PhantomData<M>,
169}
170
171impl<M> Clone for ClientContext<M> {
172    fn clone(&self) -> Self {
173        Self {
174            client: self.client.clone(),
175            module_db: self.module_db.clone(),
176            module_instance_id: self.module_instance_id,
177            _marker: marker::PhantomData,
178            global_dbtx_access_token: self.global_dbtx_access_token,
179        }
180    }
181}
182
183/// A reference back to itself that the module cacn get from the
184/// [`ClientContext`]
185pub struct ClientContextSelfRef<'s, M> {
186    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
187    // stored permanently somewhere
188    client: Arc<dyn ClientContextIface>,
189    module_instance_id: ModuleInstanceId,
190    _marker: marker::PhantomData<&'s M>,
191}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195    M: ClientModule,
196{
197    type Target = M;
198
199    fn deref(&self) -> &Self::Target {
200        self.client
201            .get_module(self.module_instance_id)
202            .as_any()
203            .downcast_ref::<M>()
204            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205    }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.write_str("ClientContext")
211    }
212}
213
214impl<M> ClientContext<M>
215where
216    M: ClientModule,
217{
218    pub fn new(
219        client: FinalClientIface,
220        module_instance_id: ModuleInstanceId,
221        global_dbtx_access_token: GlobalDBTxAccessToken,
222        module_db: Database,
223    ) -> Self {
224        Self {
225            client,
226            module_instance_id,
227            global_dbtx_access_token,
228            module_db,
229            _marker: marker::PhantomData,
230        }
231    }
232
233    /// Get a reference back to client module from the [`Self`]
234    ///
235    /// It's often necessary for a client module to "move self"
236    /// by-value, especially due to async lifetimes issues.
237    /// Clients usually work with `&mut self`, which can't really
238    /// work in such context.
239    ///
240    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
241    /// can be used to recover the reference to the module at later
242    /// time.
243    #[allow(clippy::needless_lifetimes)] // just for explicitiness
244    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
245        ClientContextSelfRef {
246            client: self.client.get(),
247            module_instance_id: self.module_instance_id,
248            _marker: marker::PhantomData,
249        }
250    }
251
252    /// Get a reference to a global Api handle
253    pub fn global_api(&self) -> DynGlobalApi {
254        self.client.get().api_clone()
255    }
256
257    /// Get a reference to a module Api handle
258    pub fn module_api(&self) -> DynModuleApi {
259        self.global_api().with_module(self.module_instance_id)
260    }
261
262    /// A set of all decoders of all modules of the client
263    pub fn decoders(&self) -> ModuleDecoderRegistry {
264        Clone::clone(self.client.get().decoders())
265    }
266
267    pub fn input_from_dyn<'i>(
268        &self,
269        input: &'i DynInput,
270    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
271        (input.module_instance_id() == self.module_instance_id).then(|| {
272            input
273                .as_any()
274                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
275                .unwrap_or_else(|| {
276                    panic!("instance_id {} just checked", input.module_instance_id())
277                })
278        })
279    }
280
281    pub fn output_from_dyn<'o>(
282        &self,
283        output: &'o DynOutput,
284    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
285        (output.module_instance_id() == self.module_instance_id).then(|| {
286            output
287                .as_any()
288                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
289                .unwrap_or_else(|| {
290                    panic!("instance_id {} just checked", output.module_instance_id())
291                })
292        })
293    }
294
295    pub fn map_dyn<'s, 'i, 'o, I>(
296        &'s self,
297        typed: impl IntoIterator<Item = I> + 'i,
298    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
299    where
300        I: IntoDynInstance,
301        'i: 'o,
302        's: 'o,
303    {
304        typed.into_iter().map(|i| self.make_dyn(i))
305    }
306
307    /// Turn a typed output into a dyn version
308    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
309        self.make_dyn(output)
310    }
311
312    /// Turn a typed input into a dyn version
313    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
314        self.make_dyn(input)
315    }
316
317    /// Turn a `typed` into a dyn version
318    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
319    where
320        I: IntoDynInstance,
321    {
322        typed.into_dyn(self.module_instance_id)
323    }
324
325    /// Turn a typed [`ClientOutputBundle`] into a dyn version
326    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
327    where
328        O: IntoDynInstance<DynType = DynOutput> + 'static,
329        S: IntoDynInstance<DynType = DynState> + 'static,
330    {
331        self.make_dyn(output)
332    }
333
334    /// Turn a typed [`ClientInputBundle`] into a dyn version
335    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
336    where
337        I: IntoDynInstance<DynType = DynInput> + 'static,
338        S: IntoDynInstance<DynType = DynState> + 'static,
339    {
340        self.make_dyn(inputs)
341    }
342
343    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
344    where
345        S: sm::IState + 'static,
346    {
347        DynState::from_typed(self.module_instance_id, sm)
348    }
349
350    pub async fn finalize_and_submit_transaction<F, Meta>(
351        &self,
352        operation_id: OperationId,
353        operation_type: &str,
354        operation_meta_gen: F,
355        tx_builder: TransactionBuilder,
356    ) -> anyhow::Result<OutPointRange>
357    where
358        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
359        Meta: serde::Serialize + MaybeSend,
360    {
361        self.client
362            .get()
363            .finalize_and_submit_transaction(
364                operation_id,
365                operation_type,
366                Box::new(move |out_point_range| {
367                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
368                }),
369                tx_builder,
370            )
371            .await
372    }
373
374    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
375        self.client.get().transaction_updates(operation_id).await
376    }
377
378    pub async fn await_primary_module_outputs(
379        &self,
380        operation_id: OperationId,
381        // TODO: make `impl Iterator<Item = ...>`
382        outputs: Vec<OutPoint>,
383    ) -> anyhow::Result<()> {
384        self.client
385            .get()
386            .await_primary_module_outputs(operation_id, outputs)
387            .await
388    }
389
390    // TODO: unify with `Self::get_operation`
391    pub async fn get_operation(
392        &self,
393        operation_id: OperationId,
394    ) -> anyhow::Result<oplog::OperationLogEntry> {
395        let operation = self
396            .client
397            .get()
398            .operation_log()
399            .get_operation(operation_id)
400            .await
401            .ok_or(anyhow::anyhow!("Operation not found"))?;
402
403        if operation.operation_module_kind() != M::kind().as_str() {
404            bail!("Operation is not a lightning operation");
405        }
406
407        Ok(operation)
408    }
409
410    /// Get global db.
411    ///
412    /// Only intended for internal use (private).
413    fn global_db(&self) -> fedimint_core::db::Database {
414        let db = Clone::clone(self.client.get().db());
415
416        db.ensure_global()
417            .expect("global_db must always return a global db");
418
419        db
420    }
421
422    pub fn module_db(&self) -> &Database {
423        self.module_db
424            .ensure_isolated()
425            .expect("module_db must always return isolated db");
426        &self.module_db
427    }
428
429    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
430        self.client.get().has_active_states(op_id).await
431    }
432
433    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
434        self.client.get().operation_exists(op_id).await
435    }
436
437    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
438        self.client
439            .get()
440            .executor()
441            .get_active_states()
442            .await
443            .into_iter()
444            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
445            .map(|s| {
446                (
447                    Clone::clone(
448                        s.0.as_any()
449                            .downcast_ref::<M::States>()
450                            .expect("incorrect output type passed to module plugin"),
451                    ),
452                    s.1,
453                )
454            })
455            .collect()
456    }
457
458    pub async fn get_config(&self) -> ClientConfig {
459        self.client.get().config().await
460    }
461
462    /// Returns an invite code for the federation that points to an arbitrary
463    /// guardian server for fetching the config
464    pub async fn get_invite_code(&self) -> InviteCode {
465        let cfg = self.get_config().await.global;
466        self.client
467            .get()
468            .invite_code(
469                *cfg.api_endpoints
470                    .keys()
471                    .next()
472                    .expect("A federation always has at least one guardian"),
473            )
474            .await
475            .expect("The guardian we requested an invite code for exists")
476    }
477
478    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
479        self.client.get().get_internal_payment_markers()
480    }
481
482    /// This method starts n state machines with given operation id without a
483    /// corresponding transaction
484    pub async fn manual_operation_start(
485        &self,
486        operation_id: OperationId,
487        op_type: &str,
488        operation_meta: impl serde::Serialize + Debug,
489        sms: Vec<DynState>,
490    ) -> anyhow::Result<()> {
491        let db = self.module_db();
492        let mut dbtx = db.begin_transaction().await;
493        {
494            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
495
496            self.manual_operation_start_inner(
497                &mut dbtx.to_ref_nc(),
498                operation_id,
499                op_type,
500                operation_meta,
501                sms,
502            )
503            .await?;
504        }
505
506        dbtx.commit_tx_result().await.map_err(|_| {
507            anyhow!(
508                "Operation with id {} already exists",
509                operation_id.fmt_short()
510            )
511        })?;
512
513        Ok(())
514    }
515
516    pub async fn manual_operation_start_dbtx(
517        &self,
518        dbtx: &mut DatabaseTransaction<'_>,
519        operation_id: OperationId,
520        op_type: &str,
521        operation_meta: impl serde::Serialize + Debug,
522        sms: Vec<DynState>,
523    ) -> anyhow::Result<()> {
524        self.manual_operation_start_inner(
525            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
526            operation_id,
527            op_type,
528            operation_meta,
529            sms,
530        )
531        .await
532    }
533
534    /// See [`Self::manual_operation_start`], just inside a database
535    /// transaction.
536    async fn manual_operation_start_inner(
537        &self,
538        dbtx: &mut DatabaseTransaction<'_>,
539        operation_id: OperationId,
540        op_type: &str,
541        operation_meta: impl serde::Serialize + Debug,
542        sms: Vec<DynState>,
543    ) -> anyhow::Result<()> {
544        dbtx.ensure_global()
545            .expect("Must deal with global dbtx here");
546
547        if self
548            .client
549            .get()
550            .operation_log()
551            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
552            .await
553            .is_some()
554        {
555            bail!(
556                "Operation with id {} already exists",
557                operation_id.fmt_short()
558            );
559        }
560
561        self.client
562            .get()
563            .operation_log()
564            .add_operation_log_entry_dbtx(
565                &mut dbtx.to_ref_nc(),
566                operation_id,
567                op_type,
568                serde_json::to_value(operation_meta).expect("Can't fail"),
569            )
570            .await;
571
572        self.client
573            .get()
574            .executor()
575            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
576            .await
577            .expect("State machine is valid");
578
579        Ok(())
580    }
581
582    pub fn outcome_or_updates<U, S>(
583        &self,
584        operation: OperationLogEntry,
585        operation_id: OperationId,
586        stream_gen: impl FnOnce() -> S + 'static,
587    ) -> UpdateStreamOrOutcome<U>
588    where
589        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
590        S: Stream<Item = U> + MaybeSend + 'static,
591    {
592        use futures::StreamExt;
593        match self.client.get().operation_log().outcome_or_updates(
594            &self.global_db(),
595            operation_id,
596            operation,
597            Box::new(move || {
598                let stream_gen = stream_gen();
599                Box::pin(
600                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
601                )
602            }),
603        ) {
604            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
605                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
606            ),
607            UpdateStreamOrOutcome::Outcome(o) => {
608                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
609            }
610        }
611    }
612
613    pub async fn claim_inputs<I, S>(
614        &self,
615        dbtx: &mut DatabaseTransaction<'_>,
616        inputs: ClientInputBundle<I, S>,
617        operation_id: OperationId,
618    ) -> anyhow::Result<OutPointRange>
619    where
620        I: IInput + MaybeSend + MaybeSync + 'static,
621        S: sm::IState + MaybeSend + MaybeSync + 'static,
622    {
623        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
624            .await
625    }
626
627    async fn claim_inputs_dyn(
628        &self,
629        dbtx: &mut DatabaseTransaction<'_>,
630        inputs: InstancelessDynClientInputBundle,
631        operation_id: OperationId,
632    ) -> anyhow::Result<OutPointRange> {
633        let tx_builder =
634            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
635
636        self.client
637            .get()
638            .finalize_and_submit_transaction_inner(
639                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
640                operation_id,
641                tx_builder,
642            )
643            .await
644    }
645
646    pub async fn add_state_machines_dbtx(
647        &self,
648        dbtx: &mut DatabaseTransaction<'_>,
649        states: Vec<DynState>,
650    ) -> AddStateMachinesResult {
651        self.client
652            .get()
653            .executor()
654            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
655            .await
656    }
657
658    pub async fn add_operation_log_entry_dbtx(
659        &self,
660        dbtx: &mut DatabaseTransaction<'_>,
661        operation_id: OperationId,
662        operation_type: &str,
663        operation_meta: impl serde::Serialize,
664    ) {
665        self.client
666            .get()
667            .operation_log()
668            .add_operation_log_entry_dbtx(
669                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
670                operation_id,
671                operation_type,
672                serde_json::to_value(operation_meta).expect("Can't fail"),
673            )
674            .await;
675    }
676
677    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
678    where
679        E: Event + Send,
680        Cap: Send,
681    {
682        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
683            warn!(
684                target: LOG_CLIENT,
685                module_kind = %<M as ClientModule>::kind(),
686                event_module = ?<E as Event>::MODULE,
687                "Client module logging events of different module than its own. This might become an error in the future."
688            );
689        }
690        self.client
691            .get()
692            .log_event_json(
693                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
694                <E as Event>::MODULE,
695                self.module_instance_id,
696                <E as Event>::KIND,
697                serde_json::to_value(event).expect("Can't fail"),
698                <E as Event>::PERSISTENCE,
699            )
700            .await;
701    }
702}
703
704/// Priority module priority (lower number is higher priority)
705#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
706pub struct PrimaryModulePriority(u64);
707
708impl PrimaryModulePriority {
709    pub const HIGH: Self = Self(100);
710    pub const LOW: Self = Self(10000);
711
712    pub fn custom(prio: u64) -> Self {
713        Self(prio)
714    }
715}
716/// Which amount units this module supports being primary for
717pub enum PrimaryModuleSupport {
718    /// Potentially any unit
719    Any { priority: PrimaryModulePriority },
720    /// Some units supported by the module
721    Selected {
722        priority: PrimaryModulePriority,
723        units: BTreeSet<AmountUnit>,
724    },
725    /// None
726    None,
727}
728
729impl PrimaryModuleSupport {
730    pub fn selected<const N: usize>(
731        priority: PrimaryModulePriority,
732        units: [AmountUnit; N],
733    ) -> Self {
734        Self::Selected {
735            priority,
736            units: BTreeSet::from(units),
737        }
738    }
739}
740
741/// Fedimint module client
742#[apply(async_trait_maybe_send!)]
743pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
744    type Init: ClientModuleInit;
745
746    /// Common module types shared between client and server
747    type Common: ModuleCommon;
748
749    /// Data stored in regular backups so that restoring doesn't have to start
750    /// from epoch 0
751    type Backup: ModuleBackup;
752
753    /// Data and API clients available to state machine transitions of this
754    /// module
755    type ModuleStateMachineContext: Context;
756
757    /// All possible states this client can submit to the executor
758    type States: State<ModuleContext = Self::ModuleStateMachineContext>
759        + IntoDynInstance<DynType = DynState>;
760
761    fn decoder() -> Decoder {
762        let mut decoder_builder = Self::Common::decoder_builder();
763        decoder_builder.with_decodable_type::<Self::States>();
764        decoder_builder.with_decodable_type::<Self::Backup>();
765        decoder_builder.build()
766    }
767
768    fn kind() -> ModuleKind {
769        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
770    }
771
772    fn context(&self) -> Self::ModuleStateMachineContext;
773
774    /// Initialize client.
775    ///
776    /// Called by the core client code on start, after [`ClientContext`] is
777    /// fully initialized, so unlike during [`ClientModuleInit::init`],
778    /// access to global client is allowed.
779    async fn start(&self) {}
780
781    async fn handle_cli_command(
782        &self,
783        _args: &[ffi::OsString],
784    ) -> anyhow::Result<serde_json::Value> {
785        Err(anyhow::format_err!(
786            "This module does not implement cli commands"
787        ))
788    }
789
790    async fn handle_rpc(
791        &self,
792        _method: String,
793        _request: serde_json::Value,
794    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
795        Box::pin(futures::stream::once(std::future::ready(Err(
796            anyhow::format_err!("This module does not implement rpc"),
797        ))))
798    }
799
800    /// Returns the fee the processing of this input requires.
801    ///
802    /// If the semantics of a given input aren't known this function returns
803    /// `None`, this only happens if a future version of Fedimint introduces a
804    /// new input variant. For clients this should only be the case when
805    /// processing transactions created by other users, so the result of
806    /// this function can be `unwrap`ped whenever dealing with inputs
807    /// generated by ourselves.
808    fn input_fee(
809        &self,
810        amount: &Amounts,
811        input: &<Self::Common as ModuleCommon>::Input,
812    ) -> Option<Amounts>;
813
814    /// Returns the fee the processing of this output requires.
815    ///
816    /// If the semantics of a given output aren't known this function returns
817    /// `None`, this only happens if a future version of Fedimint introduces a
818    /// new output variant. For clients this should only be the case when
819    /// processing transactions created by other users, so the result of
820    /// this function can be `unwrap`ped whenever dealing with inputs
821    /// generated by ourselves.
822    fn output_fee(
823        &self,
824        amount: &Amounts,
825        output: &<Self::Common as ModuleCommon>::Output,
826    ) -> Option<Amounts>;
827
828    fn supports_backup(&self) -> bool {
829        false
830    }
831
832    async fn backup(&self) -> anyhow::Result<Self::Backup> {
833        anyhow::bail!("Backup not supported");
834    }
835
836    /// Does this module support being a primary module
837    ///
838    /// If it does it must implement:
839    ///
840    /// * [`Self::create_final_inputs_and_outputs`]
841    /// * [`Self::await_primary_module_output`]
842    /// * [`Self::get_balance`]
843    /// * [`Self::subscribe_balance_changes`]
844    fn supports_being_primary(&self) -> PrimaryModuleSupport {
845        PrimaryModuleSupport::None
846    }
847
848    /// Creates all inputs and outputs necessary to balance the transaction.
849    /// The function returns an error if and only if the client's funds are not
850    /// sufficient to create the inputs necessary to fully fund the transaction.
851    ///
852    /// A returned input also contains:
853    /// * A set of private keys belonging to the input for signing the
854    ///   transaction
855    /// * A closure that generates states belonging to the input. This closure
856    ///   takes the transaction id of the transaction in which the input was
857    ///   used and the input index as input since these cannot be known at time
858    ///   of calling `create_funding_input` and have to be injected later.
859    ///
860    /// A returned output also contains:
861    /// * A closure that generates states belonging to the output. This closure
862    ///   takes the transaction id of the transaction in which the output was
863    ///   used and the output index as input since these cannot be known at time
864    ///   of calling `create_change_output` and have to be injected later.
865    async fn create_final_inputs_and_outputs(
866        &self,
867        _dbtx: &mut DatabaseTransaction<'_>,
868        _operation_id: OperationId,
869        _unit: AmountUnit,
870        _input_amount: Amount,
871        _output_amount: Amount,
872    ) -> anyhow::Result<(
873        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
874        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
875    )> {
876        unimplemented!()
877    }
878
879    /// Waits for the funds from an output created by
880    /// [`Self::create_final_inputs_and_outputs`] to become available. This
881    /// function returning typically implies a change in the output of
882    /// [`Self::get_balance`].
883    async fn await_primary_module_output(
884        &self,
885        _operation_id: OperationId,
886        _out_point: OutPoint,
887    ) -> anyhow::Result<()> {
888        unimplemented!()
889    }
890
891    /// Returns the balance held by this module and available for funding
892    /// transactions.
893    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
894        unimplemented!()
895    }
896
897    /// Returns the balance held by this module and available for funding
898    /// transactions.
899    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
900        unimplemented!()
901    }
902
903    /// Returns a stream that will output the updated module balance each time
904    /// it changes.
905    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
906        unimplemented!()
907    }
908
909    /// Leave the federation
910    ///
911    /// While technically there's nothing stopping the client from just
912    /// abandoning Federation at any point by deleting all the related
913    /// local data, it is useful to make sure it's safe beforehand.
914    ///
915    /// This call indicates the desire of the caller client code
916    /// to orderly and safely leave the Federation by this module instance.
917    /// The goal of the implementations is to fulfil that wish,
918    /// giving prompt and informative feedback if it's not yet possible.
919    ///
920    /// The client module implementation should handle the request
921    /// and return as fast as possible avoiding blocking for longer than
922    /// necessary. This would usually involve some combination of:
923    ///
924    /// * recording the state of being in process of leaving the Federation to
925    ///   prevent initiating new conditions that could delay its completion;
926    /// * performing any fast to complete cleanup/exit logic;
927    /// * initiating any time-consuming logic (e.g. canceling outstanding
928    ///   contracts), as background jobs, tasks machines, etc.
929    /// * checking for any conditions indicating it might not be safe to leave
930    ///   at the moment.
931    ///
932    /// This function should return `Ok` only if from the perspective
933    /// of this module instance, it is safe to delete client data and
934    /// stop using it, with no further actions (like background jobs) required
935    /// to complete.
936    ///
937    /// This function should return an error if it's not currently possible
938    /// to safely (e.g. without losing funds) leave the Federation.
939    /// It should avoid running indefinitely trying to complete any cleanup
940    /// actions necessary to reach a clean state, preferring spawning new
941    /// state machines and returning an informative error about cleanup
942    /// still in progress.
943    ///
944    /// If any internal task needs to complete, any user action is required,
945    /// or even external condition needs to be met this function
946    /// should return a `Err`.
947    ///
948    /// Notably modules should not disable interaction that might be necessary
949    /// for the user (possibly through other modules) to leave the Federation.
950    /// In particular a Mint module should retain ability to create new notes,
951    /// and LN module should retain ability to send funds out.
952    ///
953    /// Calling code must NOT assume that a module that once returned `Ok`,
954    /// will not return `Err` at later point. E.g. a Mint module might have
955    /// no outstanding balance at first, but other modules winding down
956    /// might "cash-out" to Ecash.
957    ///
958    /// Before leaving the Federation and deleting any state the calling code
959    /// must collect a full round of `Ok` from all the modules.
960    ///
961    /// Calling code should allow the user to override and ignore any
962    /// outstanding errors, after sufficient amount of warnings. Ideally,
963    /// this should be done on per-module basis, to avoid mistakes.
964    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
965        bail!("Unable to determine if safe to leave the federation: Not implemented")
966    }
967}
968
969/// Type-erased version of [`ClientModule`]
970#[apply(async_trait_maybe_send!)]
971pub trait IClientModule: Debug {
972    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
973
974    fn decoder(&self) -> Decoder;
975
976    fn context(&self, instance: ModuleInstanceId) -> DynContext;
977
978    async fn start(&self);
979
980    async fn handle_cli_command(&self, args: &[ffi::OsString])
981    -> anyhow::Result<serde_json::Value>;
982
983    async fn handle_rpc(
984        &self,
985        method: String,
986        request: serde_json::Value,
987    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
988
989    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;
990
991    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;
992
993    fn supports_backup(&self) -> bool;
994
995    async fn backup(&self, module_instance_id: ModuleInstanceId)
996    -> anyhow::Result<DynModuleBackup>;
997
998    fn supports_being_primary(&self) -> PrimaryModuleSupport;
999
1000    async fn create_final_inputs_and_outputs(
1001        &self,
1002        module_instance: ModuleInstanceId,
1003        dbtx: &mut DatabaseTransaction<'_>,
1004        operation_id: OperationId,
1005        unit: AmountUnit,
1006        input_amount: Amount,
1007        output_amount: Amount,
1008    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
1009
1010    async fn await_primary_module_output(
1011        &self,
1012        operation_id: OperationId,
1013        out_point: OutPoint,
1014    ) -> anyhow::Result<()>;
1015
1016    async fn get_balance(
1017        &self,
1018        module_instance: ModuleInstanceId,
1019        dbtx: &mut DatabaseTransaction<'_>,
1020        unit: AmountUnit,
1021    ) -> Amount;
1022
1023    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
1024}
1025
1026#[apply(async_trait_maybe_send!)]
1027impl<T> IClientModule for T
1028where
1029    T: ClientModule,
1030{
1031    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1032        self
1033    }
1034
1035    fn decoder(&self) -> Decoder {
1036        T::decoder()
1037    }
1038
1039    fn context(&self, instance: ModuleInstanceId) -> DynContext {
1040        DynContext::from_typed(instance, <T as ClientModule>::context(self))
1041    }
1042
1043    async fn start(&self) {
1044        <T as ClientModule>::start(self).await;
1045    }
1046
1047    async fn handle_cli_command(
1048        &self,
1049        args: &[ffi::OsString],
1050    ) -> anyhow::Result<serde_json::Value> {
1051        <T as ClientModule>::handle_cli_command(self, args).await
1052    }
1053
1054    async fn handle_rpc(
1055        &self,
1056        method: String,
1057        request: serde_json::Value,
1058    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1059        <T as ClientModule>::handle_rpc(self, method, request).await
1060    }
1061
1062    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1063        <T as ClientModule>::input_fee(
1064            self,
1065            amount,
1066            input
1067                .as_any()
1068                .downcast_ref()
1069                .expect("Dispatched to correct module"),
1070        )
1071    }
1072
1073    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1074        <T as ClientModule>::output_fee(
1075            self,
1076            amount,
1077            output
1078                .as_any()
1079                .downcast_ref()
1080                .expect("Dispatched to correct module"),
1081        )
1082    }
1083
1084    fn supports_backup(&self) -> bool {
1085        <T as ClientModule>::supports_backup(self)
1086    }
1087
1088    async fn backup(
1089        &self,
1090        module_instance_id: ModuleInstanceId,
1091    ) -> anyhow::Result<DynModuleBackup> {
1092        Ok(DynModuleBackup::from_typed(
1093            module_instance_id,
1094            <T as ClientModule>::backup(self).await?,
1095        ))
1096    }
1097
1098    fn supports_being_primary(&self) -> PrimaryModuleSupport {
1099        <T as ClientModule>::supports_being_primary(self)
1100    }
1101
1102    async fn create_final_inputs_and_outputs(
1103        &self,
1104        module_instance: ModuleInstanceId,
1105        dbtx: &mut DatabaseTransaction<'_>,
1106        operation_id: OperationId,
1107        unit: AmountUnit,
1108        input_amount: Amount,
1109        output_amount: Amount,
1110    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1111        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1112            self,
1113            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1114            operation_id,
1115            unit,
1116            input_amount,
1117            output_amount,
1118        )
1119        .await?;
1120
1121        let inputs = inputs.into_dyn(module_instance);
1122
1123        let outputs = outputs.into_dyn(module_instance);
1124
1125        Ok((inputs, outputs))
1126    }
1127
1128    async fn await_primary_module_output(
1129        &self,
1130        operation_id: OperationId,
1131        out_point: OutPoint,
1132    ) -> anyhow::Result<()> {
1133        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1134    }
1135
1136    async fn get_balance(
1137        &self,
1138        module_instance: ModuleInstanceId,
1139        dbtx: &mut DatabaseTransaction<'_>,
1140        unit: AmountUnit,
1141    ) -> Amount {
1142        <T as ClientModule>::get_balance(
1143            self,
1144            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1145            unit,
1146        )
1147        .await
1148    }
1149
1150    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1151        <T as ClientModule>::subscribe_balance_changes(self).await
1152    }
1153}
1154
1155dyn_newtype_define!(
1156    #[derive(Clone)]
1157    pub DynClientModule(Arc<IClientModule>)
1158);
1159
1160impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1161    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1162        self.inner.as_ref()
1163    }
1164}
1165
1166// Re-export types from fedimint_core
1167pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1168
1169pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;