fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::{Arc, Weak};
8use std::{ffi, marker, ops};
9
10use anyhow::{anyhow, bail};
11use bitcoin::secp256k1::PublicKey;
12use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
13use fedimint_core::config::ClientConfig;
14use fedimint_core::core::{
15    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
16    OperationId,
17};
18use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
19use fedimint_core::encoding::{Decodable, Encodable};
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
22use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
23use fedimint_core::task::{MaybeSend, MaybeSync};
24use fedimint_core::util::BoxStream;
25use fedimint_core::{
26    Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
27    maybe_add_send, maybe_add_send_sync,
28};
29use fedimint_eventlog::{Event, EventKind, EventPersistence};
30use fedimint_logging::LOG_CLIENT;
31use futures::Stream;
32use serde::de::DeserializeOwned;
33use serde::{Deserialize, Serialize};
34use tracing::warn;
35
36use self::init::ClientModuleInit;
37use crate::module::recovery::{DynModuleBackup, ModuleBackup};
38use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
39use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
40use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
41use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
42use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
43
44pub mod init;
45pub mod recovery;
46
47pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
48
/// A fedimint-client interface exposed to client modules
///
/// To break the dependency of the client modules on the whole fedimint client
/// and in particular the `fedimint-client` crate, the module gets access to an
/// interface, that is implemented by the `Client`.
///
/// This allows loose coupling, less recompilation and better control and
/// understanding of what functionality of the Client the modules get access to.
#[apply(async_trait_maybe_send!)]
pub trait ClientContextIface: MaybeSend + MaybeSync {
    /// Borrow the client module with instance id `instance` as a type-erased
    /// `IClientModule`.
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
    /// Return an owned handle to the global API.
    fn api_clone(&self) -> DynGlobalApi;
    /// Borrow the decoder registry of the client.
    fn decoders(&self) -> &ModuleDecoderRegistry;
    /// Finalize `tx_builder` and submit the resulting transaction as part of
    /// operation `operation_id`.
    ///
    /// `operation_meta_gen` turns the resulting [`OutPointRange`] into the
    /// JSON metadata of the operation (presumably stored in the operation
    /// log together with `operation_type` — confirm against the implementor).
    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Variant of [`Self::finalize_and_submit_transaction`] that works on a
    /// caller-provided database transaction and takes no operation
    /// type/metadata.
    // TODO: unify
    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>;

    /// Get the [`TransactionUpdates`] of operation `operation_id`.
    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;

    /// Wait on the given `outputs` of operation `operation_id`
    /// (presumably until the primary module reports them as available —
    /// confirm against the implementor).
    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        // TODO: make `impl Iterator<Item = ...>`
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()>;

    /// Borrow the operation log of the client.
    fn operation_log(&self) -> &dyn IOperationLog;

    /// Whether operation `operation_id` has any active states.
    async fn has_active_states(&self, operation_id: OperationId) -> bool;

    /// Whether an operation with id `operation_id` exists.
    async fn operation_exists(&self, operation_id: OperationId) -> bool;

    /// Get the client config.
    async fn config(&self) -> ClientConfig;

    /// Borrow the client database.
    ///
    /// `ClientContext::global_db` asserts that this handle is a global
    /// (non-isolated) one.
    fn db(&self) -> &Database;

    /// Borrow the state machine executor of the client.
    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));

    /// Get an invite code pointing at guardian `peer`, or `None`
    /// (presumably when `peer` is unknown — confirm against the implementor).
    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;

    /// Get the internal payment markers as a `(PublicKey, u64)` pair.
    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;

    /// Log an event given as an already serialized JSON `payload`.
    ///
    /// `module_kind` and `module_id` identify the originating module, `kind`
    /// and `persist` come from the event type (see `ClientContext::log_event`).
    #[allow(clippy::too_many_arguments)]
    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    );

    /// Stream the active states of operation `operation_id` for module
    /// instance `module_id`, reading through `dbtx`.
    ///
    /// The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;

    /// Stream the inactive states of operation `operation_id` for module
    /// instance `module_id`, reading through `dbtx`.
    ///
    /// The returned stream borrows `dbtx` for `'dbtx`.
    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
}
128
129/// A final, fully initialized client
130///
131/// Client modules need to be able to access a `Client` they are a part
132/// of. To break the circular dependency, the final `Client` is passed
133/// after `Client` was built via a shared state.
134#[derive(Clone, Default)]
135pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
136
137impl FinalClientIface {
138    /// Get a temporary strong reference to [`ClientContextIface`]
139    ///
140    /// Care must be taken to not let the user take ownership of this value,
141    /// and not store it elsewhere permanently either, as it could prevent
142    /// the cleanup of the Client.
143    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
144        self.0
145            .get()
146            .expect("client must be already set")
147            .upgrade()
148            .expect("client module context must not be use past client shutdown")
149    }
150
151    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
152        self.0.set(client).expect("FinalLazyClient already set");
153    }
154}
155
156impl fmt::Debug for FinalClientIface {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.write_str("FinalClientIface")
159    }
160}
/// A Client context for a [`ClientModule`] `M`
///
/// Client modules can interact with the whole
/// client through this struct.
pub struct ClientContext<M> {
    // Late-bound handle to the final client; upgraded on every use.
    client: FinalClientIface,
    // Instance id of the module this context belongs to; used to tag dyn
    // values and to filter states/inputs/outputs of this module.
    module_instance_id: ModuleInstanceId,
    // Token passed to `DatabaseTransaction::global_dbtx` whenever this
    // context needs the global view of a database transaction.
    global_dbtx_access_token: GlobalDBTxAccessToken,
    // Database returned by `module_db()`, which asserts that it is isolated.
    module_db: Database,
    // Ties the context to the module type `M` without storing an `M`.
    _marker: marker::PhantomData<M>,
}
172
173impl<M> Clone for ClientContext<M> {
174    fn clone(&self) -> Self {
175        Self {
176            client: self.client.clone(),
177            module_db: self.module_db.clone(),
178            module_instance_id: self.module_instance_id,
179            _marker: marker::PhantomData,
180            global_dbtx_access_token: self.global_dbtx_access_token,
181        }
182    }
183}
184
/// A reference back to itself that the module can get from the
/// [`ClientContext`]
///
/// Dereferences to the module `M` (see the `Deref` impl); obtained through
/// `ClientContext::self_ref`.
pub struct ClientContextSelfRef<'s, M> {
    // we are OK storing a strong reference (`Arc`) to the client here, because of the `'s`
    // preventing `Self` from being stored permanently somewhere
    client: Arc<dyn ClientContextIface>,
    // Instance id used to look the module up in `client`.
    module_instance_id: ModuleInstanceId,
    // Carries the `'s` borrow and the module type `M`.
    _marker: marker::PhantomData<&'s M>,
}
194
195impl<M> ops::Deref for ClientContextSelfRef<'_, M>
196where
197    M: ClientModule,
198{
199    type Target = M;
200
201    fn deref(&self) -> &Self::Target {
202        self.client
203            .get_module(self.module_instance_id)
204            .as_any()
205            .downcast_ref::<M>()
206            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
207    }
208}
209
210impl<M> fmt::Debug for ClientContext<M> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        f.write_str("ClientContext")
213    }
214}
215
216impl<M> ClientContext<M>
217where
218    M: ClientModule,
219{
220    pub fn new(
221        client: FinalClientIface,
222        module_instance_id: ModuleInstanceId,
223        global_dbtx_access_token: GlobalDBTxAccessToken,
224        module_db: Database,
225    ) -> Self {
226        Self {
227            client,
228            module_instance_id,
229            global_dbtx_access_token,
230            module_db,
231            _marker: marker::PhantomData,
232        }
233    }
234
235    /// Get a reference back to client module from the [`Self`]
236    ///
237    /// It's often necessary for a client module to "move self"
238    /// by-value, especially due to async lifetimes issues.
239    /// Clients usually work with `&mut self`, which can't really
240    /// work in such context.
241    ///
242    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
243    /// can be used to recover the reference to the module at later
244    /// time.
245    #[allow(clippy::needless_lifetimes)] // just for explicitiness
246    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
247        ClientContextSelfRef {
248            client: self.client.get(),
249            module_instance_id: self.module_instance_id,
250            _marker: marker::PhantomData,
251        }
252    }
253
254    /// Get a reference to a global Api handle
255    pub fn global_api(&self) -> DynGlobalApi {
256        self.client.get().api_clone()
257    }
258
259    /// Get a reference to a module Api handle
260    pub fn module_api(&self) -> DynModuleApi {
261        self.global_api().with_module(self.module_instance_id)
262    }
263
264    /// A set of all decoders of all modules of the client
265    pub fn decoders(&self) -> ModuleDecoderRegistry {
266        Clone::clone(self.client.get().decoders())
267    }
268
269    pub fn input_from_dyn<'i>(
270        &self,
271        input: &'i DynInput,
272    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
273        (input.module_instance_id() == self.module_instance_id).then(|| {
274            input
275                .as_any()
276                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
277                .unwrap_or_else(|| {
278                    panic!("instance_id {} just checked", input.module_instance_id())
279                })
280        })
281    }
282
283    pub fn output_from_dyn<'o>(
284        &self,
285        output: &'o DynOutput,
286    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
287        (output.module_instance_id() == self.module_instance_id).then(|| {
288            output
289                .as_any()
290                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
291                .unwrap_or_else(|| {
292                    panic!("instance_id {} just checked", output.module_instance_id())
293                })
294        })
295    }
296
    /// Lazily turn every item of `typed` into its dyn version via
    /// [`Self::make_dyn`].
    ///
    /// The returned iterator borrows `self` (lifetime `'s`) and may hold
    /// borrows coming from `typed` (lifetime `'i`); `'o` is bounded by both.
    pub fn map_dyn<'s, 'i, 'o, I>(
        &'s self,
        typed: impl IntoIterator<Item = I> + 'i,
    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
    where
        I: IntoDynInstance,
        'i: 'o,
        's: 'o,
    {
        typed.into_iter().map(|i| self.make_dyn(i))
    }
308
309    /// Turn a typed output into a dyn version
310    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
311        self.make_dyn(output)
312    }
313
314    /// Turn a typed input into a dyn version
315    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
316        self.make_dyn(input)
317    }
318
319    /// Turn a `typed` into a dyn version
320    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
321    where
322        I: IntoDynInstance,
323    {
324        typed.into_dyn(self.module_instance_id)
325    }
326
327    /// Turn a typed [`ClientOutputBundle`] into a dyn version
328    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
329    where
330        O: IntoDynInstance<DynType = DynOutput> + 'static,
331        S: IntoDynInstance<DynType = DynState> + 'static,
332    {
333        self.make_dyn(output)
334    }
335
336    /// Turn a typed [`ClientInputBundle`] into a dyn version
337    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
338    where
339        I: IntoDynInstance<DynType = DynInput> + 'static,
340        S: IntoDynInstance<DynType = DynState> + 'static,
341    {
342        self.make_dyn(inputs)
343    }
344
345    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
346    where
347        S: sm::IState + 'static,
348    {
349        DynState::from_typed(self.module_instance_id, sm)
350    }
351
352    pub async fn finalize_and_submit_transaction<F, Meta>(
353        &self,
354        operation_id: OperationId,
355        operation_type: &str,
356        operation_meta_gen: F,
357        tx_builder: TransactionBuilder,
358    ) -> anyhow::Result<OutPointRange>
359    where
360        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
361        Meta: serde::Serialize + MaybeSend,
362    {
363        self.client
364            .get()
365            .finalize_and_submit_transaction(
366                operation_id,
367                operation_type,
368                Box::new(move |out_point_range| {
369                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
370                }),
371                tx_builder,
372            )
373            .await
374    }
375
376    pub async fn finalize_and_submit_transaction_dbtx(
377        &self,
378        dbtx: &mut DatabaseTransaction<'_>,
379        operation_id: OperationId,
380        tx_builder: TransactionBuilder,
381    ) -> anyhow::Result<OutPointRange> {
382        self.client
383            .get()
384            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
385            .await
386    }
387
388    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
389        self.client.get().transaction_updates(operation_id).await
390    }
391
392    pub async fn await_primary_module_outputs(
393        &self,
394        operation_id: OperationId,
395        // TODO: make `impl Iterator<Item = ...>`
396        outputs: Vec<OutPoint>,
397    ) -> anyhow::Result<()> {
398        self.client
399            .get()
400            .await_primary_module_outputs(operation_id, outputs)
401            .await
402    }
403
404    // TODO: unify with `Self::get_operation`
405    pub async fn get_operation(
406        &self,
407        operation_id: OperationId,
408    ) -> anyhow::Result<oplog::OperationLogEntry> {
409        let operation = self
410            .client
411            .get()
412            .operation_log()
413            .get_operation(operation_id)
414            .await
415            .ok_or(anyhow::anyhow!("Operation not found"))?;
416
417        if operation.operation_module_kind() != M::kind().as_str() {
418            bail!("Operation is not a lightning operation");
419        }
420
421        Ok(operation)
422    }
423
424    /// Get global db.
425    ///
426    /// Only intended for internal use (private).
427    fn global_db(&self) -> fedimint_core::db::Database {
428        let db = Clone::clone(self.client.get().db());
429
430        db.ensure_global()
431            .expect("global_db must always return a global db");
432
433        db
434    }
435
436    pub fn module_db(&self) -> &Database {
437        self.module_db
438            .ensure_isolated()
439            .expect("module_db must always return isolated db");
440        &self.module_db
441    }
442
443    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
444        self.client.get().has_active_states(op_id).await
445    }
446
447    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
448        self.client.get().operation_exists(op_id).await
449    }
450
451    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
452        self.client
453            .get()
454            .executor()
455            .get_active_states()
456            .await
457            .into_iter()
458            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
459            .map(|s| {
460                (
461                    Clone::clone(
462                        s.0.as_any()
463                            .downcast_ref::<M::States>()
464                            .expect("incorrect output type passed to module plugin"),
465                    ),
466                    s.1,
467                )
468            })
469            .collect()
470    }
471
472    pub async fn get_config(&self) -> ClientConfig {
473        self.client.get().config().await
474    }
475
476    /// Returns an invite code for the federation that points to an arbitrary
477    /// guardian server for fetching the config
478    pub async fn get_invite_code(&self) -> InviteCode {
479        let cfg = self.get_config().await.global;
480        self.client
481            .get()
482            .invite_code(
483                *cfg.api_endpoints
484                    .keys()
485                    .next()
486                    .expect("A federation always has at least one guardian"),
487            )
488            .await
489            .expect("The guardian we requested an invite code for exists")
490    }
491
492    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
493        self.client.get().get_internal_payment_markers()
494    }
495
496    /// This method starts n state machines with given operation id without a
497    /// corresponding transaction
498    pub async fn manual_operation_start(
499        &self,
500        operation_id: OperationId,
501        op_type: &str,
502        operation_meta: impl serde::Serialize + Debug,
503        sms: Vec<DynState>,
504    ) -> anyhow::Result<()> {
505        let db = self.module_db();
506        let mut dbtx = db.begin_transaction().await;
507        {
508            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
509
510            self.manual_operation_start_inner(
511                &mut dbtx.to_ref_nc(),
512                operation_id,
513                op_type,
514                operation_meta,
515                sms,
516            )
517            .await?;
518        }
519
520        dbtx.commit_tx_result().await.map_err(|_| {
521            anyhow!(
522                "Operation with id {} already exists",
523                operation_id.fmt_short()
524            )
525        })?;
526
527        Ok(())
528    }
529
530    pub async fn manual_operation_start_dbtx(
531        &self,
532        dbtx: &mut DatabaseTransaction<'_>,
533        operation_id: OperationId,
534        op_type: &str,
535        operation_meta: impl serde::Serialize + Debug,
536        sms: Vec<DynState>,
537    ) -> anyhow::Result<()> {
538        self.manual_operation_start_inner(
539            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
540            operation_id,
541            op_type,
542            operation_meta,
543            sms,
544        )
545        .await
546    }
547
548    /// See [`Self::manual_operation_start`], just inside a database
549    /// transaction.
550    async fn manual_operation_start_inner(
551        &self,
552        dbtx: &mut DatabaseTransaction<'_>,
553        operation_id: OperationId,
554        op_type: &str,
555        operation_meta: impl serde::Serialize + Debug,
556        sms: Vec<DynState>,
557    ) -> anyhow::Result<()> {
558        dbtx.ensure_global()
559            .expect("Must deal with global dbtx here");
560
561        if self
562            .client
563            .get()
564            .operation_log()
565            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
566            .await
567            .is_some()
568        {
569            bail!(
570                "Operation with id {} already exists",
571                operation_id.fmt_short()
572            );
573        }
574
575        self.client
576            .get()
577            .operation_log()
578            .add_operation_log_entry_dbtx(
579                &mut dbtx.to_ref_nc(),
580                operation_id,
581                op_type,
582                serde_json::to_value(operation_meta).expect("Can't fail"),
583            )
584            .await;
585
586        self.client
587            .get()
588            .executor()
589            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
590            .await
591            .expect("State machine is valid");
592
593        Ok(())
594    }
595
    /// Typed front-end of the operation log's `outcome_or_updates`.
    ///
    /// The operation log works on JSON values: the updates produced by
    /// `stream_gen` are serialized to JSON before being handed to it, and
    /// whatever it returns (an update stream or a final outcome) is
    /// deserialized back into `U`.
    ///
    /// # Panics
    ///
    /// Panics if an update fails to serialize to JSON, or if a value coming
    /// back from the operation log fails to deserialize into `U`.
    pub fn outcome_or_updates<U, S>(
        &self,
        operation: OperationLogEntry,
        operation_id: OperationId,
        stream_gen: impl FnOnce() -> S + 'static,
    ) -> UpdateStreamOrOutcome<U>
    where
        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
        S: Stream<Item = U> + MaybeSend + 'static,
    {
        use futures::StreamExt;
        match self.client.get().operation_log().outcome_or_updates(
            &self.global_db(),
            operation_id,
            operation,
            // Adapter: typed update stream -> JSON update stream.
            Box::new(move || {
                let stream_gen = stream_gen();
                Box::pin(
                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
                )
            }),
        ) {
            // Convert the JSON values back into the typed `U`.
            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
            ),
            UpdateStreamOrOutcome::Outcome(o) => {
                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
            }
        }
    }
626
627    pub async fn claim_inputs<I, S>(
628        &self,
629        dbtx: &mut DatabaseTransaction<'_>,
630        inputs: ClientInputBundle<I, S>,
631        operation_id: OperationId,
632    ) -> anyhow::Result<OutPointRange>
633    where
634        I: IInput + MaybeSend + MaybeSync + 'static,
635        S: sm::IState + MaybeSend + MaybeSync + 'static,
636    {
637        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
638            .await
639    }
640
641    async fn claim_inputs_dyn(
642        &self,
643        dbtx: &mut DatabaseTransaction<'_>,
644        inputs: InstancelessDynClientInputBundle,
645        operation_id: OperationId,
646    ) -> anyhow::Result<OutPointRange> {
647        let tx_builder =
648            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
649
650        self.client
651            .get()
652            .finalize_and_submit_transaction_inner(
653                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
654                operation_id,
655                tx_builder,
656            )
657            .await
658    }
659
660    pub async fn add_state_machines_dbtx(
661        &self,
662        dbtx: &mut DatabaseTransaction<'_>,
663        states: Vec<DynState>,
664    ) -> AddStateMachinesResult {
665        self.client
666            .get()
667            .executor()
668            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
669            .await
670    }
671
672    pub async fn add_operation_log_entry_dbtx(
673        &self,
674        dbtx: &mut DatabaseTransaction<'_>,
675        operation_id: OperationId,
676        operation_type: &str,
677        operation_meta: impl serde::Serialize,
678    ) {
679        self.client
680            .get()
681            .operation_log()
682            .add_operation_log_entry_dbtx(
683                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
684                operation_id,
685                operation_type,
686                serde_json::to_value(operation_meta).expect("Can't fail"),
687            )
688            .await;
689    }
690
691    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
692    where
693        E: Event + Send,
694        Cap: Send,
695    {
696        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
697            warn!(
698                target: LOG_CLIENT,
699                module_kind = %<M as ClientModule>::kind(),
700                event_module = ?<E as Event>::MODULE,
701                "Client module logging events of different module than its own. This might become an error in the future."
702            );
703        }
704        self.client
705            .get()
706            .log_event_json(
707                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
708                <E as Event>::MODULE,
709                self.module_instance_id,
710                <E as Event>::KIND,
711                serde_json::to_value(event).expect("Can't fail"),
712                <E as Event>::PERSISTENCE,
713            )
714            .await;
715    }
716}
717
/// Primary module priority (lower number is higher priority)
///
/// The derived ordering compares the wrapped number, so a higher priority
/// sorts before a lower one.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PrimaryModulePriority(u64);

impl PrimaryModulePriority {
    /// Preset for a high priority
    pub const HIGH: Self = Self(100);
    /// Preset for a low priority
    pub const LOW: Self = Self(10000);

    /// Create a priority from an arbitrary number (lower number is higher
    /// priority).
    ///
    /// This is a `const fn`, so custom priorities can be defined as
    /// constants next to [`Self::HIGH`] and [`Self::LOW`].
    pub const fn custom(prio: u64) -> Self {
        Self(prio)
    }
}
/// Which amount units this module supports being primary for
///
/// Returned by `ClientModule::supports_being_primary`, whose default is
/// [`PrimaryModuleSupport::None`].
pub enum PrimaryModuleSupport {
    /// Potentially any unit
    Any {
        /// Priority of this module (lower number is higher priority)
        priority: PrimaryModulePriority,
    },
    /// Some units supported by the module
    Selected {
        /// Priority of this module (lower number is higher priority)
        priority: PrimaryModulePriority,
        /// The units the module can be primary for
        units: BTreeSet<AmountUnit>,
    },
    /// None
    None,
}
742
743impl PrimaryModuleSupport {
744    pub fn selected<const N: usize>(
745        priority: PrimaryModulePriority,
746        units: [AmountUnit; N],
747    ) -> Self {
748        Self::Selected {
749            priority,
750            units: BTreeSet::from(units),
751        }
752    }
753}
754
755/// Fedimint module client
756#[apply(async_trait_maybe_send!)]
757pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
758    type Init: ClientModuleInit;
759
760    /// Common module types shared between client and server
761    type Common: ModuleCommon;
762
763    /// Data stored in regular backups so that restoring doesn't have to start
764    /// from epoch 0
765    type Backup: ModuleBackup;
766
767    /// Data and API clients available to state machine transitions of this
768    /// module
769    type ModuleStateMachineContext: Context;
770
771    /// All possible states this client can submit to the executor
772    type States: State<ModuleContext = Self::ModuleStateMachineContext>
773        + IntoDynInstance<DynType = DynState>;
774
775    fn decoder() -> Decoder {
776        let mut decoder_builder = Self::Common::decoder_builder();
777        decoder_builder.with_decodable_type::<Self::States>();
778        decoder_builder.with_decodable_type::<Self::Backup>();
779        decoder_builder.build()
780    }
781
782    fn kind() -> ModuleKind {
783        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
784    }
785
786    fn context(&self) -> Self::ModuleStateMachineContext;
787
788    /// Initialize client.
789    ///
790    /// Called by the core client code on start, after [`ClientContext`] is
791    /// fully initialized, so unlike during [`ClientModuleInit::init`],
792    /// access to global client is allowed.
793    async fn start(&self) {}
794
795    async fn handle_cli_command(
796        &self,
797        _args: &[ffi::OsString],
798    ) -> anyhow::Result<serde_json::Value> {
799        Err(anyhow::format_err!(
800            "This module does not implement cli commands"
801        ))
802    }
803
804    async fn handle_rpc(
805        &self,
806        _method: String,
807        _request: serde_json::Value,
808    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
809        Box::pin(futures::stream::once(std::future::ready(Err(
810            anyhow::format_err!("This module does not implement rpc"),
811        ))))
812    }
813
814    /// Returns the fee the processing of this input requires.
815    ///
816    /// If the semantics of a given input aren't known this function returns
817    /// `None`, this only happens if a future version of Fedimint introduces a
818    /// new input variant. For clients this should only be the case when
819    /// processing transactions created by other users, so the result of
820    /// this function can be `unwrap`ped whenever dealing with inputs
821    /// generated by ourselves.
822    fn input_fee(
823        &self,
824        amount: &Amounts,
825        input: &<Self::Common as ModuleCommon>::Input,
826    ) -> Option<Amounts>;
827
828    /// Returns the fee the processing of this output requires.
829    ///
830    /// If the semantics of a given output aren't known this function returns
831    /// `None`, this only happens if a future version of Fedimint introduces a
832    /// new output variant. For clients this should only be the case when
833    /// processing transactions created by other users, so the result of
    /// this function can be `unwrap`ped whenever dealing with outputs
835    /// generated by ourselves.
836    fn output_fee(
837        &self,
838        amount: &Amounts,
839        output: &<Self::Common as ModuleCommon>::Output,
840    ) -> Option<Amounts>;
841
842    fn supports_backup(&self) -> bool {
843        false
844    }
845
846    async fn backup(&self) -> anyhow::Result<Self::Backup> {
847        anyhow::bail!("Backup not supported");
848    }
849
850    /// Does this module support being a primary module
851    ///
852    /// If it does it must implement:
853    ///
854    /// * [`Self::create_final_inputs_and_outputs`]
855    /// * [`Self::await_primary_module_output`]
856    /// * [`Self::get_balance`]
857    /// * [`Self::subscribe_balance_changes`]
858    fn supports_being_primary(&self) -> PrimaryModuleSupport {
859        PrimaryModuleSupport::None
860    }
861
    /// Creates all inputs and outputs necessary to balance the transaction.
    /// The function returns an error if and only if the client's funds are not
    /// sufficient to create the inputs necessary to fully fund the transaction.
    ///
    /// A returned input also contains:
    /// * A set of private keys belonging to the input for signing the
    ///   transaction
    /// * A closure that generates states belonging to the input. This closure
    ///   takes the transaction id of the transaction in which the input was
    ///   used and the input index as input since these cannot be known at the
    ///   time this function is called and have to be injected later.
    ///
    /// A returned output also contains:
    /// * A closure that generates states belonging to the output. This closure
    ///   takes the transaction id of the transaction in which the output was
    ///   used and the output index as input since these cannot be known at the
    ///   time this function is called and have to be injected later.
    ///
    /// Only modules that can act as the primary module need to override this
    /// (see [`Self::supports_being_primary`]); the default implementation
    /// ignores all arguments and panics with `unimplemented!()`.
    async fn create_final_inputs_and_outputs(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _operation_id: OperationId,
        _unit: AmountUnit,
        _input_amount: Amount,
        _output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
    )> {
        unimplemented!()
    }
892
    /// Waits for the funds from an output created by
    /// [`Self::create_final_inputs_and_outputs`] to become available. This
    /// function returning typically implies a change in the output of
    /// [`Self::get_balance`].
    ///
    /// The output to wait for is identified by `_operation_id` and
    /// `_out_point`. The default implementation ignores both and panics with
    /// `unimplemented!()`; see [`Self::supports_being_primary`].
    async fn await_primary_module_output(
        &self,
        _operation_id: OperationId,
        _out_point: OutPoint,
    ) -> anyhow::Result<()> {
        unimplemented!()
    }
904
    /// Returns the balance held by this module and available for funding
    /// transactions, for the requested `_unit`.
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
        unimplemented!()
    }
910
    /// Returns the balances held by this module and available for funding
    /// transactions.
    ///
    /// Unlike [`Self::get_balance`], this takes no unit and returns an
    /// [`Amounts`] value rather than a single [`Amount`] (presumably one
    /// entry per unit the module holds — confirm against `Amounts`).
    ///
    /// The default implementation panics with `unimplemented!()`.
    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
        unimplemented!()
    }
916
    /// Returns a stream that yields an item each time the module balance
    /// changes.
    ///
    /// The stream's item type is `()`, so an item is only a notification and
    /// carries no balance; callers presumably re-query the balance (e.g. via
    /// [`Self::get_balance`]) when one arrives.
    ///
    /// The default implementation panics with `unimplemented!()`; see
    /// [`Self::supports_being_primary`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
        unimplemented!()
    }
922
923    /// Leave the federation
924    ///
925    /// While technically there's nothing stopping the client from just
926    /// abandoning Federation at any point by deleting all the related
927    /// local data, it is useful to make sure it's safe beforehand.
928    ///
929    /// This call indicates the desire of the caller client code
930    /// to orderly and safely leave the Federation by this module instance.
931    /// The goal of the implementations is to fulfil that wish,
932    /// giving prompt and informative feedback if it's not yet possible.
933    ///
934    /// The client module implementation should handle the request
935    /// and return as fast as possible avoiding blocking for longer than
936    /// necessary. This would usually involve some combination of:
937    ///
938    /// * recording the state of being in process of leaving the Federation to
939    ///   prevent initiating new conditions that could delay its completion;
940    /// * performing any fast to complete cleanup/exit logic;
941    /// * initiating any time-consuming logic (e.g. canceling outstanding
942    ///   contracts), as background jobs, tasks machines, etc.
943    /// * checking for any conditions indicating it might not be safe to leave
944    ///   at the moment.
945    ///
946    /// This function should return `Ok` only if from the perspective
947    /// of this module instance, it is safe to delete client data and
948    /// stop using it, with no further actions (like background jobs) required
949    /// to complete.
950    ///
951    /// This function should return an error if it's not currently possible
952    /// to safely (e.g. without losing funds) leave the Federation.
953    /// It should avoid running indefinitely trying to complete any cleanup
954    /// actions necessary to reach a clean state, preferring spawning new
955    /// state machines and returning an informative error about cleanup
956    /// still in progress.
957    ///
958    /// If any internal task needs to complete, any user action is required,
959    /// or even external condition needs to be met this function
960    /// should return a `Err`.
961    ///
962    /// Notably modules should not disable interaction that might be necessary
963    /// for the user (possibly through other modules) to leave the Federation.
964    /// In particular a Mint module should retain ability to create new notes,
965    /// and LN module should retain ability to send funds out.
966    ///
967    /// Calling code must NOT assume that a module that once returned `Ok`,
968    /// will not return `Err` at later point. E.g. a Mint module might have
969    /// no outstanding balance at first, but other modules winding down
970    /// might "cash-out" to Ecash.
971    ///
972    /// Before leaving the Federation and deleting any state the calling code
973    /// must collect a full round of `Ok` from all the modules.
974    ///
975    /// Calling code should allow the user to override and ignore any
976    /// outstanding errors, after sufficient amount of warnings. Ideally,
977    /// this should be done on per-module basis, to avoid mistakes.
978    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
979        bail!("Unable to determine if safe to leave the federation: Not implemented")
980    }
981}
982
/// Type-erased version of [`ClientModule`]
///
/// Every [`ClientModule`] gets this trait through a blanket implementation
/// that forwards each method to the typed method of the same name. Where the
/// typed API uses module-specific types, the methods here use their `Dyn*`
/// counterparts, and several take a [`ModuleInstanceId`] used to tag results
/// or to scope the database transaction to the module.
#[apply(async_trait_maybe_send!)]
pub trait IClientModule: Debug {
    /// Returns the module as [`Any`], e.g. for downcasting to the concrete
    /// module type.
    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));

    /// Returns the module's [`Decoder`].
    fn decoder(&self) -> Decoder;

    /// Returns the module's state machine context, type-erased and tagged
    /// with `instance`.
    fn context(&self, instance: ModuleInstanceId) -> DynContext;

    /// Forwards to [`ClientModule::start`].
    async fn start(&self);

    /// Handles a CLI command given as raw arguments and returns its result
    /// as JSON.
    async fn handle_cli_command(&self, args: &[ffi::OsString])
    -> anyhow::Result<serde_json::Value>;

    /// Handles the RPC `method` with the JSON `request` and returns a stream
    /// of JSON results.
    async fn handle_rpc(
        &self,
        method: String,
        request: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;

    /// Type-erased [`ClientModule::input_fee`].
    ///
    /// The blanket implementation downcasts `input` to the module's own input
    /// type and panics if `input` belongs to a different module.
    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;

    /// Type-erased [`ClientModule::output_fee`].
    ///
    /// The blanket implementation downcasts `output` to the module's own
    /// output type and panics if `output` belongs to a different module.
    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;

    /// See [`ClientModule::supports_backup`].
    fn supports_backup(&self) -> bool;

    /// Produces the module's backup, type-erased and tagged with
    /// `module_instance_id`.
    async fn backup(&self, module_instance_id: ModuleInstanceId)
    -> anyhow::Result<DynModuleBackup>;

    /// See [`ClientModule::supports_being_primary`].
    fn supports_being_primary(&self) -> PrimaryModuleSupport;

    /// Type-erased [`ClientModule::create_final_inputs_and_outputs`].
    ///
    /// `module_instance` is used both to scope `dbtx` to the module and to tag
    /// the returned input and output bundles.
    async fn create_final_inputs_and_outputs(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        input_amount: Amount,
        output_amount: Amount,
    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;

    /// See [`ClientModule::await_primary_module_output`].
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()>;

    /// Type-erased [`ClientModule::get_balance`]; `module_instance` is used to
    /// scope `dbtx` to the module.
    async fn get_balance(
        &self,
        module_instance: ModuleInstanceId,
        dbtx: &mut DatabaseTransaction<'_>,
        unit: AmountUnit,
    ) -> Amount;

    /// See [`ClientModule::subscribe_balance_changes`].
    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
}
1039
1040#[apply(async_trait_maybe_send!)]
1041impl<T> IClientModule for T
1042where
1043    T: ClientModule,
1044{
1045    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1046        self
1047    }
1048
1049    fn decoder(&self) -> Decoder {
1050        T::decoder()
1051    }
1052
1053    fn context(&self, instance: ModuleInstanceId) -> DynContext {
1054        DynContext::from_typed(instance, <T as ClientModule>::context(self))
1055    }
1056
1057    async fn start(&self) {
1058        <T as ClientModule>::start(self).await;
1059    }
1060
1061    async fn handle_cli_command(
1062        &self,
1063        args: &[ffi::OsString],
1064    ) -> anyhow::Result<serde_json::Value> {
1065        <T as ClientModule>::handle_cli_command(self, args).await
1066    }
1067
1068    async fn handle_rpc(
1069        &self,
1070        method: String,
1071        request: serde_json::Value,
1072    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1073        <T as ClientModule>::handle_rpc(self, method, request).await
1074    }
1075
1076    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1077        <T as ClientModule>::input_fee(
1078            self,
1079            amount,
1080            input
1081                .as_any()
1082                .downcast_ref()
1083                .expect("Dispatched to correct module"),
1084        )
1085    }
1086
1087    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1088        <T as ClientModule>::output_fee(
1089            self,
1090            amount,
1091            output
1092                .as_any()
1093                .downcast_ref()
1094                .expect("Dispatched to correct module"),
1095        )
1096    }
1097
1098    fn supports_backup(&self) -> bool {
1099        <T as ClientModule>::supports_backup(self)
1100    }
1101
1102    async fn backup(
1103        &self,
1104        module_instance_id: ModuleInstanceId,
1105    ) -> anyhow::Result<DynModuleBackup> {
1106        Ok(DynModuleBackup::from_typed(
1107            module_instance_id,
1108            <T as ClientModule>::backup(self).await?,
1109        ))
1110    }
1111
1112    fn supports_being_primary(&self) -> PrimaryModuleSupport {
1113        <T as ClientModule>::supports_being_primary(self)
1114    }
1115
1116    async fn create_final_inputs_and_outputs(
1117        &self,
1118        module_instance: ModuleInstanceId,
1119        dbtx: &mut DatabaseTransaction<'_>,
1120        operation_id: OperationId,
1121        unit: AmountUnit,
1122        input_amount: Amount,
1123        output_amount: Amount,
1124    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1125        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1126            self,
1127            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1128            operation_id,
1129            unit,
1130            input_amount,
1131            output_amount,
1132        )
1133        .await?;
1134
1135        let inputs = inputs.into_dyn(module_instance);
1136
1137        let outputs = outputs.into_dyn(module_instance);
1138
1139        Ok((inputs, outputs))
1140    }
1141
1142    async fn await_primary_module_output(
1143        &self,
1144        operation_id: OperationId,
1145        out_point: OutPoint,
1146    ) -> anyhow::Result<()> {
1147        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1148    }
1149
1150    async fn get_balance(
1151        &self,
1152        module_instance: ModuleInstanceId,
1153        dbtx: &mut DatabaseTransaction<'_>,
1154        unit: AmountUnit,
1155    ) -> Amount {
1156        <T as ClientModule>::get_balance(
1157            self,
1158            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1159            unit,
1160        )
1161        .await
1162    }
1163
1164    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1165        <T as ClientModule>::subscribe_balance_changes(self).await
1166    }
1167}
1168
// Defines `DynClientModule`, a clonable newtype wrapping a shared,
// type-erased `IClientModule` trait object.
dyn_newtype_define!(
    #[derive(Clone)]
    pub DynClientModule(Arc<IClientModule>)
);
1173
1174impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1175    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1176        self.inner.as_ref()
1177    }
1178}
1179
/// A contiguous range of input/output indexes
///
/// Stored as a half-open interval like [`std::ops::Range`]: `start` is the
/// first index in the range and `end` is one past the last.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct IdxRange {
    /// First index of the range (inclusive).
    start: u64,
    /// One past the last index of the range (exclusive).
    end: u64,
}
1186
1187impl IdxRange {
1188    pub fn new_single(start: u64) -> Option<Self> {
1189        start.checked_add(1).map(|end| Self { start, end })
1190    }
1191
1192    pub fn start(self) -> u64 {
1193        self.start
1194    }
1195
1196    pub fn count(self) -> usize {
1197        self.into_iter().count()
1198    }
1199
1200    pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1201        range.end().checked_add(1).map(|end| Self {
1202            start: *range.start(),
1203            end,
1204        })
1205    }
1206}
1207
1208impl From<Range<u64>> for IdxRange {
1209    fn from(Range { start, end }: Range<u64>) -> Self {
1210        Self { start, end }
1211    }
1212}
1213
1214impl IntoIterator for IdxRange {
1215    type Item = u64;
1216
1217    type IntoIter = ops::Range<u64>;
1218
1219    fn into_iter(self) -> Self::IntoIter {
1220        ops::Range {
1221            start: self.start,
1222            end: self.end,
1223        }
1224    }
1225}
1226
/// A contiguous range of out points that all belong to the same transaction.
///
/// Iterating over it yields one [`OutPoint`] per index in the range.
#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
pub struct OutPointRange {
    /// Transaction that all out points in this range belong to.
    pub txid: TransactionId,
    /// Indexes covered within that transaction.
    idx_range: IdxRange,
}
1232
1233impl OutPointRange {
1234    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1235        Self { txid, idx_range }
1236    }
1237
1238    pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1239        IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1240    }
1241
1242    pub fn start_idx(self) -> u64 {
1243        self.idx_range.start()
1244    }
1245
1246    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1247        self.idx_range.into_iter()
1248    }
1249
1250    pub fn count(self) -> usize {
1251        self.idx_range.count()
1252    }
1253}
1254
1255impl IntoIterator for OutPointRange {
1256    type Item = OutPoint;
1257
1258    type IntoIter = OutPointRangeIter;
1259
1260    fn into_iter(self) -> Self::IntoIter {
1261        OutPointRangeIter {
1262            txid: self.txid,
1263            inner: self.idx_range.into_iter(),
1264        }
1265    }
1266}
1267
1268pub struct OutPointRangeIter {
1269    txid: TransactionId,
1270
1271    inner: ops::Range<u64>,
1272}
1273
impl OutPointRange {
    /// Returns the id of the transaction this range refers to.
    pub fn txid(&self) -> TransactionId {
        self.txid
    }
}
1279
1280impl Iterator for OutPointRangeIter {
1281    type Item = OutPoint;
1282
1283    fn next(&mut self) -> Option<Self::Item> {
1284        self.inner.next().map(|idx| OutPoint {
1285            txid: self.txid,
1286            out_idx: idx,
1287        })
1288    }
1289}
1290
/// A shared closure that builds the states of type `S` for a given
/// [`OutPointRange`].
///
/// NOTE(review): presumably invoked once the transaction id and index range
/// of the inputs/outputs it was created for are known — confirm against the
/// transaction builder.
pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;