fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15    OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24    Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25    maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
45pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
47/// A fedimint-client interface exposed to client modules
48///
49/// To break the dependency of the client modules on the whole fedimint client
50/// and in particular the `fedimint-client` crate, the module gets access to an
51/// interface, that is implemented by the `Client`.
52///
53/// This allows lose coupling, less recompilation and better control and
54/// understanding of what functionality of the Client the modules get access to.
55#[apply(async_trait_maybe_send!)]
56pub trait ClientContextIface: MaybeSend + MaybeSync {
57    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
58    fn api_clone(&self) -> DynGlobalApi;
59    fn decoders(&self) -> &ModuleDecoderRegistry;
60    async fn finalize_and_submit_transaction(
61        &self,
62        operation_id: OperationId,
63        operation_type: &str,
64        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
65        tx_builder: TransactionBuilder,
66    ) -> anyhow::Result<OutPointRange>;
67
68    // TODO: unify
69    async fn finalize_and_submit_transaction_inner(
70        &self,
71        dbtx: &mut DatabaseTransaction<'_>,
72        operation_id: OperationId,
73        tx_builder: TransactionBuilder,
74    ) -> anyhow::Result<OutPointRange>;
75
76    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
77
78    async fn await_primary_module_outputs(
79        &self,
80        operation_id: OperationId,
81        // TODO: make `impl Iterator<Item = ...>`
82        outputs: Vec<OutPoint>,
83    ) -> anyhow::Result<()>;
84
85    fn operation_log(&self) -> &dyn IOperationLog;
86
87    async fn has_active_states(&self, operation_id: OperationId) -> bool;
88
89    async fn operation_exists(&self, operation_id: OperationId) -> bool;
90
91    async fn config(&self) -> ClientConfig;
92
93    fn db(&self) -> &Database;
94
95    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
96
97    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
98
99    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
100
101    #[allow(clippy::too_many_arguments)]
102    async fn log_event_json(
103        &self,
104        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
105        module_kind: Option<ModuleKind>,
106        module_id: ModuleInstanceId,
107        kind: EventKind,
108        payload: serde_json::Value,
109        persist: EventPersistence,
110    );
111
112    async fn read_operation_active_states<'dbtx>(
113        &self,
114        operation_id: OperationId,
115        module_id: ModuleInstanceId,
116        dbtx: &'dbtx mut DatabaseTransaction<'_>,
117    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
118
119    async fn read_operation_inactive_states<'dbtx>(
120        &self,
121        operation_id: OperationId,
122        module_id: ModuleInstanceId,
123        dbtx: &'dbtx mut DatabaseTransaction<'_>,
124    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
125}
126
127/// A final, fully initialized client
128///
129/// Client modules need to be able to access a `Client` they are a part
130/// of. To break the circular dependency, the final `Client` is passed
131/// after `Client` was built via a shared state.
132#[derive(Clone, Default)]
133pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
134
135impl FinalClientIface {
136    /// Get a temporary strong reference to [`ClientContextIface`]
137    ///
138    /// Care must be taken to not let the user take ownership of this value,
139    /// and not store it elsewhere permanently either, as it could prevent
140    /// the cleanup of the Client.
141    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
142        self.0
143            .get()
144            .expect("client must be already set")
145            .upgrade()
146            .expect("client module context must not be use past client shutdown")
147    }
148
149    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
150        self.0.set(client).expect("FinalLazyClient already set");
151    }
152}
153
154impl fmt::Debug for FinalClientIface {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.write_str("FinalClientIface")
157    }
158}
159/// A Client context for a [`ClientModule`] `M`
160///
161/// Client modules can interact with the whole
162/// client through this struct.
163pub struct ClientContext<M> {
164    client: FinalClientIface,
165    module_instance_id: ModuleInstanceId,
166    global_dbtx_access_token: GlobalDBTxAccessToken,
167    module_db: Database,
168    _marker: marker::PhantomData<M>,
169}
170
171impl<M> Clone for ClientContext<M> {
172    fn clone(&self) -> Self {
173        Self {
174            client: self.client.clone(),
175            module_db: self.module_db.clone(),
176            module_instance_id: self.module_instance_id,
177            _marker: marker::PhantomData,
178            global_dbtx_access_token: self.global_dbtx_access_token,
179        }
180    }
181}
182
183/// A reference back to itself that the module cacn get from the
184/// [`ClientContext`]
185pub struct ClientContextSelfRef<'s, M> {
186    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
187    // stored permanently somewhere
188    client: Arc<dyn ClientContextIface>,
189    module_instance_id: ModuleInstanceId,
190    _marker: marker::PhantomData<&'s M>,
191}
192
193impl<M> ops::Deref for ClientContextSelfRef<'_, M>
194where
195    M: ClientModule,
196{
197    type Target = M;
198
199    fn deref(&self) -> &Self::Target {
200        self.client
201            .get_module(self.module_instance_id)
202            .as_any()
203            .downcast_ref::<M>()
204            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
205    }
206}
207
208impl<M> fmt::Debug for ClientContext<M> {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        f.write_str("ClientContext")
211    }
212}
213
214impl<M> ClientContext<M>
215where
216    M: ClientModule,
217{
218    pub fn new(
219        client: FinalClientIface,
220        module_instance_id: ModuleInstanceId,
221        global_dbtx_access_token: GlobalDBTxAccessToken,
222        module_db: Database,
223    ) -> Self {
224        Self {
225            client,
226            module_instance_id,
227            global_dbtx_access_token,
228            module_db,
229            _marker: marker::PhantomData,
230        }
231    }
232
233    /// Get a reference back to client module from the [`Self`]
234    ///
235    /// It's often necessary for a client module to "move self"
236    /// by-value, especially due to async lifetimes issues.
237    /// Clients usually work with `&mut self`, which can't really
238    /// work in such context.
239    ///
240    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
241    /// can be used to recover the reference to the module at later
242    /// time.
243    #[allow(clippy::needless_lifetimes)] // just for explicitiness
244    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
245        ClientContextSelfRef {
246            client: self.client.get(),
247            module_instance_id: self.module_instance_id,
248            _marker: marker::PhantomData,
249        }
250    }
251
252    /// Get a reference to a global Api handle
253    pub fn global_api(&self) -> DynGlobalApi {
254        self.client.get().api_clone()
255    }
256
257    /// Get a reference to a module Api handle
258    pub fn module_api(&self) -> DynModuleApi {
259        self.global_api().with_module(self.module_instance_id)
260    }
261
262    /// A set of all decoders of all modules of the client
263    pub fn decoders(&self) -> ModuleDecoderRegistry {
264        Clone::clone(self.client.get().decoders())
265    }
266
267    pub fn input_from_dyn<'i>(
268        &self,
269        input: &'i DynInput,
270    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
271        (input.module_instance_id() == self.module_instance_id).then(|| {
272            input
273                .as_any()
274                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
275                .unwrap_or_else(|| {
276                    panic!("instance_id {} just checked", input.module_instance_id())
277                })
278        })
279    }
280
281    pub fn output_from_dyn<'o>(
282        &self,
283        output: &'o DynOutput,
284    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
285        (output.module_instance_id() == self.module_instance_id).then(|| {
286            output
287                .as_any()
288                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
289                .unwrap_or_else(|| {
290                    panic!("instance_id {} just checked", output.module_instance_id())
291                })
292        })
293    }
294
295    pub fn map_dyn<'s, 'i, 'o, I>(
296        &'s self,
297        typed: impl IntoIterator<Item = I> + 'i,
298    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
299    where
300        I: IntoDynInstance,
301        'i: 'o,
302        's: 'o,
303    {
304        typed.into_iter().map(|i| self.make_dyn(i))
305    }
306
307    /// Turn a typed output into a dyn version
308    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
309        self.make_dyn(output)
310    }
311
312    /// Turn a typed input into a dyn version
313    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
314        self.make_dyn(input)
315    }
316
317    /// Turn a `typed` into a dyn version
318    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
319    where
320        I: IntoDynInstance,
321    {
322        typed.into_dyn(self.module_instance_id)
323    }
324
325    /// Turn a typed [`ClientOutputBundle`] into a dyn version
326    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
327    where
328        O: IntoDynInstance<DynType = DynOutput> + 'static,
329        S: IntoDynInstance<DynType = DynState> + 'static,
330    {
331        self.make_dyn(output)
332    }
333
334    /// Turn a typed [`ClientInputBundle`] into a dyn version
335    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
336    where
337        I: IntoDynInstance<DynType = DynInput> + 'static,
338        S: IntoDynInstance<DynType = DynState> + 'static,
339    {
340        self.make_dyn(inputs)
341    }
342
343    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
344    where
345        S: sm::IState + 'static,
346    {
347        DynState::from_typed(self.module_instance_id, sm)
348    }
349
350    pub async fn finalize_and_submit_transaction<F, Meta>(
351        &self,
352        operation_id: OperationId,
353        operation_type: &str,
354        operation_meta_gen: F,
355        tx_builder: TransactionBuilder,
356    ) -> anyhow::Result<OutPointRange>
357    where
358        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
359        Meta: serde::Serialize + MaybeSend,
360    {
361        self.client
362            .get()
363            .finalize_and_submit_transaction(
364                operation_id,
365                operation_type,
366                Box::new(move |out_point_range| {
367                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
368                }),
369                tx_builder,
370            )
371            .await
372    }
373
374    pub async fn finalize_and_submit_transaction_dbtx(
375        &self,
376        dbtx: &mut DatabaseTransaction<'_>,
377        operation_id: OperationId,
378        tx_builder: TransactionBuilder,
379    ) -> anyhow::Result<OutPointRange> {
380        self.client
381            .get()
382            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
383            .await
384    }
385
386    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
387        self.client.get().transaction_updates(operation_id).await
388    }
389
390    pub async fn await_primary_module_outputs(
391        &self,
392        operation_id: OperationId,
393        // TODO: make `impl Iterator<Item = ...>`
394        outputs: Vec<OutPoint>,
395    ) -> anyhow::Result<()> {
396        self.client
397            .get()
398            .await_primary_module_outputs(operation_id, outputs)
399            .await
400    }
401
402    // TODO: unify with `Self::get_operation`
403    pub async fn get_operation(
404        &self,
405        operation_id: OperationId,
406    ) -> anyhow::Result<oplog::OperationLogEntry> {
407        let operation = self
408            .client
409            .get()
410            .operation_log()
411            .get_operation(operation_id)
412            .await
413            .ok_or(anyhow::anyhow!("Operation not found"))?;
414
415        if operation.operation_module_kind() != M::kind().as_str() {
416            bail!("Operation is not a lightning operation");
417        }
418
419        Ok(operation)
420    }
421
422    /// Get global db.
423    ///
424    /// Only intended for internal use (private).
425    fn global_db(&self) -> fedimint_core::db::Database {
426        let db = Clone::clone(self.client.get().db());
427
428        db.ensure_global()
429            .expect("global_db must always return a global db");
430
431        db
432    }
433
434    pub fn module_db(&self) -> &Database {
435        self.module_db
436            .ensure_isolated()
437            .expect("module_db must always return isolated db");
438        &self.module_db
439    }
440
441    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
442        self.client.get().has_active_states(op_id).await
443    }
444
445    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
446        self.client.get().operation_exists(op_id).await
447    }
448
449    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
450        self.client
451            .get()
452            .executor()
453            .get_active_states()
454            .await
455            .into_iter()
456            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
457            .map(|s| {
458                (
459                    Clone::clone(
460                        s.0.as_any()
461                            .downcast_ref::<M::States>()
462                            .expect("incorrect output type passed to module plugin"),
463                    ),
464                    s.1,
465                )
466            })
467            .collect()
468    }
469
470    pub async fn get_config(&self) -> ClientConfig {
471        self.client.get().config().await
472    }
473
474    /// Returns an invite code for the federation that points to an arbitrary
475    /// guardian server for fetching the config
476    pub async fn get_invite_code(&self) -> InviteCode {
477        let cfg = self.get_config().await.global;
478        self.client
479            .get()
480            .invite_code(
481                *cfg.api_endpoints
482                    .keys()
483                    .next()
484                    .expect("A federation always has at least one guardian"),
485            )
486            .await
487            .expect("The guardian we requested an invite code for exists")
488    }
489
490    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
491        self.client.get().get_internal_payment_markers()
492    }
493
494    /// This method starts n state machines with given operation id without a
495    /// corresponding transaction
496    pub async fn manual_operation_start(
497        &self,
498        operation_id: OperationId,
499        op_type: &str,
500        operation_meta: impl serde::Serialize + Debug,
501        sms: Vec<DynState>,
502    ) -> anyhow::Result<()> {
503        let db = self.module_db();
504        let mut dbtx = db.begin_transaction().await;
505        {
506            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
507
508            self.manual_operation_start_inner(
509                &mut dbtx.to_ref_nc(),
510                operation_id,
511                op_type,
512                operation_meta,
513                sms,
514            )
515            .await?;
516        }
517
518        dbtx.commit_tx_result().await.map_err(|_| {
519            anyhow!(
520                "Operation with id {} already exists",
521                operation_id.fmt_short()
522            )
523        })?;
524
525        Ok(())
526    }
527
528    pub async fn manual_operation_start_dbtx(
529        &self,
530        dbtx: &mut DatabaseTransaction<'_>,
531        operation_id: OperationId,
532        op_type: &str,
533        operation_meta: impl serde::Serialize + Debug,
534        sms: Vec<DynState>,
535    ) -> anyhow::Result<()> {
536        self.manual_operation_start_inner(
537            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
538            operation_id,
539            op_type,
540            operation_meta,
541            sms,
542        )
543        .await
544    }
545
546    /// See [`Self::manual_operation_start`], just inside a database
547    /// transaction.
548    async fn manual_operation_start_inner(
549        &self,
550        dbtx: &mut DatabaseTransaction<'_>,
551        operation_id: OperationId,
552        op_type: &str,
553        operation_meta: impl serde::Serialize + Debug,
554        sms: Vec<DynState>,
555    ) -> anyhow::Result<()> {
556        dbtx.ensure_global()
557            .expect("Must deal with global dbtx here");
558
559        if self
560            .client
561            .get()
562            .operation_log()
563            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
564            .await
565            .is_some()
566        {
567            bail!(
568                "Operation with id {} already exists",
569                operation_id.fmt_short()
570            );
571        }
572
573        self.client
574            .get()
575            .operation_log()
576            .add_operation_log_entry_dbtx(
577                &mut dbtx.to_ref_nc(),
578                operation_id,
579                op_type,
580                serde_json::to_value(operation_meta).expect("Can't fail"),
581            )
582            .await;
583
584        self.client
585            .get()
586            .executor()
587            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
588            .await
589            .expect("State machine is valid");
590
591        Ok(())
592    }
593
594    pub fn outcome_or_updates<U, S>(
595        &self,
596        operation: OperationLogEntry,
597        operation_id: OperationId,
598        stream_gen: impl FnOnce() -> S + 'static,
599    ) -> UpdateStreamOrOutcome<U>
600    where
601        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
602        S: Stream<Item = U> + MaybeSend + 'static,
603    {
604        use futures::StreamExt;
605        match self.client.get().operation_log().outcome_or_updates(
606            &self.global_db(),
607            operation_id,
608            operation,
609            Box::new(move || {
610                let stream_gen = stream_gen();
611                Box::pin(
612                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
613                )
614            }),
615        ) {
616            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
617                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
618            ),
619            UpdateStreamOrOutcome::Outcome(o) => {
620                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
621            }
622        }
623    }
624
625    pub async fn claim_inputs<I, S>(
626        &self,
627        dbtx: &mut DatabaseTransaction<'_>,
628        inputs: ClientInputBundle<I, S>,
629        operation_id: OperationId,
630    ) -> anyhow::Result<OutPointRange>
631    where
632        I: IInput + MaybeSend + MaybeSync + 'static,
633        S: sm::IState + MaybeSend + MaybeSync + 'static,
634    {
635        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
636            .await
637    }
638
639    async fn claim_inputs_dyn(
640        &self,
641        dbtx: &mut DatabaseTransaction<'_>,
642        inputs: InstancelessDynClientInputBundle,
643        operation_id: OperationId,
644    ) -> anyhow::Result<OutPointRange> {
645        let tx_builder =
646            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
647
648        self.client
649            .get()
650            .finalize_and_submit_transaction_inner(
651                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
652                operation_id,
653                tx_builder,
654            )
655            .await
656    }
657
658    pub async fn add_state_machines_dbtx(
659        &self,
660        dbtx: &mut DatabaseTransaction<'_>,
661        states: Vec<DynState>,
662    ) -> AddStateMachinesResult {
663        self.client
664            .get()
665            .executor()
666            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
667            .await
668    }
669
670    pub async fn add_operation_log_entry_dbtx(
671        &self,
672        dbtx: &mut DatabaseTransaction<'_>,
673        operation_id: OperationId,
674        operation_type: &str,
675        operation_meta: impl serde::Serialize,
676    ) {
677        self.client
678            .get()
679            .operation_log()
680            .add_operation_log_entry_dbtx(
681                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
682                operation_id,
683                operation_type,
684                serde_json::to_value(operation_meta).expect("Can't fail"),
685            )
686            .await;
687    }
688
689    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
690    where
691        E: Event + Send,
692        Cap: Send,
693    {
694        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
695            warn!(
696                target: LOG_CLIENT,
697                module_kind = %<M as ClientModule>::kind(),
698                event_module = ?<E as Event>::MODULE,
699                "Client module logging events of different module than its own. This might become an error in the future."
700            );
701        }
702        self.client
703            .get()
704            .log_event_json(
705                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
706                <E as Event>::MODULE,
707                self.module_instance_id,
708                <E as Event>::KIND,
709                serde_json::to_value(event).expect("Can't fail"),
710                <E as Event>::PERSISTENCE,
711            )
712            .await;
713    }
714}
715
716/// Priority module priority (lower number is higher priority)
717#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
718pub struct PrimaryModulePriority(u64);
719
720impl PrimaryModulePriority {
721    pub const HIGH: Self = Self(100);
722    pub const LOW: Self = Self(10000);
723
724    pub fn custom(prio: u64) -> Self {
725        Self(prio)
726    }
727}
728/// Which amount units this module supports being primary for
729pub enum PrimaryModuleSupport {
730    /// Potentially any unit
731    Any { priority: PrimaryModulePriority },
732    /// Some units supported by the module
733    Selected {
734        priority: PrimaryModulePriority,
735        units: BTreeSet<AmountUnit>,
736    },
737    /// None
738    None,
739}
740
741impl PrimaryModuleSupport {
742    pub fn selected<const N: usize>(
743        priority: PrimaryModulePriority,
744        units: [AmountUnit; N],
745    ) -> Self {
746        Self::Selected {
747            priority,
748            units: BTreeSet::from(units),
749        }
750    }
751}
752
753/// Fedimint module client
754#[apply(async_trait_maybe_send!)]
755pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
756    type Init: ClientModuleInit;
757
758    /// Common module types shared between client and server
759    type Common: ModuleCommon;
760
761    /// Data stored in regular backups so that restoring doesn't have to start
762    /// from epoch 0
763    type Backup: ModuleBackup;
764
765    /// Data and API clients available to state machine transitions of this
766    /// module
767    type ModuleStateMachineContext: Context;
768
769    /// All possible states this client can submit to the executor
770    type States: State<ModuleContext = Self::ModuleStateMachineContext>
771        + IntoDynInstance<DynType = DynState>;
772
773    fn decoder() -> Decoder {
774        let mut decoder_builder = Self::Common::decoder_builder();
775        decoder_builder.with_decodable_type::<Self::States>();
776        decoder_builder.with_decodable_type::<Self::Backup>();
777        decoder_builder.build()
778    }
779
780    fn kind() -> ModuleKind {
781        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
782    }
783
784    fn context(&self) -> Self::ModuleStateMachineContext;
785
786    /// Initialize client.
787    ///
788    /// Called by the core client code on start, after [`ClientContext`] is
789    /// fully initialized, so unlike during [`ClientModuleInit::init`],
790    /// access to global client is allowed.
791    async fn start(&self) {}
792
793    async fn handle_cli_command(
794        &self,
795        _args: &[ffi::OsString],
796    ) -> anyhow::Result<serde_json::Value> {
797        Err(anyhow::format_err!(
798            "This module does not implement cli commands"
799        ))
800    }
801
802    async fn handle_rpc(
803        &self,
804        _method: String,
805        _request: serde_json::Value,
806    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
807        Box::pin(futures::stream::once(std::future::ready(Err(
808            anyhow::format_err!("This module does not implement rpc"),
809        ))))
810    }
811
812    /// Returns the fee the processing of this input requires.
813    ///
814    /// If the semantics of a given input aren't known this function returns
815    /// `None`, this only happens if a future version of Fedimint introduces a
816    /// new input variant. For clients this should only be the case when
817    /// processing transactions created by other users, so the result of
818    /// this function can be `unwrap`ped whenever dealing with inputs
819    /// generated by ourselves.
820    fn input_fee(
821        &self,
822        amount: &Amounts,
823        input: &<Self::Common as ModuleCommon>::Input,
824    ) -> Option<Amounts>;
825
826    /// Returns the fee the processing of this output requires.
827    ///
828    /// If the semantics of a given output aren't known this function returns
829    /// `None`, this only happens if a future version of Fedimint introduces a
830    /// new output variant. For clients this should only be the case when
831    /// processing transactions created by other users, so the result of
832    /// this function can be `unwrap`ped whenever dealing with inputs
833    /// generated by ourselves.
834    fn output_fee(
835        &self,
836        amount: &Amounts,
837        output: &<Self::Common as ModuleCommon>::Output,
838    ) -> Option<Amounts>;
839
840    fn supports_backup(&self) -> bool {
841        false
842    }
843
844    async fn backup(&self) -> anyhow::Result<Self::Backup> {
845        anyhow::bail!("Backup not supported");
846    }
847
848    /// Does this module support being a primary module
849    ///
850    /// If it does it must implement:
851    ///
852    /// * [`Self::create_final_inputs_and_outputs`]
853    /// * [`Self::await_primary_module_output`]
854    /// * [`Self::get_balance`]
855    /// * [`Self::subscribe_balance_changes`]
856    fn supports_being_primary(&self) -> PrimaryModuleSupport {
857        PrimaryModuleSupport::None
858    }
859
860    /// Creates all inputs and outputs necessary to balance the transaction.
861    /// The function returns an error if and only if the client's funds are not
862    /// sufficient to create the inputs necessary to fully fund the transaction.
863    ///
864    /// A returned input also contains:
865    /// * A set of private keys belonging to the input for signing the
866    ///   transaction
867    /// * A closure that generates states belonging to the input. This closure
868    ///   takes the transaction id of the transaction in which the input was
869    ///   used and the input index as input since these cannot be known at time
870    ///   of calling `create_funding_input` and have to be injected later.
871    ///
872    /// A returned output also contains:
873    /// * A closure that generates states belonging to the output. This closure
874    ///   takes the transaction id of the transaction in which the output was
875    ///   used and the output index as input since these cannot be known at time
876    ///   of calling `create_change_output` and have to be injected later.
877    async fn create_final_inputs_and_outputs(
878        &self,
879        _dbtx: &mut DatabaseTransaction<'_>,
880        _operation_id: OperationId,
881        _unit: AmountUnit,
882        _input_amount: Amount,
883        _output_amount: Amount,
884    ) -> anyhow::Result<(
885        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
886        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
887    )> {
888        unimplemented!()
889    }
890
891    /// Waits for the funds from an output created by
892    /// [`Self::create_final_inputs_and_outputs`] to become available. This
893    /// function returning typically implies a change in the output of
894    /// [`Self::get_balance`].
895    async fn await_primary_module_output(
896        &self,
897        _operation_id: OperationId,
898        _out_point: OutPoint,
899    ) -> anyhow::Result<()> {
900        unimplemented!()
901    }
902
903    /// Returns the balance held by this module and available for funding
904    /// transactions.
905    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
906        unimplemented!()
907    }
908
909    /// Returns the balance held by this module and available for funding
910    /// transactions.
911    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
912        unimplemented!()
913    }
914
915    /// Returns a stream that will output the updated module balance each time
916    /// it changes.
917    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
918        unimplemented!()
919    }
920
921    /// Leave the federation
922    ///
923    /// While technically there's nothing stopping the client from just
924    /// abandoning Federation at any point by deleting all the related
925    /// local data, it is useful to make sure it's safe beforehand.
926    ///
927    /// This call indicates the desire of the caller client code
928    /// to orderly and safely leave the Federation by this module instance.
929    /// The goal of the implementations is to fulfil that wish,
930    /// giving prompt and informative feedback if it's not yet possible.
931    ///
932    /// The client module implementation should handle the request
933    /// and return as fast as possible avoiding blocking for longer than
934    /// necessary. This would usually involve some combination of:
935    ///
936    /// * recording the state of being in process of leaving the Federation to
937    ///   prevent initiating new conditions that could delay its completion;
938    /// * performing any fast to complete cleanup/exit logic;
939    /// * initiating any time-consuming logic (e.g. canceling outstanding
940    ///   contracts), as background jobs, tasks machines, etc.
941    /// * checking for any conditions indicating it might not be safe to leave
942    ///   at the moment.
943    ///
944    /// This function should return `Ok` only if from the perspective
945    /// of this module instance, it is safe to delete client data and
946    /// stop using it, with no further actions (like background jobs) required
947    /// to complete.
948    ///
949    /// This function should return an error if it's not currently possible
950    /// to safely (e.g. without losing funds) leave the Federation.
951    /// It should avoid running indefinitely trying to complete any cleanup
952    /// actions necessary to reach a clean state, preferring spawning new
953    /// state machines and returning an informative error about cleanup
954    /// still in progress.
955    ///
956    /// If any internal task needs to complete, any user action is required,
957    /// or even external condition needs to be met this function
958    /// should return a `Err`.
959    ///
960    /// Notably modules should not disable interaction that might be necessary
961    /// for the user (possibly through other modules) to leave the Federation.
962    /// In particular a Mint module should retain ability to create new notes,
963    /// and LN module should retain ability to send funds out.
964    ///
965    /// Calling code must NOT assume that a module that once returned `Ok`,
966    /// will not return `Err` at later point. E.g. a Mint module might have
967    /// no outstanding balance at first, but other modules winding down
968    /// might "cash-out" to Ecash.
969    ///
970    /// Before leaving the Federation and deleting any state the calling code
971    /// must collect a full round of `Ok` from all the modules.
972    ///
973    /// Calling code should allow the user to override and ignore any
974    /// outstanding errors, after sufficient amount of warnings. Ideally,
975    /// this should be done on per-module basis, to avoid mistakes.
976    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
977        bail!("Unable to determine if safe to leave the federation: Not implemented")
978    }
979}
980
981/// Type-erased version of [`ClientModule`]
982#[apply(async_trait_maybe_send!)]
983pub trait IClientModule: Debug {
984    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
985
986    fn decoder(&self) -> Decoder;
987
988    fn context(&self, instance: ModuleInstanceId) -> DynContext;
989
990    async fn start(&self);
991
992    async fn handle_cli_command(&self, args: &[ffi::OsString])
993    -> anyhow::Result<serde_json::Value>;
994
995    async fn handle_rpc(
996        &self,
997        method: String,
998        request: serde_json::Value,
999    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
1000
1001    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;
1002
1003    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;
1004
1005    fn supports_backup(&self) -> bool;
1006
1007    async fn backup(&self, module_instance_id: ModuleInstanceId)
1008    -> anyhow::Result<DynModuleBackup>;
1009
1010    fn supports_being_primary(&self) -> PrimaryModuleSupport;
1011
1012    async fn create_final_inputs_and_outputs(
1013        &self,
1014        module_instance: ModuleInstanceId,
1015        dbtx: &mut DatabaseTransaction<'_>,
1016        operation_id: OperationId,
1017        unit: AmountUnit,
1018        input_amount: Amount,
1019        output_amount: Amount,
1020    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
1021
1022    async fn await_primary_module_output(
1023        &self,
1024        operation_id: OperationId,
1025        out_point: OutPoint,
1026    ) -> anyhow::Result<()>;
1027
1028    async fn get_balance(
1029        &self,
1030        module_instance: ModuleInstanceId,
1031        dbtx: &mut DatabaseTransaction<'_>,
1032        unit: AmountUnit,
1033    ) -> Amount;
1034
1035    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
1036}
1037
1038#[apply(async_trait_maybe_send!)]
1039impl<T> IClientModule for T
1040where
1041    T: ClientModule,
1042{
1043    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1044        self
1045    }
1046
1047    fn decoder(&self) -> Decoder {
1048        T::decoder()
1049    }
1050
1051    fn context(&self, instance: ModuleInstanceId) -> DynContext {
1052        DynContext::from_typed(instance, <T as ClientModule>::context(self))
1053    }
1054
1055    async fn start(&self) {
1056        <T as ClientModule>::start(self).await;
1057    }
1058
1059    async fn handle_cli_command(
1060        &self,
1061        args: &[ffi::OsString],
1062    ) -> anyhow::Result<serde_json::Value> {
1063        <T as ClientModule>::handle_cli_command(self, args).await
1064    }
1065
1066    async fn handle_rpc(
1067        &self,
1068        method: String,
1069        request: serde_json::Value,
1070    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1071        <T as ClientModule>::handle_rpc(self, method, request).await
1072    }
1073
1074    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1075        <T as ClientModule>::input_fee(
1076            self,
1077            amount,
1078            input
1079                .as_any()
1080                .downcast_ref()
1081                .expect("Dispatched to correct module"),
1082        )
1083    }
1084
1085    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1086        <T as ClientModule>::output_fee(
1087            self,
1088            amount,
1089            output
1090                .as_any()
1091                .downcast_ref()
1092                .expect("Dispatched to correct module"),
1093        )
1094    }
1095
1096    fn supports_backup(&self) -> bool {
1097        <T as ClientModule>::supports_backup(self)
1098    }
1099
1100    async fn backup(
1101        &self,
1102        module_instance_id: ModuleInstanceId,
1103    ) -> anyhow::Result<DynModuleBackup> {
1104        Ok(DynModuleBackup::from_typed(
1105            module_instance_id,
1106            <T as ClientModule>::backup(self).await?,
1107        ))
1108    }
1109
1110    fn supports_being_primary(&self) -> PrimaryModuleSupport {
1111        <T as ClientModule>::supports_being_primary(self)
1112    }
1113
1114    async fn create_final_inputs_and_outputs(
1115        &self,
1116        module_instance: ModuleInstanceId,
1117        dbtx: &mut DatabaseTransaction<'_>,
1118        operation_id: OperationId,
1119        unit: AmountUnit,
1120        input_amount: Amount,
1121        output_amount: Amount,
1122    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1123        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1124            self,
1125            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1126            operation_id,
1127            unit,
1128            input_amount,
1129            output_amount,
1130        )
1131        .await?;
1132
1133        let inputs = inputs.into_dyn(module_instance);
1134
1135        let outputs = outputs.into_dyn(module_instance);
1136
1137        Ok((inputs, outputs))
1138    }
1139
1140    async fn await_primary_module_output(
1141        &self,
1142        operation_id: OperationId,
1143        out_point: OutPoint,
1144    ) -> anyhow::Result<()> {
1145        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1146    }
1147
1148    async fn get_balance(
1149        &self,
1150        module_instance: ModuleInstanceId,
1151        dbtx: &mut DatabaseTransaction<'_>,
1152        unit: AmountUnit,
1153    ) -> Amount {
1154        <T as ClientModule>::get_balance(
1155            self,
1156            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1157            unit,
1158        )
1159        .await
1160    }
1161
1162    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1163        <T as ClientModule>::subscribe_balance_changes(self).await
1164    }
1165}
1166
1167dyn_newtype_define!(
1168    #[derive(Clone)]
1169    pub DynClientModule(Arc<IClientModule>)
1170);
1171
1172impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1173    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1174        self.inner.as_ref()
1175    }
1176}
1177
1178// Re-export types from fedimint_core
1179pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1180
1181pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;