fedimint_client_module/module/
mod.rs

1use core::fmt;
2use std::any::Any;
3use std::collections::BTreeSet;
4use std::fmt::Debug;
5use std::ops::Range;
6use std::pin::Pin;
7use std::sync::{Arc, Weak};
8use std::{ffi, marker, ops};
9
10use anyhow::{anyhow, bail};
11use bitcoin::secp256k1::PublicKey;
12use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
13use fedimint_core::config::ClientConfig;
14use fedimint_core::core::{
15    Decoder, DynInput, DynOutput, IInput, IntoDynInstance, ModuleInstanceId, ModuleKind,
16    OperationId,
17};
18use fedimint_core::db::{Database, DatabaseTransaction, GlobalDBTxAccessToken, NonCommittable};
19use fedimint_core::encoding::{Decodable, Encodable};
20use fedimint_core::invite_code::InviteCode;
21use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
22use fedimint_core::module::{AmountUnit, Amounts, CommonModuleInit, ModuleCommon, ModuleInit};
23use fedimint_core::task::{MaybeSend, MaybeSync};
24use fedimint_core::util::BoxStream;
25use fedimint_core::{
26    Amount, OutPoint, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
27    maybe_add_send, maybe_add_send_sync,
28};
29use fedimint_eventlog::{Event, EventKind, EventPersistence};
30use fedimint_logging::LOG_CLIENT;
31use futures::Stream;
32use serde::de::DeserializeOwned;
33use serde::{Deserialize, Serialize};
34use tracing::warn;
35
36use self::init::ClientModuleInit;
37use crate::module::recovery::{DynModuleBackup, ModuleBackup};
38use crate::oplog::{IOperationLog, OperationLogEntry, UpdateStreamOrOutcome};
39use crate::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
40use crate::sm::{self, ActiveStateMeta, Context, DynContext, DynState, InactiveStateMeta, State};
41use crate::transaction::{ClientInputBundle, ClientOutputBundle, TransactionBuilder};
42use crate::{AddStateMachinesResult, InstancelessDynClientInputBundle, TransactionUpdates, oplog};
43
44pub mod init;
45pub mod recovery;
46
47pub type ClientModuleRegistry = ModuleRegistry<DynClientModule>;
48
49/// A fedimint-client interface exposed to client modules
50///
51/// To break the dependency of the client modules on the whole fedimint client
52/// and in particular the `fedimint-client` crate, the module gets access to an
53/// interface, that is implemented by the `Client`.
54///
55/// This allows lose coupling, less recompilation and better control and
56/// understanding of what functionality of the Client the modules get access to.
57#[apply(async_trait_maybe_send!)]
58pub trait ClientContextIface: MaybeSend + MaybeSync {
59    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule);
60    fn api_clone(&self) -> DynGlobalApi;
61    fn decoders(&self) -> &ModuleDecoderRegistry;
62    async fn finalize_and_submit_transaction(
63        &self,
64        operation_id: OperationId,
65        operation_type: &str,
66        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
67        tx_builder: TransactionBuilder,
68    ) -> anyhow::Result<OutPointRange>;
69
70    // TODO: unify
71    async fn finalize_and_submit_transaction_inner(
72        &self,
73        dbtx: &mut DatabaseTransaction<'_>,
74        operation_id: OperationId,
75        tx_builder: TransactionBuilder,
76    ) -> anyhow::Result<OutPointRange>;
77
78    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates;
79
80    async fn await_primary_module_outputs(
81        &self,
82        operation_id: OperationId,
83        // TODO: make `impl Iterator<Item = ...>`
84        outputs: Vec<OutPoint>,
85    ) -> anyhow::Result<()>;
86
87    fn operation_log(&self) -> &dyn IOperationLog;
88
89    async fn has_active_states(&self, operation_id: OperationId) -> bool;
90
91    async fn operation_exists(&self, operation_id: OperationId) -> bool;
92
93    async fn config(&self) -> ClientConfig;
94
95    fn db(&self) -> &Database;
96
97    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static));
98
99    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode>;
100
101    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)>;
102
103    #[allow(clippy::too_many_arguments)]
104    async fn log_event_json(
105        &self,
106        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
107        module_kind: Option<ModuleKind>,
108        module_id: ModuleInstanceId,
109        kind: EventKind,
110        payload: serde_json::Value,
111        persist: EventPersistence,
112    );
113
114    async fn read_operation_active_states<'dbtx>(
115        &self,
116        operation_id: OperationId,
117        module_id: ModuleInstanceId,
118        dbtx: &'dbtx mut DatabaseTransaction<'_>,
119    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>;
120
121    async fn read_operation_inactive_states<'dbtx>(
122        &self,
123        operation_id: OperationId,
124        module_id: ModuleInstanceId,
125        dbtx: &'dbtx mut DatabaseTransaction<'_>,
126    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>;
127}
128
129/// A final, fully initialized client
130///
131/// Client modules need to be able to access a `Client` they are a part
132/// of. To break the circular dependency, the final `Client` is passed
133/// after `Client` was built via a shared state.
134#[derive(Clone, Default)]
135pub struct FinalClientIface(Arc<std::sync::OnceLock<Weak<dyn ClientContextIface>>>);
136
137impl FinalClientIface {
138    /// Get a temporary strong reference to [`ClientContextIface`]
139    ///
140    /// Care must be taken to not let the user take ownership of this value,
141    /// and not store it elsewhere permanently either, as it could prevent
142    /// the cleanup of the Client.
143    pub(crate) fn get(&self) -> Arc<dyn ClientContextIface> {
144        self.0
145            .get()
146            .expect("client must be already set")
147            .upgrade()
148            .expect("client module context must not be use past client shutdown")
149    }
150
151    pub fn set(&self, client: Weak<dyn ClientContextIface>) {
152        self.0.set(client).expect("FinalLazyClient already set");
153    }
154}
155
156impl fmt::Debug for FinalClientIface {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.write_str("FinalClientIface")
159    }
160}
161/// A Client context for a [`ClientModule`] `M`
162///
163/// Client modules can interact with the whole
164/// client through this struct.
165pub struct ClientContext<M> {
166    client: FinalClientIface,
167    module_instance_id: ModuleInstanceId,
168    global_dbtx_access_token: GlobalDBTxAccessToken,
169    module_db: Database,
170    _marker: marker::PhantomData<M>,
171}
172
173impl<M> Clone for ClientContext<M> {
174    fn clone(&self) -> Self {
175        Self {
176            client: self.client.clone(),
177            module_db: self.module_db.clone(),
178            module_instance_id: self.module_instance_id,
179            _marker: marker::PhantomData,
180            global_dbtx_access_token: self.global_dbtx_access_token,
181        }
182    }
183}
184
185/// A reference back to itself that the module cacn get from the
186/// [`ClientContext`]
187pub struct ClientContextSelfRef<'s, M> {
188    // we are OK storing `ClientStrong` here, because of the `'s` preventing `Self` from being
189    // stored permanently somewhere
190    client: Arc<dyn ClientContextIface>,
191    module_instance_id: ModuleInstanceId,
192    _marker: marker::PhantomData<&'s M>,
193}
194
195impl<M> ops::Deref for ClientContextSelfRef<'_, M>
196where
197    M: ClientModule,
198{
199    type Target = M;
200
201    fn deref(&self) -> &Self::Target {
202        self.client
203            .get_module(self.module_instance_id)
204            .as_any()
205            .downcast_ref::<M>()
206            .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()))
207    }
208}
209
210impl<M> fmt::Debug for ClientContext<M> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        f.write_str("ClientContext")
213    }
214}
215
216impl<M> ClientContext<M>
217where
218    M: ClientModule,
219{
220    pub fn new(
221        client: FinalClientIface,
222        module_instance_id: ModuleInstanceId,
223        global_dbtx_access_token: GlobalDBTxAccessToken,
224        module_db: Database,
225    ) -> Self {
226        Self {
227            client,
228            module_instance_id,
229            global_dbtx_access_token,
230            module_db,
231            _marker: marker::PhantomData,
232        }
233    }
234
235    /// Get a reference back to client module from the [`Self`]
236    ///
237    /// It's often necessary for a client module to "move self"
238    /// by-value, especially due to async lifetimes issues.
239    /// Clients usually work with `&mut self`, which can't really
240    /// work in such context.
241    ///
242    /// Fortunately [`ClientContext`] is `Clone` and `Send, and
243    /// can be used to recover the reference to the module at later
244    /// time.
245    #[allow(clippy::needless_lifetimes)] // just for explicitiness
246    pub fn self_ref(&self) -> ClientContextSelfRef<'_, M> {
247        ClientContextSelfRef {
248            client: self.client.get(),
249            module_instance_id: self.module_instance_id,
250            _marker: marker::PhantomData,
251        }
252    }
253
254    /// Get a reference to a global Api handle
255    pub fn global_api(&self) -> DynGlobalApi {
256        self.client.get().api_clone()
257    }
258
259    /// Get a reference to a module Api handle
260    pub fn module_api(&self) -> DynModuleApi {
261        self.global_api().with_module(self.module_instance_id)
262    }
263
264    /// A set of all decoders of all modules of the client
265    pub fn decoders(&self) -> ModuleDecoderRegistry {
266        Clone::clone(self.client.get().decoders())
267    }
268
269    pub fn input_from_dyn<'i>(
270        &self,
271        input: &'i DynInput,
272    ) -> Option<&'i <M::Common as ModuleCommon>::Input> {
273        (input.module_instance_id() == self.module_instance_id).then(|| {
274            input
275                .as_any()
276                .downcast_ref::<<M::Common as ModuleCommon>::Input>()
277                .unwrap_or_else(|| {
278                    panic!("instance_id {} just checked", input.module_instance_id())
279                })
280        })
281    }
282
283    pub fn output_from_dyn<'o>(
284        &self,
285        output: &'o DynOutput,
286    ) -> Option<&'o <M::Common as ModuleCommon>::Output> {
287        (output.module_instance_id() == self.module_instance_id).then(|| {
288            output
289                .as_any()
290                .downcast_ref::<<M::Common as ModuleCommon>::Output>()
291                .unwrap_or_else(|| {
292                    panic!("instance_id {} just checked", output.module_instance_id())
293                })
294        })
295    }
296
297    pub fn map_dyn<'s, 'i, 'o, I>(
298        &'s self,
299        typed: impl IntoIterator<Item = I> + 'i,
300    ) -> impl Iterator<Item = <I as IntoDynInstance>::DynType> + 'o
301    where
302        I: IntoDynInstance,
303        'i: 'o,
304        's: 'o,
305    {
306        typed.into_iter().map(|i| self.make_dyn(i))
307    }
308
309    /// Turn a typed output into a dyn version
310    pub fn make_dyn_output(&self, output: <M::Common as ModuleCommon>::Output) -> DynOutput {
311        self.make_dyn(output)
312    }
313
314    /// Turn a typed input into a dyn version
315    pub fn make_dyn_input(&self, input: <M::Common as ModuleCommon>::Input) -> DynInput {
316        self.make_dyn(input)
317    }
318
319    /// Turn a `typed` into a dyn version
320    pub fn make_dyn<I>(&self, typed: I) -> <I as IntoDynInstance>::DynType
321    where
322        I: IntoDynInstance,
323    {
324        typed.into_dyn(self.module_instance_id)
325    }
326
327    /// Turn a typed [`ClientOutputBundle`] into a dyn version
328    pub fn make_client_outputs<O, S>(&self, output: ClientOutputBundle<O, S>) -> ClientOutputBundle
329    where
330        O: IntoDynInstance<DynType = DynOutput> + 'static,
331        S: IntoDynInstance<DynType = DynState> + 'static,
332    {
333        self.make_dyn(output)
334    }
335
336    /// Turn a typed [`ClientInputBundle`] into a dyn version
337    pub fn make_client_inputs<I, S>(&self, inputs: ClientInputBundle<I, S>) -> ClientInputBundle
338    where
339        I: IntoDynInstance<DynType = DynInput> + 'static,
340        S: IntoDynInstance<DynType = DynState> + 'static,
341    {
342        self.make_dyn(inputs)
343    }
344
345    pub fn make_dyn_state<S>(&self, sm: S) -> DynState
346    where
347        S: sm::IState + 'static,
348    {
349        DynState::from_typed(self.module_instance_id, sm)
350    }
351
352    pub async fn finalize_and_submit_transaction<F, Meta>(
353        &self,
354        operation_id: OperationId,
355        operation_type: &str,
356        operation_meta_gen: F,
357        tx_builder: TransactionBuilder,
358    ) -> anyhow::Result<OutPointRange>
359    where
360        F: Fn(OutPointRange) -> Meta + Clone + MaybeSend + MaybeSync + 'static,
361        Meta: serde::Serialize + MaybeSend,
362    {
363        self.client
364            .get()
365            .finalize_and_submit_transaction(
366                operation_id,
367                operation_type,
368                Box::new(move |out_point_range| {
369                    serde_json::to_value(operation_meta_gen(out_point_range)).expect("Can't fail")
370                }),
371                tx_builder,
372            )
373            .await
374    }
375
376    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
377        self.client.get().transaction_updates(operation_id).await
378    }
379
380    pub async fn await_primary_module_outputs(
381        &self,
382        operation_id: OperationId,
383        // TODO: make `impl Iterator<Item = ...>`
384        outputs: Vec<OutPoint>,
385    ) -> anyhow::Result<()> {
386        self.client
387            .get()
388            .await_primary_module_outputs(operation_id, outputs)
389            .await
390    }
391
392    // TODO: unify with `Self::get_operation`
393    pub async fn get_operation(
394        &self,
395        operation_id: OperationId,
396    ) -> anyhow::Result<oplog::OperationLogEntry> {
397        let operation = self
398            .client
399            .get()
400            .operation_log()
401            .get_operation(operation_id)
402            .await
403            .ok_or(anyhow::anyhow!("Operation not found"))?;
404
405        if operation.operation_module_kind() != M::kind().as_str() {
406            bail!("Operation is not a lightning operation");
407        }
408
409        Ok(operation)
410    }
411
412    /// Get global db.
413    ///
414    /// Only intended for internal use (private).
415    fn global_db(&self) -> fedimint_core::db::Database {
416        let db = Clone::clone(self.client.get().db());
417
418        db.ensure_global()
419            .expect("global_db must always return a global db");
420
421        db
422    }
423
424    pub fn module_db(&self) -> &Database {
425        self.module_db
426            .ensure_isolated()
427            .expect("module_db must always return isolated db");
428        &self.module_db
429    }
430
431    pub async fn has_active_states(&self, op_id: OperationId) -> bool {
432        self.client.get().has_active_states(op_id).await
433    }
434
435    pub async fn operation_exists(&self, op_id: OperationId) -> bool {
436        self.client.get().operation_exists(op_id).await
437    }
438
439    pub async fn get_own_active_states(&self) -> Vec<(M::States, ActiveStateMeta)> {
440        self.client
441            .get()
442            .executor()
443            .get_active_states()
444            .await
445            .into_iter()
446            .filter(|s| s.0.module_instance_id() == self.module_instance_id)
447            .map(|s| {
448                (
449                    Clone::clone(
450                        s.0.as_any()
451                            .downcast_ref::<M::States>()
452                            .expect("incorrect output type passed to module plugin"),
453                    ),
454                    s.1,
455                )
456            })
457            .collect()
458    }
459
460    pub async fn get_config(&self) -> ClientConfig {
461        self.client.get().config().await
462    }
463
464    /// Returns an invite code for the federation that points to an arbitrary
465    /// guardian server for fetching the config
466    pub async fn get_invite_code(&self) -> InviteCode {
467        let cfg = self.get_config().await.global;
468        self.client
469            .get()
470            .invite_code(
471                *cfg.api_endpoints
472                    .keys()
473                    .next()
474                    .expect("A federation always has at least one guardian"),
475            )
476            .await
477            .expect("The guardian we requested an invite code for exists")
478    }
479
480    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
481        self.client.get().get_internal_payment_markers()
482    }
483
484    /// This method starts n state machines with given operation id without a
485    /// corresponding transaction
486    pub async fn manual_operation_start(
487        &self,
488        operation_id: OperationId,
489        op_type: &str,
490        operation_meta: impl serde::Serialize + Debug,
491        sms: Vec<DynState>,
492    ) -> anyhow::Result<()> {
493        let db = self.module_db();
494        let mut dbtx = db.begin_transaction().await;
495        {
496            let dbtx = &mut dbtx.global_dbtx(self.global_dbtx_access_token);
497
498            self.manual_operation_start_inner(
499                &mut dbtx.to_ref_nc(),
500                operation_id,
501                op_type,
502                operation_meta,
503                sms,
504            )
505            .await?;
506        }
507
508        dbtx.commit_tx_result().await.map_err(|_| {
509            anyhow!(
510                "Operation with id {} already exists",
511                operation_id.fmt_short()
512            )
513        })?;
514
515        Ok(())
516    }
517
518    pub async fn manual_operation_start_dbtx(
519        &self,
520        dbtx: &mut DatabaseTransaction<'_>,
521        operation_id: OperationId,
522        op_type: &str,
523        operation_meta: impl serde::Serialize + Debug,
524        sms: Vec<DynState>,
525    ) -> anyhow::Result<()> {
526        self.manual_operation_start_inner(
527            &mut dbtx.global_dbtx(self.global_dbtx_access_token),
528            operation_id,
529            op_type,
530            operation_meta,
531            sms,
532        )
533        .await
534    }
535
536    /// See [`Self::manual_operation_start`], just inside a database
537    /// transaction.
538    async fn manual_operation_start_inner(
539        &self,
540        dbtx: &mut DatabaseTransaction<'_>,
541        operation_id: OperationId,
542        op_type: &str,
543        operation_meta: impl serde::Serialize + Debug,
544        sms: Vec<DynState>,
545    ) -> anyhow::Result<()> {
546        dbtx.ensure_global()
547            .expect("Must deal with global dbtx here");
548
549        if self
550            .client
551            .get()
552            .operation_log()
553            .get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
554            .await
555            .is_some()
556        {
557            bail!(
558                "Operation with id {} already exists",
559                operation_id.fmt_short()
560            );
561        }
562
563        self.client
564            .get()
565            .operation_log()
566            .add_operation_log_entry_dbtx(
567                &mut dbtx.to_ref_nc(),
568                operation_id,
569                op_type,
570                serde_json::to_value(operation_meta).expect("Can't fail"),
571            )
572            .await;
573
574        self.client
575            .get()
576            .executor()
577            .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), sms)
578            .await
579            .expect("State machine is valid");
580
581        Ok(())
582    }
583
584    pub fn outcome_or_updates<U, S>(
585        &self,
586        operation: OperationLogEntry,
587        operation_id: OperationId,
588        stream_gen: impl FnOnce() -> S + 'static,
589    ) -> UpdateStreamOrOutcome<U>
590    where
591        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
592        S: Stream<Item = U> + MaybeSend + 'static,
593    {
594        use futures::StreamExt;
595        match self.client.get().operation_log().outcome_or_updates(
596            &self.global_db(),
597            operation_id,
598            operation,
599            Box::new(move || {
600                let stream_gen = stream_gen();
601                Box::pin(
602                    stream_gen.map(move |item| serde_json::to_value(item).expect("Can't fail")),
603                )
604            }),
605        ) {
606            UpdateStreamOrOutcome::UpdateStream(stream) => UpdateStreamOrOutcome::UpdateStream(
607                Box::pin(stream.map(|u| serde_json::from_value(u).expect("Can't fail"))),
608            ),
609            UpdateStreamOrOutcome::Outcome(o) => {
610                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
611            }
612        }
613    }
614
615    pub async fn claim_inputs<I, S>(
616        &self,
617        dbtx: &mut DatabaseTransaction<'_>,
618        inputs: ClientInputBundle<I, S>,
619        operation_id: OperationId,
620    ) -> anyhow::Result<OutPointRange>
621    where
622        I: IInput + MaybeSend + MaybeSync + 'static,
623        S: sm::IState + MaybeSend + MaybeSync + 'static,
624    {
625        self.claim_inputs_dyn(dbtx, inputs.into_instanceless(), operation_id)
626            .await
627    }
628
629    async fn claim_inputs_dyn(
630        &self,
631        dbtx: &mut DatabaseTransaction<'_>,
632        inputs: InstancelessDynClientInputBundle,
633        operation_id: OperationId,
634    ) -> anyhow::Result<OutPointRange> {
635        let tx_builder =
636            TransactionBuilder::new().with_inputs(inputs.into_dyn(self.module_instance_id));
637
638        self.client
639            .get()
640            .finalize_and_submit_transaction_inner(
641                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
642                operation_id,
643                tx_builder,
644            )
645            .await
646    }
647
648    pub async fn add_state_machines_dbtx(
649        &self,
650        dbtx: &mut DatabaseTransaction<'_>,
651        states: Vec<DynState>,
652    ) -> AddStateMachinesResult {
653        self.client
654            .get()
655            .executor()
656            .add_state_machines_dbtx(&mut dbtx.global_dbtx(self.global_dbtx_access_token), states)
657            .await
658    }
659
660    pub async fn add_operation_log_entry_dbtx(
661        &self,
662        dbtx: &mut DatabaseTransaction<'_>,
663        operation_id: OperationId,
664        operation_type: &str,
665        operation_meta: impl serde::Serialize,
666    ) {
667        self.client
668            .get()
669            .operation_log()
670            .add_operation_log_entry_dbtx(
671                &mut dbtx.global_dbtx(self.global_dbtx_access_token),
672                operation_id,
673                operation_type,
674                serde_json::to_value(operation_meta).expect("Can't fail"),
675            )
676            .await;
677    }
678
679    pub async fn log_event<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
680    where
681        E: Event + Send,
682        Cap: Send,
683    {
684        if <E as Event>::MODULE != Some(<M as ClientModule>::kind()) {
685            warn!(
686                target: LOG_CLIENT,
687                module_kind = %<M as ClientModule>::kind(),
688                event_module = ?<E as Event>::MODULE,
689                "Client module logging events of different module than its own. This might become an error in the future."
690            );
691        }
692        self.client
693            .get()
694            .log_event_json(
695                &mut dbtx.global_dbtx(self.global_dbtx_access_token).to_ref_nc(),
696                <E as Event>::MODULE,
697                self.module_instance_id,
698                <E as Event>::KIND,
699                serde_json::to_value(event).expect("Can't fail"),
700                <E as Event>::PERSISTENCE,
701            )
702            .await;
703    }
704}
705
706/// Priority module priority (lower number is higher priority)
707#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
708pub struct PrimaryModulePriority(u64);
709
710impl PrimaryModulePriority {
711    pub const HIGH: Self = Self(100);
712    pub const LOW: Self = Self(10000);
713
714    pub fn custom(prio: u64) -> Self {
715        Self(prio)
716    }
717}
718/// Which amount units this module supports being primary for
719pub enum PrimaryModuleSupport {
720    /// Potentially any unit
721    Any { priority: PrimaryModulePriority },
722    /// Some units supported by the module
723    Selected {
724        priority: PrimaryModulePriority,
725        units: BTreeSet<AmountUnit>,
726    },
727    /// None
728    None,
729}
730
731impl PrimaryModuleSupport {
732    pub fn selected<const N: usize>(
733        priority: PrimaryModulePriority,
734        units: [AmountUnit; N],
735    ) -> Self {
736        Self::Selected {
737            priority,
738            units: BTreeSet::from(units),
739        }
740    }
741}
742
743/// Fedimint module client
744#[apply(async_trait_maybe_send!)]
745pub trait ClientModule: Debug + MaybeSend + MaybeSync + 'static {
746    type Init: ClientModuleInit;
747
748    /// Common module types shared between client and server
749    type Common: ModuleCommon;
750
751    /// Data stored in regular backups so that restoring doesn't have to start
752    /// from epoch 0
753    type Backup: ModuleBackup;
754
755    /// Data and API clients available to state machine transitions of this
756    /// module
757    type ModuleStateMachineContext: Context;
758
759    /// All possible states this client can submit to the executor
760    type States: State<ModuleContext = Self::ModuleStateMachineContext>
761        + IntoDynInstance<DynType = DynState>;
762
763    fn decoder() -> Decoder {
764        let mut decoder_builder = Self::Common::decoder_builder();
765        decoder_builder.with_decodable_type::<Self::States>();
766        decoder_builder.with_decodable_type::<Self::Backup>();
767        decoder_builder.build()
768    }
769
770    fn kind() -> ModuleKind {
771        <<<Self as ClientModule>::Init as ModuleInit>::Common as CommonModuleInit>::KIND
772    }
773
774    fn context(&self) -> Self::ModuleStateMachineContext;
775
776    /// Initialize client.
777    ///
778    /// Called by the core client code on start, after [`ClientContext`] is
779    /// fully initialized, so unlike during [`ClientModuleInit::init`],
780    /// access to global client is allowed.
781    async fn start(&self) {}
782
783    async fn handle_cli_command(
784        &self,
785        _args: &[ffi::OsString],
786    ) -> anyhow::Result<serde_json::Value> {
787        Err(anyhow::format_err!(
788            "This module does not implement cli commands"
789        ))
790    }
791
792    async fn handle_rpc(
793        &self,
794        _method: String,
795        _request: serde_json::Value,
796    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
797        Box::pin(futures::stream::once(std::future::ready(Err(
798            anyhow::format_err!("This module does not implement rpc"),
799        ))))
800    }
801
802    /// Returns the fee the processing of this input requires.
803    ///
804    /// If the semantics of a given input aren't known this function returns
805    /// `None`, this only happens if a future version of Fedimint introduces a
806    /// new input variant. For clients this should only be the case when
807    /// processing transactions created by other users, so the result of
808    /// this function can be `unwrap`ped whenever dealing with inputs
809    /// generated by ourselves.
810    fn input_fee(
811        &self,
812        amount: &Amounts,
813        input: &<Self::Common as ModuleCommon>::Input,
814    ) -> Option<Amounts>;
815
816    /// Returns the fee the processing of this output requires.
817    ///
818    /// If the semantics of a given output aren't known this function returns
819    /// `None`, this only happens if a future version of Fedimint introduces a
820    /// new output variant. For clients this should only be the case when
821    /// processing transactions created by other users, so the result of
822    /// this function can be `unwrap`ped whenever dealing with inputs
823    /// generated by ourselves.
824    fn output_fee(
825        &self,
826        amount: &Amounts,
827        output: &<Self::Common as ModuleCommon>::Output,
828    ) -> Option<Amounts>;
829
830    fn supports_backup(&self) -> bool {
831        false
832    }
833
834    async fn backup(&self) -> anyhow::Result<Self::Backup> {
835        anyhow::bail!("Backup not supported");
836    }
837
838    /// Does this module support being a primary module
839    ///
840    /// If it does it must implement:
841    ///
842    /// * [`Self::create_final_inputs_and_outputs`]
843    /// * [`Self::await_primary_module_output`]
844    /// * [`Self::get_balance`]
845    /// * [`Self::subscribe_balance_changes`]
846    fn supports_being_primary(&self) -> PrimaryModuleSupport {
847        PrimaryModuleSupport::None
848    }
849
850    /// Creates all inputs and outputs necessary to balance the transaction.
851    /// The function returns an error if and only if the client's funds are not
852    /// sufficient to create the inputs necessary to fully fund the transaction.
853    ///
854    /// A returned input also contains:
855    /// * A set of private keys belonging to the input for signing the
856    ///   transaction
857    /// * A closure that generates states belonging to the input. This closure
858    ///   takes the transaction id of the transaction in which the input was
859    ///   used and the input index as input since these cannot be known at time
860    ///   of calling `create_funding_input` and have to be injected later.
861    ///
862    /// A returned output also contains:
863    /// * A closure that generates states belonging to the output. This closure
864    ///   takes the transaction id of the transaction in which the output was
865    ///   used and the output index as input since these cannot be known at time
866    ///   of calling `create_change_output` and have to be injected later.
867    async fn create_final_inputs_and_outputs(
868        &self,
869        _dbtx: &mut DatabaseTransaction<'_>,
870        _operation_id: OperationId,
871        _unit: AmountUnit,
872        _input_amount: Amount,
873        _output_amount: Amount,
874    ) -> anyhow::Result<(
875        ClientInputBundle<<Self::Common as ModuleCommon>::Input, Self::States>,
876        ClientOutputBundle<<Self::Common as ModuleCommon>::Output, Self::States>,
877    )> {
878        unimplemented!()
879    }
880
881    /// Waits for the funds from an output created by
882    /// [`Self::create_final_inputs_and_outputs`] to become available. This
883    /// function returning typically implies a change in the output of
884    /// [`Self::get_balance`].
885    async fn await_primary_module_output(
886        &self,
887        _operation_id: OperationId,
888        _out_point: OutPoint,
889    ) -> anyhow::Result<()> {
890        unimplemented!()
891    }
892
893    /// Returns the balance held by this module and available for funding
894    /// transactions.
895    async fn get_balance(&self, _dbtx: &mut DatabaseTransaction<'_>, _unit: AmountUnit) -> Amount {
896        unimplemented!()
897    }
898
899    /// Returns the balance held by this module and available for funding
900    /// transactions.
901    async fn get_balances(&self, _dbtx: &mut DatabaseTransaction<'_>) -> Amounts {
902        unimplemented!()
903    }
904
905    /// Returns a stream that will output the updated module balance each time
906    /// it changes.
907    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
908        unimplemented!()
909    }
910
911    /// Leave the federation
912    ///
913    /// While technically there's nothing stopping the client from just
914    /// abandoning Federation at any point by deleting all the related
915    /// local data, it is useful to make sure it's safe beforehand.
916    ///
917    /// This call indicates the desire of the caller client code
918    /// to orderly and safely leave the Federation by this module instance.
919    /// The goal of the implementations is to fulfil that wish,
920    /// giving prompt and informative feedback if it's not yet possible.
921    ///
922    /// The client module implementation should handle the request
923    /// and return as fast as possible avoiding blocking for longer than
924    /// necessary. This would usually involve some combination of:
925    ///
926    /// * recording the state of being in process of leaving the Federation to
927    ///   prevent initiating new conditions that could delay its completion;
928    /// * performing any fast to complete cleanup/exit logic;
929    /// * initiating any time-consuming logic (e.g. canceling outstanding
930    ///   contracts), as background jobs, tasks machines, etc.
931    /// * checking for any conditions indicating it might not be safe to leave
932    ///   at the moment.
933    ///
934    /// This function should return `Ok` only if from the perspective
935    /// of this module instance, it is safe to delete client data and
936    /// stop using it, with no further actions (like background jobs) required
937    /// to complete.
938    ///
939    /// This function should return an error if it's not currently possible
940    /// to safely (e.g. without loosing funds) leave the Federation.
941    /// It should avoid running indefinitely trying to complete any cleanup
942    /// actions necessary to reach a clean state, preferring spawning new
943    /// state machines and returning an informative error about cleanup
944    /// still in progress.
945    ///
946    /// If any internal task needs to complete, any user action is required,
947    /// or even external condition needs to be met this function
948    /// should return a `Err`.
949    ///
950    /// Notably modules should not disable interaction that might be necessary
951    /// for the user (possibly through other modules) to leave the Federation.
952    /// In particular a Mint module should retain ability to create new notes,
953    /// and LN module should retain ability to send funds out.
954    ///
955    /// Calling code must NOT assume that a module that once returned `Ok`,
956    /// will not return `Err` at later point. E.g. a Mint module might have
957    /// no outstanding balance at first, but other modules winding down
958    /// might "cash-out" to Ecash.
959    ///
960    /// Before leaving the Federation and deleting any state the calling code
961    /// must collect a full round of `Ok` from all the modules.
962    ///
963    /// Calling code should allow the user to override and ignore any
964    /// outstanding errors, after sufficient amount of warnings. Ideally,
965    /// this should be done on per-module basis, to avoid mistakes.
966    async fn leave(&self, _dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
967        bail!("Unable to determine if safe to leave the federation: Not implemented")
968    }
969}
970
971/// Type-erased version of [`ClientModule`]
972#[apply(async_trait_maybe_send!)]
973pub trait IClientModule: Debug {
974    fn as_any(&self) -> &(maybe_add_send_sync!(dyn std::any::Any));
975
976    fn decoder(&self) -> Decoder;
977
978    fn context(&self, instance: ModuleInstanceId) -> DynContext;
979
980    async fn start(&self);
981
982    async fn handle_cli_command(&self, args: &[ffi::OsString])
983    -> anyhow::Result<serde_json::Value>;
984
985    async fn handle_rpc(
986        &self,
987        method: String,
988        request: serde_json::Value,
989    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>>;
990
991    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts>;
992
993    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts>;
994
995    fn supports_backup(&self) -> bool;
996
997    async fn backup(&self, module_instance_id: ModuleInstanceId)
998    -> anyhow::Result<DynModuleBackup>;
999
1000    fn supports_being_primary(&self) -> PrimaryModuleSupport;
1001
1002    async fn create_final_inputs_and_outputs(
1003        &self,
1004        module_instance: ModuleInstanceId,
1005        dbtx: &mut DatabaseTransaction<'_>,
1006        operation_id: OperationId,
1007        unit: AmountUnit,
1008        input_amount: Amount,
1009        output_amount: Amount,
1010    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)>;
1011
1012    async fn await_primary_module_output(
1013        &self,
1014        operation_id: OperationId,
1015        out_point: OutPoint,
1016    ) -> anyhow::Result<()>;
1017
1018    async fn get_balance(
1019        &self,
1020        module_instance: ModuleInstanceId,
1021        dbtx: &mut DatabaseTransaction<'_>,
1022        unit: AmountUnit,
1023    ) -> Amount;
1024
1025    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()>;
1026}
1027
1028#[apply(async_trait_maybe_send!)]
1029impl<T> IClientModule for T
1030where
1031    T: ClientModule,
1032{
1033    fn as_any(&self) -> &(maybe_add_send_sync!(dyn Any)) {
1034        self
1035    }
1036
1037    fn decoder(&self) -> Decoder {
1038        T::decoder()
1039    }
1040
1041    fn context(&self, instance: ModuleInstanceId) -> DynContext {
1042        DynContext::from_typed(instance, <T as ClientModule>::context(self))
1043    }
1044
1045    async fn start(&self) {
1046        <T as ClientModule>::start(self).await;
1047    }
1048
1049    async fn handle_cli_command(
1050        &self,
1051        args: &[ffi::OsString],
1052    ) -> anyhow::Result<serde_json::Value> {
1053        <T as ClientModule>::handle_cli_command(self, args).await
1054    }
1055
1056    async fn handle_rpc(
1057        &self,
1058        method: String,
1059        request: serde_json::Value,
1060    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1061        <T as ClientModule>::handle_rpc(self, method, request).await
1062    }
1063
1064    fn input_fee(&self, amount: &Amounts, input: &DynInput) -> Option<Amounts> {
1065        <T as ClientModule>::input_fee(
1066            self,
1067            amount,
1068            input
1069                .as_any()
1070                .downcast_ref()
1071                .expect("Dispatched to correct module"),
1072        )
1073    }
1074
1075    fn output_fee(&self, amount: &Amounts, output: &DynOutput) -> Option<Amounts> {
1076        <T as ClientModule>::output_fee(
1077            self,
1078            amount,
1079            output
1080                .as_any()
1081                .downcast_ref()
1082                .expect("Dispatched to correct module"),
1083        )
1084    }
1085
1086    fn supports_backup(&self) -> bool {
1087        <T as ClientModule>::supports_backup(self)
1088    }
1089
1090    async fn backup(
1091        &self,
1092        module_instance_id: ModuleInstanceId,
1093    ) -> anyhow::Result<DynModuleBackup> {
1094        Ok(DynModuleBackup::from_typed(
1095            module_instance_id,
1096            <T as ClientModule>::backup(self).await?,
1097        ))
1098    }
1099
1100    fn supports_being_primary(&self) -> PrimaryModuleSupport {
1101        <T as ClientModule>::supports_being_primary(self)
1102    }
1103
1104    async fn create_final_inputs_and_outputs(
1105        &self,
1106        module_instance: ModuleInstanceId,
1107        dbtx: &mut DatabaseTransaction<'_>,
1108        operation_id: OperationId,
1109        unit: AmountUnit,
1110        input_amount: Amount,
1111        output_amount: Amount,
1112    ) -> anyhow::Result<(ClientInputBundle, ClientOutputBundle)> {
1113        let (inputs, outputs) = <T as ClientModule>::create_final_inputs_and_outputs(
1114            self,
1115            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1116            operation_id,
1117            unit,
1118            input_amount,
1119            output_amount,
1120        )
1121        .await?;
1122
1123        let inputs = inputs.into_dyn(module_instance);
1124
1125        let outputs = outputs.into_dyn(module_instance);
1126
1127        Ok((inputs, outputs))
1128    }
1129
1130    async fn await_primary_module_output(
1131        &self,
1132        operation_id: OperationId,
1133        out_point: OutPoint,
1134    ) -> anyhow::Result<()> {
1135        <T as ClientModule>::await_primary_module_output(self, operation_id, out_point).await
1136    }
1137
1138    async fn get_balance(
1139        &self,
1140        module_instance: ModuleInstanceId,
1141        dbtx: &mut DatabaseTransaction<'_>,
1142        unit: AmountUnit,
1143    ) -> Amount {
1144        <T as ClientModule>::get_balance(
1145            self,
1146            &mut dbtx.to_ref_with_prefix_module_id(module_instance).0,
1147            unit,
1148        )
1149        .await
1150    }
1151
1152    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
1153        <T as ClientModule>::subscribe_balance_changes(self).await
1154    }
1155}
1156
1157dyn_newtype_define!(
1158    #[derive(Clone)]
1159    pub DynClientModule(Arc<IClientModule>)
1160);
1161
1162impl AsRef<maybe_add_send_sync!(dyn IClientModule + 'static)> for DynClientModule {
1163    fn as_ref(&self) -> &maybe_add_send_sync!(dyn IClientModule + 'static) {
1164        self.inner.as_ref()
1165    }
1166}
1167
1168/// A contiguous range of input/output indexes
1169#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1170pub struct IdxRange {
1171    start: u64,
1172    end: u64,
1173}
1174
1175impl IdxRange {
1176    pub fn new_single(start: u64) -> Option<Self> {
1177        start.checked_add(1).map(|end| Self { start, end })
1178    }
1179
1180    pub fn start(self) -> u64 {
1181        self.start
1182    }
1183
1184    pub fn count(self) -> usize {
1185        self.into_iter().count()
1186    }
1187
1188    pub fn from_inclusive(range: ops::RangeInclusive<u64>) -> Option<Self> {
1189        range.end().checked_add(1).map(|end| Self {
1190            start: *range.start(),
1191            end,
1192        })
1193    }
1194}
1195
1196impl From<Range<u64>> for IdxRange {
1197    fn from(Range { start, end }: Range<u64>) -> Self {
1198        Self { start, end }
1199    }
1200}
1201
1202impl IntoIterator for IdxRange {
1203    type Item = u64;
1204
1205    type IntoIter = ops::Range<u64>;
1206
1207    fn into_iter(self) -> Self::IntoIter {
1208        ops::Range {
1209            start: self.start,
1210            end: self.end,
1211        }
1212    }
1213}
1214
1215#[derive(Copy, Clone, Encodable, Decodable, Serialize, Deserialize, PartialEq, Eq, Hash, Debug)]
1216pub struct OutPointRange {
1217    pub txid: TransactionId,
1218    idx_range: IdxRange,
1219}
1220
1221impl OutPointRange {
1222    pub fn new(txid: TransactionId, idx_range: IdxRange) -> Self {
1223        Self { txid, idx_range }
1224    }
1225
1226    pub fn new_single(txid: TransactionId, idx: u64) -> Option<Self> {
1227        IdxRange::new_single(idx).map(|idx_range| Self { txid, idx_range })
1228    }
1229
1230    pub fn start_idx(self) -> u64 {
1231        self.idx_range.start()
1232    }
1233
1234    pub fn out_idx_iter(self) -> impl Iterator<Item = u64> {
1235        self.idx_range.into_iter()
1236    }
1237
1238    pub fn count(self) -> usize {
1239        self.idx_range.count()
1240    }
1241}
1242
1243impl IntoIterator for OutPointRange {
1244    type Item = OutPoint;
1245
1246    type IntoIter = OutPointRangeIter;
1247
1248    fn into_iter(self) -> Self::IntoIter {
1249        OutPointRangeIter {
1250            txid: self.txid,
1251            inner: self.idx_range.into_iter(),
1252        }
1253    }
1254}
1255
1256pub struct OutPointRangeIter {
1257    txid: TransactionId,
1258
1259    inner: ops::Range<u64>,
1260}
1261
1262impl OutPointRange {
1263    pub fn txid(&self) -> TransactionId {
1264        self.txid
1265    }
1266}
1267
1268impl Iterator for OutPointRangeIter {
1269    type Item = OutPoint;
1270
1271    fn next(&mut self) -> Option<Self::Item> {
1272        self.inner.next().map(|idx| OutPoint {
1273            txid: self.txid,
1274            out_idx: idx,
1275        })
1276    }
1277}
1278
1279pub type StateGenerator<S> = Arc<maybe_add_send_sync!(dyn Fn(OutPointRange) -> Vec<S> + 'static)>;