Skip to main content

fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::sync::{Arc, Weak};
7use std::{ffi, marker, ops};
8
9use anyhow::{anyhow, bail};
10use bitcoin::secp256k1::PublicKey;
11use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
12use fedimint_core::config::ClientConfig;
13use fedimint_core::core::{
14    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
15    OperationId,
16};
17use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
18use fedimint_core::invite_code::InviteCode;
19use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
20use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
21use fedimint_core::task::{MaybeSend, MaybeSync};
22use fedimint_core::util::BoxStream;
23use fedimint_core::{
24    Amount, OutPoint, PeerId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send,
25    maybe_add_send_sync,
26};
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT;
29use futures::Stream;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use tracing::warn;
33
34use self::init::ClientModuleInit;
35use crate::module::recovery::{DynModuleBackup, ModuleBackup};
36use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
37use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
38use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
39use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
40use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
41
42pub mod init;
43pub mod recovery;
44
45pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
46
/// A fedimint-client interface exposed to client modules
///
/// To break the dependency of the client modules on the whole fedimint client
/// and in particular the `fedimint-client` crate, the module gets access to an
/// interface, that is implemented by the `Client`.
///
/// This allows loose coupling, less recompilation and better control and
/// understanding of what functionality of the Client the modules get access to.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Borrow the client module registered under the given instance id.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Return an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Borrow the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalize and submit the transaction described by `tx_builder`.
    ///
    /// `operation_meta_gen` produces the JSON operation metadata from the
    /// resulting [`OutPointRange`].
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Same parameters as [`Self::finalize_and_submit_transaction`], plus a
    /// caller-supplied database transaction.
    async fn finalize_and_submit_transaction_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    // TODO: unify
    /// Variant taking neither an operation type nor a metadata generator.
    // NOTE(review): presumably this skips writing the operation log entry —
    // confirm against the implementation.
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Obtain the [`TransactionUpdates`] for the given operation.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Wait on the given primary-module outputs of an operation.
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Borrow the operation log.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Whether the given operation has active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Whether an operation with the given id exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Return the client configuration.
    async fn config(&self) -> ClientConfig;

    /// Borrow the client database.
    fn db(&self) -> &Database;

    /// Borrow the state machine executor.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Return an invite code for the given peer, if one is available.
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Return the internal payment markers.
    // NOTE(review): the meaning of the `(PublicKey, u64)` pair is not visible
    // from this file — confirm against the implementation.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Log an event given as an already JSON-serialized payload.
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Stream the active states of one module instance for an operation.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Stream the inactive states of one module instance for an operation.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
135
/// A final, fully initialized client
///
/// Client modules need to be able to access a `Client` they are a part
/// of. To break the circular dependency, the final `Client` is passed
/// after `Client` was built via a shared state.
///
/// The shared state is a write-once cell holding a [`Weak`] reference, so
/// holding this handle does not by itself keep the client alive. Clones share
/// the same cell.
#[derive(Clone, Default)]
pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
143
144impl FinalClientIface {
145    /// Get a temporary strong reference to [`ClientContextIface`]
146    ///
147    /// Care must be taken to not let the user take ownership of this value,
148    /// and not store it elsewhere permanently either, as it could prevent
149    /// the cleanup of the Client.
150    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
151        self.0
152            .get()
153            .expect("client must be already set")
154            .upgrade()
155            .expect("client module context must not be use past client shutdown")
156    }
157
158    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
159        self.0.set(client).expect("FinalLazyClient already set");
160    }
161}
162
163impl fmt::Debug for FinalClientIface {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.write_str("FinalClientIface")
166    }
167}
/// A Client context for a [`ClientModule`] `M`
///
/// Client modules can interact with the whole
/// client through this struct.
pub struct ClientContext<M> {
    // Lazily-set handle to the client this module belongs to
    client: FinalClientIface,
    // Instance id of the module this context was created for
    module_instance_id: ModuleInstanceId,
    // Token passed to `global_dbtx` to turn a module dbtx into a global one
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database handed out by `module_db` (expected to be isolated)
    module_db: Database,
    // Ties the context to the module type without storing a value of it
    _marker: marker::PhantomData<M>,
}
179
180impl<M> Clone for ClientContext<M> {
181    fn clone(&self) -> Self {
182        Self {
183            client: self.client.clone(),
184            module_db: self.module_db.clone(),
185            module_instance_id: self.module_instance_id,
186            _marker: marker::PhantomData,
187            global_dbtx_access_token: self.global_dbtx_access_token,
188        }
189    }
190}
191
/// A reference back to itself that the module can get from the
/// [`ClientContext`]
pub struct ClientContextSelfRef<'s, M> {
    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
    // stored permanently somewhere
    // NOTE(review): `ClientStrong` is not a name visible in this file; the
    // field is a strong `Arc` to the client interface.
    client: Arc<dyn ClientContextIface>,
    // Instance id used to look the module up again on deref
    module_instance_id: ModuleInstanceId,
    _marker: marker::PhantomData<&'s M>,
}
201
202impl<M> ops::Deref for ClientContextSelfRef<'_, M>
203where
204    M: ClientModule,
205{
206    type Target = M;
207
208    fn deref(&self) -> &Self::Target {
209        self.client
210            .get_module(self.module_instance_id)
211            .as_any()
212            .downcast_ref::<M>()
213            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
214    }
215}
216
217impl<M> fmt::Debug for ClientContext<M> {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        f.write_str("ClientContext")
220    }
221}
222
223impl<M> ClientContext<M>
224where
225    M: ClientModule,
226{
227    pub fn new(
228        client: FinalClientIface,
229        module_instance_id: ModuleInstanceId,
230        global_dbtx_access_token: GlobalDBTxAccessToken,
231        module_db: Database,
232    ) -> Self {
233        Self {
234            client,
235            module_instance_id,
236            global_dbtx_access_token,
237            module_db,
238            _marker: marker::PhantomData,
239        }
240    }
241
242    /// Get a reference back to client module from the [`Self`]
243    ///
244    /// It's often necessary for a client module to "move self"
245    /// by-value, especially due to async lifetimes issues.
246    /// Clients usually work with `&mut self`, which can't really
247    /// work in such context.
248    ///
249    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
250    /// can be used to recover the reference to the module at later
251    /// time.
252    #[allow(clippy::needless_lifetimes)] // just for explicitiness
253    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
254        ClientContextSelfRef {
255            client: self.client.get(),
256            module_instance_id: self.module_instance_id,
257            _marker: marker::PhantomData,
258        }
259    }
260
261    /// Get a reference to a global Api handle
262    pub fn global_api(&self) -> DynGlobalApi {
263        self.client.get().api_clone()
264    }
265
266    /// Get a reference to a module Api handle
267    pub fn module_api(&self) -> DynModuleApi {
268        self.global_api().with_module(self.module_instance_id)
269    }
270
271    /// A set of all decoders of all modules of the client
272    pub fn decoders(&self) -> ModuleDecoderRegistry {
273        Clone::clone(self.client.get().decoders())
274    }
275
276    pub fn input_from_dyn<'i>(
277        &self,
278        input: &'i DynInput,
279    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
280        (input.module_instance_id() == self.module_instance_id).then(|| {
281            input
282                .as_any()
283                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
284                .unwrap_or_else(|| {
285                    panic!("instance_id {} just checked", input.module_instance_id())
286                })
287        })
288    }
289
290    pub fn output_from_dyn<'o>(
291        &self,
292        output: &'o DynOutput,
293    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
294        (output.module_instance_id() == self.module_instance_id).then(|| {
295            output
296                .as_any()
297                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
298                .unwrap_or_else(|| {
299                    panic!("instance_id {} just checked", output.module_instance_id())
300                })
301        })
302    }
303
304    pub fn map_dyn<'s, 'i, 'o, I>(
305        &'s self,
306        typed: impl IntoIterator<Item = I> + 'i,
307    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
308    where
309        I: IntoDynInstance,
310        'i: 'o,
311        's: 'o,
312    {
313        typed.into_iter().map(|i| self.make_dyn(i))
314    }
315
316    /// Turn a typed output into a dyn version
317    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
318        self.make_dyn(output)
319    }
320
321    /// Turn a typed input into a dyn version
322    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
323        self.make_dyn(input)
324    }
325
326    /// Turn a `typed` into a dyn version
327    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
328    where
329        I: IntoDynInstance,
330    {
331        typed.into_dyn(self.module_instance_id)
332    }
333
334    /// Turn a typed [`ClientOutputBundle`] into a dyn version
335    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
336    where
337        O: IntoDynInstance<DynType = DynOutput> + 'static,
338        S: IntoDynInstance<DynType = DynState> + 'static,
339    {
340        self.make_dyn(output)
341    }
342
343    /// Turn a typed [`ClientInputBundle`] into a dyn version
344    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
345    where
346        I: IntoDynInstance<DynType = DynInput> + 'static,
347        S: IntoDynInstance<DynType = DynState> + 'static,
348    {
349        self.make_dyn(inputs)
350    }
351
352    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
353    where
354        S: sm::IState + 'static,
355    {
356        DynState::from_typed(self.module_instance_id, sm)
357    }
358
359    pub async fn finalize_and_submit_transaction<F, Meta>(
360        &self,
361        operation_id: OperationId,
362        operation_type: &str,
363        operation_meta_gen: F,
364        tx_builder: TransactionBuilder,
365    ) -> anyhow::Result<OutPointRange>
366    where
367        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
368        Meta: serde::Serialize + MaybeSend,
369    {
370        self.client
371            .get()
372            .finalize_and_submit_transaction(
373                operation_id,
374                operation_type,
375                Box::new(move |out_point_range| {
376                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
377                }),
378                tx_builder,
379            )
380            .await
381    }
382
383    pub async fn finalize_and_submit_transaction_dbtx<F, Meta>(
384        &self,
385        dbtx: &mut DatabaseTransaction<'_>,
386        operation_id: OperationId,
387        operation_type: &str,
388        operation_meta_gen: F,
389        tx_builder: TransactionBuilder,
390    ) -> anyhow::Result<OutPointRange>
391    where
392        F: Fn(OutPointRange) -> Meta + MaybeSend + MaybeSync + 'static,
393        Meta: serde::Serialize + MaybeSend,
394    {
395        self.client
396            .get()
397            .finalize_and_submit_transaction_dbtx(
398                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
399                operation_id,
400                operation_type,
401                Box::new(move |out_point_range| {
402                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
403                }),
404                tx_builder,
405            )
406            .await
407    }
408
409    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
410        self.client.get().transaction_updates(operation_id).await
411    }
412
413    pub async fn await_primary_module_outputs(
414        &self,
415        operation_id: OperationId,
416        // TODO: make `impl Iterator<Item = ...>`
417        outputs: Vec<OutPoint>,
418    ) -> anyhow::Result<()> {
419        self.client
420            .get()
421            .await_primary_module_outputs(operation_id, outputs)
422            .await
423    }
424
425    // TODO: unify with `Self::get_operation`
426    pub async fn get_operation(
427        &self,
428        operation_id: OperationId,
429    ) -> anyhow::Result<oplog::OperationLogEntry> {
430        let operation = self
431            .client
432            .get()
433            .operation_log()
434            .get_operation(operation_id)
435            .await
436            .ok_or(anyhow::anyhow!("Operation not found"))?;
437
438        if operation.operation_module_kind() != M::kind().as_str() {
439            bail!("Operation is not a lightning operation");
440        }
441
442        Ok(operation)
443    }
444
445    /// Get global db.
446    ///
447    /// Only intended for internal use (private).
448    fn global_db(&self) -> fedimint_core::db::Database {
449        let db = Clone::clone(self.client.get().db());
450
451        db.ensure_global()
452            .expect("global_db must always return a global db");
453
454        db
455    }
456
457    pub fn module_db(&self) -> &Database {
458        self.module_db
459            .ensure_isolated()
460            .expect("module_db must always return isolated db");
461        &self.module_db
462    }
463
464    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
465        self.client.get().has_active_states(op_id).await
466    }
467
468    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
469        self.client.get().operation_exists(op_id).await
470    }
471
472    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
473        self.client
474            .get()
475            .executor()
476            .get_active_states()
477            .await
478            .into_iter()
479            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
480            .map(|s| {
481                (
482                    Clone::clone(
483                        s.0.as_any()
484                            .downcast_ref::<M::States>()
485                            .expect("incorrect output type passed to module plugin"),
486                    ),
487                    s.1,
488                )
489            })
490            .collect()
491    }
492
493    pub async fn get_config(&self) -> ClientConfig {
494        self.client.get().config().await
495    }
496
497    /// Returns an invite code for the federation that points to an arbitrary
498    /// guardian server for fetching the config
499    pub async fn get_invite_code(&self) -> InviteCode {
500        let cfg = self.get_config().await.global;
501        self.client
502            .get()
503            .invite_code(
504                *cfg.api_endpoints
505                    .keys()
506                    .next()
507                    .expect("A federation always has at least one guardian"),
508            )
509            .await
510            .expect("The guardian we requested an invite code for exists")
511    }
512
513    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
514        self.client.get().get_internal_payment_markers()
515    }
516
517    /// This method starts n state machines with given operation id without a
518    /// corresponding transaction
519    pub async fn manual_operation_start(
520        &self,
521        operation_id: OperationId,
522        op_type: &str,
523        operation_meta: impl serde::Serialize + Debug,
524        sms: Vec<DynState>,
525    ) -> anyhow::Result<()> {
526        let db = self.module_db();
527        let mut dbtx = db.begin_transaction().await;
528        {
529            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
530
531            self.manual_operation_start_inner(
532                &mut dbtx.to_ref_nc(),
533                operation_id,
534                op_type,
535                operation_meta,
536                sms,
537            )
538            .await?;
539        }
540
541        dbtx.commit_tx_result().await.map_err(|_| {
542            anyhow!(
543                "Operation with id {} already exists",
544                operation_id.fmt_short()
545            )
546        })?;
547
548        Ok(())
549    }
550
551    pub async fn manual_operation_start_dbtx(
552        &self,
553        dbtx: &mut DatabaseTransaction<'_>,
554        operation_id: OperationId,
555        op_type: &str,
556        operation_meta: impl serde::Serialize + Debug,
557        sms: Vec<DynState>,
558    ) -> anyhow::Result<()> {
559        self.manual_operation_start_inner(
560            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
561            operation_id,
562            op_type,
563            operation_meta,
564            sms,
565        )
566        .await
567    }
568
569    /// See [`Self::manual_operation_start`], just inside a database
570    /// transaction.
571    async fn manual_operation_start_inner(
572        &self,
573        dbtx: &mut DatabaseTransaction<'_>,
574        operation_id: OperationId,
575        op_type: &str,
576        operation_meta: impl serde::Serialize + Debug,
577        sms: Vec<DynState>,
578    ) -> anyhow::Result<()> {
579        dbtx.ensure_global()
580            .expect("Must deal with global dbtx here");
581
582        if self
583            .client
584            .get()
585            .operation_log()
586            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
587            .await
588            .is_some()
589        {
590            bail!(
591                "Operation with id {} already exists",
592                operation_id.fmt_short()
593            );
594        }
595
596        self.client
597            .get()
598            .operation_log()
599            .add_operation_log_entry_dbtx(
600                &mut dbtx.to_ref_nc(),
601                operation_id,
602                op_type,
603                serde_json::to_value(operation_meta).expect("Can't fail"),
604            )
605            .await;
606
607        self.client
608            .get()
609            .executor()
610            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
611            .await
612            .expect("State machine is valid");
613
614        Ok(())
615    }
616
    /// Typed wrapper around the operation log's `outcome_or_updates`.
    ///
    /// The typed stream produced by `stream_gen` is serialized item by item
    /// to JSON before being handed to the operation log, and whatever the
    /// operation log returns (an update stream or a final outcome) is
    /// deserialized back into `U`.
    ///
    /// Panics if an item cannot be serialized to, or deserialized from, JSON.
    pub fn outcome_or_updates<U, S>(
        &self,
        operation: OperationLogEntry,
        operation_id: OperationId,
        stream_gen: impl FnOnce() -> S + 'static,
    ) -> UpdateStreamOrOutcome<U>
    where
        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
        S: Stream<Item = U> + MaybeSend + 'static,
    {
        use futures::StreamExt;
        match self.client.get().operation_log().outcome_or_updates(
            &self.global_db(),
            operation_id,
            operation,
            // Typed -> JSON adapter for the caller's stream
            Box::new(move || {
                let stream_gen = stream_gen();
                Box::pin(
                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
                )
            }),
        ) {
            // JSON -> typed adapter for the results
            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
            ),
            UpdateStreamOrOutcome::Outcome(o) => {
                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
            }
        }
    }
647
648    pub async fn claim_inputs<I, S>(
649        &self,
650        dbtx: &mut DatabaseTransaction<'_>,
651        inputs: ClientInputBundle<I, S>,
652        operation_id: OperationId,
653    ) -> anyhow::Result<OutPointRange>
654    where
655        I: IInput + MaybeSend + MaybeSync + 'static,
656        S: sm::IState + MaybeSend + MaybeSync + 'static,
657    {
658        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
659            .await
660    }
661
662    async fn claim_inputs_dyn(
663        &self,
664        dbtx: &mut DatabaseTransaction<'_>,
665        inputs: InstancelessDynClientInputBundle,
666        operation_id: OperationId,
667    ) -> anyhow::Result<OutPointRange> {
668        let tx_builder =
669            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
670
671        self.client
672            .get()
673            .finalize_and_submit_transaction_inner(
674                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
675                operation_id,
676                tx_builder,
677            )
678            .await
679    }
680
681    pub async fn add_state_machines_dbtx(
682        &self,
683        dbtx: &mut DatabaseTransaction<'_>,
684        states: Vec<DynState>,
685    ) -> AddStateMachinesResult {
686        self.client
687            .get()
688            .executor()
689            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
690            .await
691    }
692
693    pub async fn add_operation_log_entry_dbtx(
694        &self,
695        dbtx: &mut DatabaseTransaction<'_>,
696        operation_id: OperationId,
697        operation_type: &str,
698        operation_meta: impl serde::Serialize,
699    ) {
700        self.client
701            .get()
702            .operation_log()
703            .add_operation_log_entry_dbtx(
704                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
705                operation_id,
706                operation_type,
707                serde_json::to_value(operation_meta).expect("Can't fail"),
708            )
709            .await;
710    }
711
712    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
713    where
714        E: Event + Send,
715        Cap: Send,
716    {
717        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
718            warn!(
719                target: LOG_CLIENT,
720                module_kind = %<M as ClientModule>::kind(),
721                event_module = ?<E as Event>::MODULE,
722                "Client module logging events of different module than its own. This might become an error in the future."
723            );
724        }
725        self.client
726            .get()
727            .log_event_json(
728                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
729                <E as Event>::MODULE,
730                self.module_instance_id,
731                <E as Event>::KIND,
732                serde_json::to_value(event).expect("Can't fail"),
733                <E as Event>::PERSISTENCE,
734            )
735            .await;
736    }
737}
738
/// Primary module priority (lower number is higher priority)
///
/// The derived ordering compares the wrapped number, so a higher-priority
/// value sorts before a lower-priority one.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PrimaryModulePriority(u64);

impl PrimaryModulePriority {
    /// Predefined high priority.
    pub const HIGH: Self = Self(100);
    /// Predefined low priority.
    pub const LOW: Self = Self(10000);

    /// Build a priority from an arbitrary number (lower is higher priority).
    ///
    /// `const` so that custom priorities can be declared as constants, just
    /// like [`Self::HIGH`] and [`Self::LOW`].
    pub const fn custom(prio: u64) -> Self {
        Self(prio)
    }
}
/// Which amount units this module supports being primary for
pub enum PrimaryModuleSupport {
    /// Potentially any unit
    Any {
        /// Priority of this module as a primary module
        priority: PrimaryModulePriority,
    },
    /// Some units supported by the module
    Selected {
        /// Priority of this module as a primary module
        priority: PrimaryModulePriority,
        /// The units the module supports being primary for
        units: BTreeSet<AmountUnit>,
    },
    /// None
    None,
}
763
764impl PrimaryModuleSupport {
765    pub fn selected<const N: usize>(
766        priority: PrimaryModulePriority,
767        units: [AmountUnit; N],
768    ) -> Self {
769        Self::Selected {
770            priority,
771            units: BTreeSet::from(units),
772        }
773    }
774}
775
776/// Fedimint module client
777#[apply(async_trait_maybe_send!)]
778pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
779    type Init: ClientModuleInit;
780
781    /// Common module types shared between client and server
782    type Common: ModuleCommon;
783
784    /// Data stored in regular backups so that restoring doesn't have to start
785    /// from epoch 0
786    type Backup: ModuleBackup;
787
788    /// Data and API clients available to state machine transitions of this
789    /// module
790    type ModuleStateMachineContext: Context;
791
792    /// All possible states this client can submit to the executor
793    type States: State<ModuleContext = Self::ModuleStateMachineContext>
794        + IntoDynInstance<DynType = DynState>;
795
796    fn decoder() -> Decoder {
797        let mut decoder_builder = Self::Common::decoder_builder();
798        decoder_builder.with_decodable_type::<Self::States>();
799        decoder_builder.with_decodable_type::<Self::Backup>();
800        decoder_builder.build()
801    }
802
803    fn kind() -> ModuleKind {
804        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
805    }
806
    /// Produce the context made available to this module's state machine
    /// transitions.
    fn context(&self) -> Self::ModuleStateMachineContext;
808
    /// Initialize client.
    ///
    /// Called by the core client code on start, after [`ClientContext`] is
    /// fully initialized, so unlike during [`ClientModuleInit::init`],
    /// access to global client is allowed.
    ///
    /// The default implementation does nothing.
    async fn start(&self) {}
815
816    async fn handle_cli_command(
817        &self,
818        _args: &[ffi::OsString],
819    ) -> anyhow::Result<serde_json::Value> {
820        Err(anyhow::format_err!(
821            "This module does not implement cli commands"
822        ))
823    }
824
825    async fn handle_rpc(
826        &self,
827        _method: String,
828        _request: serde_json::Value,
829    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
830        Box::pin(futures::stream::once(std::future::ready(Err(
831            anyhow::format_err!("This module does not implement rpc"),
832        ))))
833    }
834
    /// Returns the fee the processing of this input requires.
    ///
    /// If the semantics of a given input aren't known this function returns
    /// `None`, this only happens if a future version of Fedimint introduces a
    /// new input variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with inputs
    /// generated by ourselves.
    fn input_fee(
        &self,
        amount: &Amounts,
        input: &<Self::Common as ModuleCommon>::Input,
    ) -> Option<Amounts>;
848
    /// Returns the fee the processing of this output requires.
    ///
    /// If the semantics of a given output aren't known this function returns
    /// `None`, this only happens if a future version of Fedimint introduces a
    /// new output variant. For clients this should only be the case when
    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with outputs
    /// generated by ourselves.
    fn output_fee(
        &self,
        amount: &Amounts,
        output: &<Self::Common as ModuleCommon>::Output,
    ) -> Option<Amounts>;
862
    /// Whether this module supports backups.
    ///
    /// Defaults to `false`.
    fn supports_backup(&self) -> bool {
        false
    }
866
867    async fn backup(&self) -> anyhow::Result<Self::Backup> {
868        anyhow::bail!("Backup not supported");
869    }
870
    /// Does this module support being a primary module
    ///
    /// If it does it must implement:
    ///
    /// * [`Self::create_final_inputs_and_outputs`]
    /// * [`Self::await_primary_module_output`]
    /// * [`Self::get_balance`]
    /// * [`Self::subscribe_balance_changes`]
    ///
    /// Defaults to [`PrimaryModuleSupport::None`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport {
        PrimaryModuleSupport::None
    }
882
    /// Creates all inputs and outputs necessary to balance the transaction.
    /// The function returns an error if and only if the client's funds are not
    /// sufficient to create the inputs necessary to fully fund the transaction.
    ///
    /// A returned input also contains:
    /// * A set of private keys belonging to the input for signing the
    ///   transaction
    /// * A closure that generates states belonging to the input. This closure
    ///   takes the transaction id of the transaction in which the input was
    ///   used and the input index as input since these cannot be known at time
    ///   of calling `create_funding_input` and have to be injected later.
    ///
    /// A returned output also contains:
    /// * A closure that generates states belonging to the output. This closure
    ///   takes the transaction id of the transaction in which the output was
    ///   used and the output index as input since these cannot be known at time
    ///   of calling `create_change_output` and have to be injected later.
    ///
    /// NOTE(review): `create_funding_input` and `create_change_output` are not
    /// defined in the visible part of this file; presumably older names for
    /// this method — confirm.
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _unit: AmountUnit,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }
913
    /// Waits for the funds from an output created by
    /// [`Self::create_final_inputs_and_outputs`] to become available. This
    /// function returning typically implies a change in the output of
    /// [`Self::get_balance`].
    ///
    /// # Panics
    ///
    /// The default implementation is a placeholder that panics via
    /// `unimplemented!()`.
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }
925
    /// Returns the balance held by this module and available for funding
    /// transactions.
    ///
    /// NOTE(review): presumably the balance is the one denominated in
    /// `_unit` — confirm against implementors.
    ///
    /// # Panics
    ///
    /// The default implementation is a placeholder that panics via
    /// `unimplemented!()`.
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
        unimplemented!()
    }
931
    /// Returns the balances held by this module and available for funding
    /// transactions, as a single [`Amounts`] value.
    ///
    /// Unlike [`Self::get_balance`] this takes no unit argument.
    /// NOTE(review): presumably the result covers every unit the module
    /// holds — confirm against the definition of `Amounts`.
    ///
    /// # Panics
    ///
    /// The default implementation is a placeholder that panics via
    /// `unimplemented!()`.
    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
        unimplemented!()
    }
937
    /// Returns a stream that yields an item each time the module balance
    /// changes.
    ///
    /// The item type is `()`, so the stream only signals that a change
    /// happened and does not carry the new balance itself.
    ///
    /// # Panics
    ///
    /// The default implementation is a placeholder that panics via
    /// `unimplemented!()`.
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }
943
944    /// Leave the federation
945    ///
946    /// While technically there's nothing stopping the client from just
947    /// abandoning Federation at any point by deleting all the related
948    /// local data, it is useful to make sure it's safe beforehand.
949    ///
950    /// This call indicates the desire of the caller client code
951    /// to orderly and safely leave the Federation by this module instance.
952    /// The goal of the implementations is to fulfil that wish,
953    /// giving prompt and informative feedback if it's not yet possible.
954    ///
955    /// The client module implementation should handle the request
956    /// and return as fast as possible avoiding blocking for longer than
957    /// necessary. This would usually involve some combination of:
958    ///
959    /// * recording the state of being in process of leaving the Federation to
960    ///   prevent initiating new conditions that could delay its completion;
961    /// * performing any fast to complete cleanup/exit logic;
962    /// * initiating any time-consuming logic (e.g. canceling outstanding
963    ///   contracts), as background jobs, tasks machines, etc.
964    /// * checking for any conditions indicating it might not be safe to leave
965    ///   at the moment.
966    ///
967    /// This function should return `Ok` only if from the perspective
968    /// of this module instance, it is safe to delete client data and
969    /// stop using it, with no further actions (like background jobs) required
970    /// to complete.
971    ///
972    /// This function should return an error if it's not currently possible
973    /// to safely (e.g. without losing funds) leave the Federation.
974    /// It should avoid running indefinitely trying to complete any cleanup
975    /// actions necessary to reach a clean state, preferring spawning new
976    /// state machines and returning an informative error about cleanup
977    /// still in progress.
978    ///
979    /// If any internal task needs to complete, any user action is required,
980    /// or even external condition needs to be met this function
981    /// should return a `Err`.
982    ///
983    /// Notably modules should not disable interaction that might be necessary
984    /// for the user (possibly through other modules) to leave the Federation.
985    /// In particular a Mint module should retain ability to create new notes,
986    /// and LN module should retain ability to send funds out.
987    ///
988    /// Calling code must NOT assume that a module that once returned `Ok`,
989    /// will not return `Err` at later point. E.g. a Mint module might have
990    /// no outstanding balance at first, but other modules winding down
991    /// might "cash-out" to Ecash.
992    ///
993    /// Before leaving the Federation and deleting any state the calling code
994    /// must collect a full round of `Ok` from all the modules.
995    ///
996    /// Calling code should allow the user to override and ignore any
997    /// outstanding errors, after sufficient amount of warnings. Ideally,
998    /// this should be done on per-module basis, to avoid mistakes.
999    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
1000        bail!("Unable to determine if safe to leave the federation: Not implemented")
1001    }
1002}
1003
/// Type-erased version of [`ClientModule`]
///
/// Where [`ClientModule`] uses module-specific associated types, this trait
/// uses their dynamic counterparts ([`DynInput`], [`DynOutput`],
/// [`DynContext`], [`DynModuleBackup`], ...). A blanket implementation in
/// this file provides it for every `T: ClientModule` by forwarding to the
/// typed method of the same name.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Exposes the module as [`std::any::Any`] so that callers can downcast
    /// back to the concrete module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's [`Decoder`].
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, wrapped as a
    /// [`DynContext`] tagged with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Type-erased counterpart of `ClientModule::start`.
    async fn start(&self);

    /// Type-erased counterpart of `ClientModule::handle_cli_command`; `args`
    /// is handed through unchanged.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// Type-erased counterpart of `ClientModule::handle_rpc`; `method` and
    /// `request` are handed through unchanged.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Fee for processing `input`, or `None` if the typed module reports
    /// none.
    ///
    /// The blanket implementation panics if `input` does not hold this
    /// module's concrete input type.
    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;

    /// Fee for processing `output`, or `None` if the typed module reports
    /// none.
    ///
    /// The blanket implementation panics if `output` does not hold this
    /// module's concrete output type.
    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// Produces the module's backup, wrapped as a [`DynModuleBackup`] tagged
    /// with `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport;

    /// See [`ClientModule::create_final_inputs_and_outputs`].
    ///
    /// In the blanket implementation `module_instance` is used both to derive
    /// the module-prefixed view of `dbtx` passed to the typed module and to
    /// tag the returned bundles.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// See [`ClientModule::get_balance`].
    ///
    /// In the blanket implementation the typed module receives a view of
    /// `dbtx` prefixed with `module_instance`.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        unit: AmountUnit,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
1060
1061#[apply(async_trait_maybe_send!)]
1062impl<T> IClientModule for T
1063where
1064    T: ClientModule,
1065{
1066    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1067        self
1068    }
1069
1070    fn decoder(&self) -> Decoder {
1071        T::decoder()
1072    }
1073
1074    fn context(&self, instance: ModuleInstanceId) -> DynContext {
1075        DynContext::from_typed(instance, <T as ClientModule>::context(self))
1076    }
1077
1078    async fn start(&self) {
1079        <T as ClientModule>::start(self).await;
1080    }
1081
1082    async fn handle_cli_command(
1083        &self,
1084        args: &[ffi::OsString],
1085    ) -> anyhow::Result<serde_json::Value> {
1086        <T as ClientModule>::handle_cli_command(self, args).await
1087    }
1088
1089    async fn handle_rpc(
1090        &self,
1091        method: String,
1092        request: serde_json::Value,
1093    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1094        <T as ClientModule>::handle_rpc(self, method, request).await
1095    }
1096
1097    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1098        <T as ClientModule>::input_fee(
1099            self,
1100            amount,
1101            input
1102                .as_any()
1103                .downcast_ref()
1104                .expect("Dispatched to correct module"),
1105        )
1106    }
1107
1108    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1109        <T as ClientModule>::output_fee(
1110            self,
1111            amount,
1112            output
1113                .as_any()
1114                .downcast_ref()
1115                .expect("Dispatched to correct module"),
1116        )
1117    }
1118
1119    fn supports_backup(&self) -> bool {
1120        <T as ClientModule>::supports_backup(self)
1121    }
1122
1123    async fn backup(
1124        &self,
1125        module_instance_id: ModuleInstanceId,
1126    ) -> anyhow::Result<DynModuleBackup> {
1127        Ok(DynModuleBackup::from_typed(
1128            module_instance_id,
1129            <T as ClientModule>::backup(self).await?,
1130        ))
1131    }
1132
1133    fn supports_being_primary(&self) -> PrimaryModuleSupport {
1134        <T as ClientModule>::supports_being_primary(self)
1135    }
1136
1137    async fn create_final_inputs_and_outputs(
1138        &self,
1139        module_instance: ModuleInstanceId,
1140        dbtx: &mut DatabaseTransaction<'_>,
1141        operation_id: OperationId,
1142        unit: AmountUnit,
1143        input_amount: Amount,
1144        output_amount: Amount,
1145    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1146        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1147            self,
1148            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1149            operation_id,
1150            unit,
1151            input_amount,
1152            output_amount,
1153        )
1154        .await?;
1155
1156        let inputs = inputs.into_dyn(module_instance);
1157
1158        let outputs = outputs.into_dyn(module_instance);
1159
1160        Ok((inputs, outputs))
1161    }
1162
1163    async fn await_primary_module_output(
1164        &self,
1165        operation_id: OperationId,
1166        out_point: OutPoint,
1167    ) -> anyhow::Result<()> {
1168        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1169    }
1170
1171    async fn get_balance(
1172        &self,
1173        module_instance: ModuleInstanceId,
1174        dbtx: &mut DatabaseTransaction<'_>,
1175        unit: AmountUnit,
1176    ) -> Amount {
1177        <T as ClientModule>::get_balance(
1178            self,
1179            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1180            unit,
1181        )
1182        .await
1183    }
1184
1185    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1186        <T as ClientModule>::subscribe_balance_changes(self).await
1187    }
1188}
1189
// Defines `DynClientModule`: a cloneable newtype around an `Arc`-held
// `IClientModule` trait object. It is the element type of
// `ClientModuleRegistry`, letting modules of different concrete types be
// stored and used uniformly.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1194
1195impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1196    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1197        self.inner.as_ref()
1198    }
1199}
1200
1201// Re-export types from fedimint_core
1202pub use fedimint_core::{IdxRange, OutPointRange, OutPointRangeIter};
1203
/// Shared (`Arc`-wrapped) closure that turns an [`OutPointRange`] into a
/// `Vec` of `S`.
///
/// NOTE(review): presumably `S` is a state machine type and the range
/// identifies the inputs/outputs the states belong to within the final
/// transaction — confirm against the users of this alias.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;