Skip to main content

fedimint_mintv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7#![allow(clippy::return_self_not_must_use)]
8#![allow(clippy::too_many_lines)]
9
10pub use fedimint_mintv2_common as common;
11
12mod api;
13#[cfg(feature = "cli")]
14mod cli;
15mod client_db;
16mod ecash;
17mod events;
18mod input;
19pub mod issuance;
20mod output;
21mod receive;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::convert::Infallible;
25use std::sync::{Arc, RwLock};
26use std::time::Duration;
27
28use anyhow::{Context as _, anyhow};
29use bitcoin_hashes::sha256;
30use client_db::{RecoveryState, RecoveryStateKey, SpendableNoteAmountPrefix, SpendableNotePrefix};
31pub use events::*;
32use fedimint_api_client::api::DynModuleApi;
33use fedimint_client::module::ClientModule;
34use fedimint_client::transaction::{
35    ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
36    ClientOutputSM, TransactionBuilder,
37};
38use fedimint_client_module::db::ClientModuleMigrationFn;
39use fedimint_client_module::module::init::{
40    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
41};
42use fedimint_client_module::module::recovery::{NoModuleBackup, RecoveryProgress};
43use fedimint_client_module::module::{
44    ClientContext, OutPointRange, PrimaryModulePriority, PrimaryModuleSupport,
45};
46use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::base32::{self, FEDIMINT_PREFIX};
49use fedimint_core::config::FederationId;
50use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
51use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::module::{
54    AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
55};
56use fedimint_core::secp256k1::rand::{Rng, thread_rng};
57use fedimint_core::secp256k1::{Keypair, PublicKey};
58use fedimint_core::util::{BoxStream, NextOrPending};
59use fedimint_core::{Amount, OutPoint, PeerId, apply, async_trait_maybe_send};
60use fedimint_derive_secret::DerivableSecret;
61use fedimint_mintv2_common::config::{FeeConsensus, MintClientConfig, client_denominations};
62use fedimint_mintv2_common::{
63    Denomination, KIND, MintCommonInit, MintInput, MintModuleTypes, MintOutput, Note, RecoveryItem,
64};
65use futures::{StreamExt, pin_mut};
66use itertools::Itertools;
67use rand::seq::IteratorRandom;
68use serde::{Deserialize, Serialize};
69use serde_json::Value;
70use tbs::AggregatePublicKey;
71use thiserror::Error;
72
73use crate::api::MintV2ModuleApi;
74use crate::client_db::SpendableNoteKey;
75pub use crate::ecash::ECash;
76use crate::input::{InputSMCommon, InputSMState, InputStateMachine};
77use crate::issuance::NoteIssuanceRequest;
78use crate::output::{MintOutputStateMachine, OutputSMCommon, OutputSMState};
79use crate::receive::{ReceiveSMState, ReceiveStateMachine};
80
/// Number of notes of each denomination the wallet tries to keep available;
/// funding selection and rebalancing are both steered by this target.
const TARGET_PER_DENOMINATION: usize = 3;
/// Number of recovery items requested per slice during recovery.
const SLICE_SIZE: u64 = 10_000;
/// Maximum number of slice-hash requests kept in flight during recovery.
const PARALLEL_HASH_REQUESTS: usize = 10;
/// Maximum number of slice downloads kept in flight during recovery.
const PARALLEL_SLICE_REQUESTS: usize = 10;
85
86#[derive(Debug, Clone, PartialEq, Eq, Hash, Encodable, Decodable)]
87pub struct SpendableNote {
88    pub denomination: Denomination,
89    pub keypair: Keypair,
90    pub signature: tbs::Signature,
91}
92
93impl SpendableNote {
94    pub fn amount(&self) -> Amount {
95        self.denomination.amount()
96    }
97}
98
99impl SpendableNote {
100    fn nonce(&self) -> PublicKey {
101        self.keypair.public_key()
102    }
103
104    fn note(&self) -> Note {
105        Note {
106            denomination: self.denomination,
107            nonce: self.nonce(),
108            signature: self.signature,
109        }
110    }
111}
112
/// Metadata attached to the operation log entries created by this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MintOperationMeta {
    /// Notes were removed from the wallet and packaged as ecash to hand out.
    Send {
        /// The ecash that was sent, base32-encoded with the fedimint prefix.
        ecash: String,
        /// Caller-supplied metadata, stored as given.
        custom_meta: Value,
    },
    /// A transaction issuing fresh notes, submitted by `send` when the notes
    /// in the wallet could not represent the requested amount.
    Reissue {
        /// Range of outpoints of the change outputs of the transaction.
        change_outpoint_range: OutPointRange,
        /// The (rounded) amount the notes were reissued for.
        amount: Amount,
        /// Caller-supplied metadata, stored as given.
        custom_meta: Value,
    },
    /// NOTE(review): presumably logged when ecash is received; the
    /// construction site is not visible in this part of the file.
    Receive {
        /// Range of outpoints of the change outputs of the transaction.
        change_outpoint_range: OutPointRange,
        /// Presumably the encoded ecash that was received (TODO confirm).
        ecash: String,
        /// Caller-supplied metadata, stored as given.
        custom_meta: Value,
    },
}
130
131#[derive(Debug, Clone)]
132pub struct MintClientInit;
133
134impl ModuleInit for MintClientInit {
135    type Common = MintCommonInit;
136
137    async fn dump_database(
138        &self,
139        _dbtx: &mut DatabaseTransaction<'_>,
140        _prefix_names: Vec<String>,
141    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
142        Box::new(BTreeMap::new().into_iter())
143    }
144}
145
146#[apply(async_trait_maybe_send!)]
147impl ClientModuleInit for MintClientInit {
148    type Module = MintClientModule;
149
150    fn supported_api_versions(&self) -> MultiApiVersion {
151        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 1 }])
152            .expect("no version conflicts")
153    }
154
155    async fn recover(
156        &self,
157        args: &ClientModuleRecoverArgs<Self>,
158        _snapshot: Option<&NoModuleBackup>,
159    ) -> anyhow::Result<()> {
160        let mut state = if let Some(state) = args
161            .db()
162            .begin_transaction_nc()
163            .await
164            .get_value(&RecoveryStateKey)
165            .await
166        {
167            state
168        } else {
169            RecoveryState {
170                next_index: 0,
171                total_items: args.module_api().fetch_recovery_count().await?,
172                requests: BTreeMap::new(),
173                nonces: BTreeSet::new(),
174            }
175        };
176
177        if state.next_index == state.total_items {
178            return Ok(());
179        }
180
181        let peer_selector = PeerSelector::new(args.api().all_peers().clone());
182
183        let mut recovery_stream = futures::stream::iter(
184            (state.next_index..state.total_items).step_by(SLICE_SIZE as usize),
185        )
186        .map(|start| {
187            let api = args.module_api().clone();
188            let end = std::cmp::min(start + SLICE_SIZE, state.total_items);
189
190            async move { (start, end, api.fetch_recovery_slice_hash(start, end).await) }
191        })
192        .buffered(PARALLEL_HASH_REQUESTS)
193        .map(|(start, end, hash)| {
194            download_slice_with_hash(
195                args.module_api().clone(),
196                peer_selector.clone(),
197                start,
198                end,
199                hash,
200            )
201        })
202        .buffered(PARALLEL_SLICE_REQUESTS);
203
204        let tweak_filter = issuance::tweak_filter(args.module_root_secret());
205
206        loop {
207            let items = recovery_stream
208                .next()
209                .await
210                .context("Recovery stream finished before recovery is complete")?;
211
212            for item in &items {
213                match item {
214                    RecoveryItem::Output {
215                        denomination,
216                        nonce_hash,
217                        tweak,
218                    } => {
219                        if !issuance::check_tweak(*tweak, tweak_filter) {
220                            continue;
221                        }
222                        let output_secret = issuance::output_secret(
223                            *denomination,
224                            *tweak,
225                            args.module_root_secret(),
226                        );
227
228                        if !issuance::check_nonce(&output_secret, *nonce_hash) {
229                            continue;
230                        }
231
232                        let computed_nonce_hash = issuance::nonce(&output_secret).consensus_hash();
233
234                        // Ignore possible duplicate nonces
235                        if !state.nonces.insert(computed_nonce_hash) {
236                            continue;
237                        }
238
239                        state.requests.insert(
240                            computed_nonce_hash,
241                            NoteIssuanceRequest::new(
242                                *denomination,
243                                *tweak,
244                                args.module_root_secret(),
245                            ),
246                        );
247                    }
248                    RecoveryItem::Input { nonce_hash } => {
249                        state.requests.remove(nonce_hash);
250                        state.nonces.remove(nonce_hash);
251                    }
252                }
253            }
254
255            state.next_index += items.len() as u64;
256
257            let mut dbtx = args.db().begin_transaction().await;
258
259            dbtx.insert_entry(&RecoveryStateKey, &state).await;
260
261            if state.next_index == state.total_items {
262                let state_machines = args
263                    .context()
264                    .map_dyn(vec![MintClientStateMachines::Output(
265                        MintOutputStateMachine {
266                            common: OutputSMCommon {
267                                operation_id: OperationId::new_random(),
268                                range: None,
269                                issuance_requests: state.requests.into_values().collect(),
270                            },
271                            state: OutputSMState::Pending,
272                        },
273                    )])
274                    .collect();
275
276                args.context()
277                    .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), state_machines)
278                    .await
279                    .expect("state machine is valid");
280
281                dbtx.commit_tx().await;
282
283                return Ok(());
284            }
285
286            dbtx.commit_tx().await;
287
288            args.update_recovery_progress(RecoveryProgress {
289                complete: state.next_index.try_into().unwrap_or(u32::MAX),
290                total: state.total_items.try_into().unwrap_or(u32::MAX),
291            });
292        }
293    }
294
295    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
296        let (tweak_sender, tweak_receiver) = async_channel::bounded(50);
297
298        let filter = issuance::tweak_filter(args.module_root_secret());
299
300        tokio::task::spawn_blocking(move || {
301            loop {
302                let tweak: [u8; 16] = thread_rng().r#gen();
303
304                if !issuance::check_tweak(tweak, filter) {
305                    continue;
306                }
307
308                if tweak_sender.send_blocking(tweak).is_err() {
309                    return;
310                }
311            }
312        });
313
314        Ok(MintClientModule {
315            federation_id: *args.federation_id(),
316            cfg: args.cfg().clone(),
317            root_secret: args.module_root_secret().clone(),
318            notifier: args.notifier().clone(),
319            client_ctx: args.context(),
320            balance_update_sender: tokio::sync::watch::channel(()).0,
321            tweak_receiver,
322        })
323    }
324
    /// Returns the database migrations of this module; there are none, so the
    /// map is empty.
    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
        BTreeMap::new()
    }
328}
329
/// Client side of the mint module: holds the configuration, secrets and
/// channels needed to send, receive and reissue ecash notes.
#[derive(Debug)]
pub struct MintClientModule {
    /// Federation this client belongs to; embedded into sent ecash.
    federation_id: FederationId,
    /// Client configuration of the module (amount unit, fees, tbs keys).
    cfg: MintClientConfig,
    /// Module root secret passed to every new note issuance request.
    root_secret: DerivableSecret,
    /// Notifier used to subscribe to state machine updates of an operation.
    notifier: ModuleNotifier<MintClientStateMachines>,
    /// Handle to the client's shared functionality (database, api,
    /// operation log, transaction submission).
    client_ctx: ClientContext<Self>,
    /// Watch channel signalled on commit whenever the set of notes changes.
    balance_update_sender: tokio::sync::watch::Sender<()>,
    /// Receives pre-filtered random tweaks from the background task spawned
    /// when the module is initialized.
    tweak_receiver: async_channel::Receiver<[u8; 16]>,
}
340
/// Context handed to the state machines of the mint client module.
#[derive(Debug, Clone)]
pub struct MintClientContext {
    /// Handle to the client's shared functionality.
    client_ctx: ClientContext<MintClientModule>,
    /// Aggregate public key per denomination, taken from the module config.
    tbs_agg_pks: BTreeMap<Denomination, AggregatePublicKey>,
    /// Public key share of every peer per denomination, taken from the
    /// module config.
    tbs_pks: BTreeMap<Denomination, BTreeMap<PeerId, tbs::PublicKeyShare>>,
    /// Watch channel used to signal balance changes.
    pub balance_update_sender: tokio::sync::watch::Sender<()>,
}

impl Context for MintClientContext {
    // Ties this context to the mint module kind.
    const KIND: Option<ModuleKind> = Some(KIND);
}
352
353#[apply(async_trait_maybe_send!)]
354impl ClientModule for MintClientModule {
355    type Init = MintClientInit;
356    type Common = MintModuleTypes;
357    type Backup = NoModuleBackup;
358    type ModuleStateMachineContext = MintClientContext;
359    type States = MintClientStateMachines;
360
361    fn context(&self) -> Self::ModuleStateMachineContext {
362        MintClientContext {
363            client_ctx: self.client_ctx.clone(),
364            tbs_agg_pks: self.cfg.tbs_agg_pks.clone(),
365            tbs_pks: self.cfg.tbs_pks.clone(),
366            balance_update_sender: self.balance_update_sender.clone(),
367        }
368    }
369
370    fn input_fee(
371        &self,
372        amounts: &Amounts,
373        _input: &<Self::Common as ModuleCommon>::Input,
374    ) -> Option<Amounts> {
375        let unit = self.cfg.amount_unit;
376        let amount = amounts.get(&unit).copied().unwrap_or_default();
377        let fee = self.cfg.fee_consensus.fee(amount);
378
379        Some(Amounts::new_custom(unit, fee))
380    }
381
382    fn output_fee(
383        &self,
384        amounts: &Amounts,
385        _output: &<Self::Common as ModuleCommon>::Output,
386    ) -> Option<Amounts> {
387        let unit = self.cfg.amount_unit;
388        let amount = amounts.get(&unit).copied().unwrap_or_default();
389        let fee = self.cfg.fee_consensus.fee(amount);
390
391        Some(Amounts::new_custom(unit, fee))
392    }
393
    /// Forwards a CLI invocation to the `cli` submodule and returns its JSON
    /// result. Only available with the `cli` feature.
    #[cfg(feature = "cli")]
    async fn handle_cli_command(
        &self,
        args: &[std::ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        cli::handle_cli_command(self, args).await
    }
401
402    fn supports_being_primary(&self) -> PrimaryModuleSupport {
403        PrimaryModuleSupport::selected(PrimaryModulePriority::HIGH, [self.cfg.amount_unit])
404    }
405
406    async fn create_final_inputs_and_outputs(
407        &self,
408        dbtx: &mut DatabaseTransaction<'_>,
409        operation_id: OperationId,
410        unit: AmountUnit,
411        mut input_amount: Amount,
412        mut output_amount: Amount,
413    ) -> anyhow::Result<(
414        ClientInputBundle<MintInput, MintClientStateMachines>,
415        ClientOutputBundle<MintOutput, MintClientStateMachines>,
416    )> {
417        if unit != self.cfg.amount_unit {
418            anyhow::bail!("Module can only handle its configured amount unit");
419        }
420
421        let funding_notes = self
422            .select_funding_input(dbtx, output_amount.saturating_sub(input_amount))
423            .await
424            .context("Insufficient funds")?;
425
426        for note in &funding_notes {
427            self.remove_spendable_note(dbtx, note).await;
428        }
429
430        input_amount += funding_notes.iter().map(SpendableNote::amount).sum();
431
432        output_amount += funding_notes
433            .iter()
434            .map(|input| self.cfg.fee_consensus.fee(input.amount()))
435            .sum();
436
437        assert!(output_amount <= input_amount);
438
439        let (input_notes, output_amounts) = self
440            .rebalance(dbtx, &self.cfg.fee_consensus, input_amount - output_amount)
441            .await;
442
443        for note in &input_notes {
444            self.remove_spendable_note(dbtx, note).await;
445        }
446
447        input_amount += input_notes.iter().map(SpendableNote::amount).sum();
448
449        output_amount += input_notes
450            .iter()
451            .map(|note| self.cfg.fee_consensus.fee(note.amount()))
452            .sum();
453
454        output_amount += output_amounts
455            .iter()
456            .map(|denomination| {
457                denomination.amount() + self.cfg.fee_consensus.fee(denomination.amount())
458            })
459            .sum();
460
461        assert!(output_amount <= input_amount);
462
463        let mut spendable_notes = funding_notes
464            .into_iter()
465            .chain(input_notes)
466            .collect::<Vec<SpendableNote>>();
467
468        // We sort the notes by denomination to minimize the leaked information.
469        spendable_notes.sort_by_key(|note| note.denomination);
470
471        let input_bundle =
472            Self::create_input_bundle(operation_id, spendable_notes, false, self.cfg.amount_unit);
473
474        let mut denominations = represent_amount_with_fees(
475            input_amount.saturating_sub(output_amount),
476            &self.cfg.fee_consensus,
477        )
478        .into_iter()
479        .chain(output_amounts)
480        .collect::<Vec<Denomination>>();
481
482        // We sort the amounts to minimize the leaked information.
483        denominations.sort();
484
485        let output_bundle = self.create_output_bundle(operation_id, denominations).await;
486
487        let sender = self.balance_update_sender.clone();
488        dbtx.on_commit(move || sender.send_replace(()));
489
490        Ok((input_bundle, output_bundle))
491    }
492
    /// Waits until the output state machine covering `outpoint` of the given
    /// operation reaches a final state; see `await_output_sm_success`.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        outpoint: OutPoint,
    ) -> anyhow::Result<()> {
        self.await_output_sm_success(operation_id, outpoint).await
    }
500
501    async fn get_balance(&self, dbtx: &mut DatabaseTransaction<'_>, unit: AmountUnit) -> Amount {
502        if unit != self.cfg.amount_unit {
503            return Amount::ZERO;
504        }
505
506        self.get_count_by_denomination_dbtx(dbtx)
507            .await
508            .into_iter()
509            .map(|(denomination, count)| denomination.amount().mul_u64(count))
510            .sum()
511    }
512
513    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
514        Box::pin(tokio_stream::wrappers::WatchStream::new(
515            self.balance_update_sender.subscribe(),
516        ))
517    }
518}
519
520impl MintClientModule {
521    async fn select_funding_input(
522        &self,
523        dbtx: &mut DatabaseTransaction<'_>,
524        mut excess_output: Amount,
525    ) -> Option<Vec<SpendableNote>> {
526        let mut selected_notes = Vec::new();
527        let mut target_notes = Vec::new();
528        let mut excess_notes = Vec::new();
529
530        for amount in client_denominations().rev() {
531            let notes_amount = dbtx
532                .find_by_prefix(&SpendableNoteAmountPrefix(amount))
533                .await
534                .map(|entry| entry.0.0)
535                .collect::<Vec<SpendableNote>>()
536                .await;
537
538            target_notes.extend(notes_amount.iter().take(TARGET_PER_DENOMINATION).cloned());
539
540            if notes_amount.len() > 2 * TARGET_PER_DENOMINATION {
541                for note in notes_amount.into_iter().skip(TARGET_PER_DENOMINATION) {
542                    let note_fee = self.cfg.fee_consensus.fee(note.amount());
543
544                    let note_value = note
545                        .amount()
546                        .checked_sub(note_fee)
547                        .expect("All our notes are economical");
548
549                    excess_output = excess_output.saturating_sub(note_value);
550
551                    selected_notes.push(note);
552                }
553            } else {
554                excess_notes.extend(notes_amount.into_iter().skip(TARGET_PER_DENOMINATION));
555            }
556        }
557
558        if excess_output == Amount::ZERO {
559            return Some(selected_notes);
560        }
561
562        for note in excess_notes.into_iter().chain(target_notes) {
563            let note_amount = note.amount();
564            let note_value = note_amount
565                .checked_sub(self.cfg.fee_consensus.fee(note_amount))
566                .expect("All our notes are economical");
567
568            excess_output = excess_output.saturating_sub(note_value);
569
570            selected_notes.push(note);
571
572            if excess_output == Amount::ZERO {
573                return Some(selected_notes);
574            }
575        }
576
577        None
578    }
579
580    async fn rebalance(
581        &self,
582        dbtx: &mut DatabaseTransaction<'_>,
583        fee: &FeeConsensus,
584        mut excess_input: Amount,
585    ) -> (Vec<SpendableNote>, Vec<Denomination>) {
586        let n_denominations = self.get_count_by_denomination_dbtx(dbtx).await;
587
588        let mut notes = dbtx
589            .find_by_prefix_sorted_descending(&SpendableNotePrefix)
590            .await
591            .map(|entry| entry.0.0)
592            .fuse();
593
594        let mut input_notes = Vec::new();
595        let mut output_denominations = Vec::new();
596
597        for d in client_denominations() {
598            let n_denomination = n_denominations.get(&d).copied().unwrap_or(0);
599
600            let n_missing = TARGET_PER_DENOMINATION.saturating_sub(n_denomination as usize);
601
602            for _ in 0..n_missing {
603                match excess_input.checked_sub(d.amount() + fee.fee(d.amount())) {
604                    Some(remaining_excess) => excess_input = remaining_excess,
605                    None => match notes.next().await {
606                        Some(note) => {
607                            if note.amount() <= d.amount() + fee.fee(d.amount()) {
608                                break;
609                            }
610
611                            excess_input += note.amount() - (d.amount() + fee.fee(d.amount()));
612
613                            input_notes.push(note);
614                        }
615                        None => break,
616                    },
617                }
618
619                output_denominations.push(d);
620            }
621        }
622
623        (input_notes, output_denominations)
624    }
625
626    fn create_input_bundle(
627        operation_id: OperationId,
628        notes: Vec<SpendableNote>,
629        include_receive_sm: bool,
630        amount_unit: AmountUnit,
631    ) -> ClientInputBundle<MintInput, MintClientStateMachines> {
632        let inputs = notes
633            .iter()
634            .map(|spendable_note| ClientInput {
635                input: MintInput::new_v0(spendable_note.note()),
636                keys: vec![spendable_note.keypair],
637                amounts: Amounts::new_custom(amount_unit, spendable_note.amount()),
638            })
639            .collect();
640
641        let input_sms = vec![ClientInputSM {
642            state_machines: Arc::new(move |range: OutPointRange| {
643                let mut sms = vec![MintClientStateMachines::Input(InputStateMachine {
644                    common: InputSMCommon {
645                        operation_id,
646                        txid: range.txid(),
647                        spendable_notes: notes.clone(),
648                    },
649                    state: InputSMState::Pending,
650                })];
651
652                if include_receive_sm {
653                    sms.push(MintClientStateMachines::Receive(ReceiveStateMachine {
654                        common: crate::receive::ReceiveSMCommon {
655                            operation_id,
656                            txid: range.txid(),
657                        },
658                        state: crate::receive::ReceiveSMState::Pending,
659                    }));
660                }
661
662                sms
663            }),
664        }];
665
666        ClientInputBundle::new(inputs, input_sms)
667    }
668
669    async fn create_output_bundle(
670        &self,
671        operation_id: OperationId,
672        requested_denominations: Vec<Denomination>,
673    ) -> ClientOutputBundle<MintOutput, MintClientStateMachines> {
674        let issuance_requests = futures::stream::iter(requested_denominations)
675            .zip(self.tweak_receiver.clone())
676            .map(|(d, tweak)| NoteIssuanceRequest::new(d, tweak, &self.root_secret))
677            .collect::<Vec<NoteIssuanceRequest>>()
678            .await;
679
680        let amount_unit = self.cfg.amount_unit;
681        let outputs = issuance_requests
682            .iter()
683            .map(|request| ClientOutput {
684                output: request.output(),
685                amounts: Amounts::new_custom(amount_unit, request.denomination.amount()),
686            })
687            .collect();
688
689        let output_sms = vec![ClientOutputSM {
690            state_machines: Arc::new(move |range: OutPointRange| {
691                vec![MintClientStateMachines::Output(MintOutputStateMachine {
692                    common: OutputSMCommon {
693                        operation_id,
694                        range: Some(range),
695                        issuance_requests: issuance_requests.clone(),
696                    },
697                    state: OutputSMState::Pending,
698                })]
699            }),
700        }];
701
702        ClientOutputBundle::new(outputs, output_sms)
703    }
704
705    async fn await_output_sm_success(
706        &self,
707        operation_id: OperationId,
708        outpoint: OutPoint,
709    ) -> anyhow::Result<()> {
710        let stream = self
711            .notifier
712            .subscribe(operation_id)
713            .await
714            .filter_map(|state| async {
715                let MintClientStateMachines::Output(state) = state else {
716                    return None;
717                };
718
719                if !state.common.range?.into_iter().contains(&outpoint) {
720                    return None;
721                }
722
723                match state.state {
724                    OutputSMState::Pending => None,
725                    OutputSMState::Success => Some(Ok(())),
726                    OutputSMState::Aborted => Some(Err(anyhow!("Transaction was rejected"))),
727                    OutputSMState::Failure => Some(Err(anyhow!("Failed to finalize notes",))),
728                }
729            });
730
731        pin_mut!(stream);
732
733        stream.next_or_pending().await
734    }
735
736    /// Count the `ECash` notes in the client's database by denomination.
737    pub async fn get_count_by_denomination(&self) -> BTreeMap<Denomination, u64> {
738        self.get_count_by_denomination_dbtx(
739            &mut self.client_ctx.module_db().begin_transaction_nc().await,
740        )
741        .await
742    }
743
744    async fn get_count_by_denomination_dbtx(
745        &self,
746        dbtx: &mut DatabaseTransaction<'_>,
747    ) -> BTreeMap<Denomination, u64> {
748        dbtx.find_by_prefix(&SpendableNotePrefix)
749            .await
750            .fold(BTreeMap::new(), |mut acc, entry| async move {
751                acc.entry(entry.0.0.denomination)
752                    .and_modify(|count| *count += 1)
753                    .or_insert(1);
754
755                acc
756            })
757            .await
758    }
759
760    /// Send `ECash` for the given amount. The
761    /// amount will be rounded up to a multiple of 512 msats which is the
762    /// smallest denomination used throughout the client. If the rounded
763    /// amount cannot be covered with the ecash notes in the client's
764    /// database the client will create a transaction to reissue the
765    /// required denominations. It is safe to cancel the send method call
766    /// before the reissue is complete in which case the reissued notes are
767    /// returned to the regular balance. To cancel a successful ecash send
768    /// simply receive it yourself.
769    ///
770    /// If `include_invite` is set, the federation's invite code is embedded in
771    /// the returned ecash so a recipient that has not joined the federation can
772    /// do so directly from the received ecash.
773    pub async fn send(
774        &self,
775        amount: Amount,
776        custom_meta: Value,
777        include_invite: bool,
778    ) -> Result<ECash, SendECashError> {
779        let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());
780
781        if let Some(ecash) = self
782            .client_ctx
783            .module_db()
784            .autocommit(
785                |dbtx, _| {
786                    Box::pin(self.send_ecash_dbtx(
787                        dbtx,
788                        amount,
789                        custom_meta.clone(),
790                        include_invite,
791                    ))
792                },
793                Some(100),
794            )
795            .await
796            .expect("Failed to commit dbtx after 100 retries")
797        {
798            return Ok(ecash);
799        }
800
801        self.client_ctx
802            .global_api()
803            .session_count()
804            .await
805            .map_err(|_| SendECashError::Offline)?;
806
807        let operation_id = OperationId::new_random();
808
809        let output = self
810            .create_output_bundle(operation_id, represent_amount(amount))
811            .await;
812        let output = self.client_ctx.make_client_outputs(output);
813        let cm = custom_meta.clone();
814
815        let range = self
816            .client_ctx
817            .finalize_and_submit_transaction(
818                operation_id,
819                MintCommonInit::KIND.as_str(),
820                move |change_outpoint_range| MintOperationMeta::Reissue {
821                    change_outpoint_range,
822                    amount,
823                    custom_meta: cm.clone(),
824                },
825                TransactionBuilder::new().with_outputs(output),
826            )
827            .await
828            .map_err(|_| SendECashError::InsufficientBalance)?;
829
830        for outpoint in range {
831            self.await_output_sm_success(operation_id, outpoint)
832                .await
833                .map_err(|_| SendECashError::Failure)?;
834        }
835
836        Box::pin(self.send(amount, custom_meta, include_invite)).await
837    }
838
839    async fn send_ecash_dbtx(
840        &self,
841        dbtx: &mut DatabaseTransaction<'_>,
842        mut remaining_amount: Amount,
843        custom_meta: Value,
844        include_invite: bool,
845    ) -> Result<Option<ECash>, Infallible> {
846        let mut stream = dbtx
847            .find_by_prefix_sorted_descending(&SpendableNotePrefix)
848            .await
849            .map(|entry| entry.0.0);
850
851        let mut notes = vec![];
852
853        while let Some(spendable_note) = stream.next().await {
854            remaining_amount = match remaining_amount.checked_sub(spendable_note.amount()) {
855                Some(amount) => amount,
856                None => continue,
857            };
858
859            notes.push(spendable_note);
860        }
861
862        drop(stream);
863
864        if remaining_amount != Amount::ZERO {
865            return Ok(None);
866        }
867
868        for spendable_note in &notes {
869            self.remove_spendable_note(dbtx, spendable_note).await;
870        }
871
872        let ecash = if include_invite {
873            let invite = self.client_ctx.get_invite_code().await;
874            ECash::new_with_invite(notes, &invite)
875        } else {
876            ECash::new(self.federation_id, notes)
877        };
878        let amount = ecash.amount();
879        let operation_id = OperationId::new_random();
880
881        self.client_ctx
882            .add_operation_log_entry_dbtx(
883                dbtx,
884                operation_id,
885                MintCommonInit::KIND.as_str(),
886                MintOperationMeta::Send {
887                    ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
888                    custom_meta,
889                },
890            )
891            .await;
892
893        self.client_ctx
894            .log_event(
895                dbtx,
896                SendPaymentEvent {
897                    operation_id,
898                    amount,
899                    ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
900                },
901            )
902            .await;
903
904        let sender = self.balance_update_sender.clone();
905        dbtx.on_commit(move || sender.send_replace(()));
906
907        Ok(Some(ecash))
908    }
909
910    /// Receive the `ECash` by reissuing the notes and return the total amount
911    /// of the ecash reissued. This method is idempotent.
912    pub async fn receive(
913        &self,
914        ecash: ECash,
915        custom_meta: Value,
916    ) -> Result<OperationId, ReceiveECashError> {
917        let operation_id = OperationId::from_encodable(&ecash);
918
919        if self.client_ctx.operation_exists(operation_id).await {
920            return Ok(operation_id);
921        }
922
923        if ecash.mint() != Some(self.federation_id) {
924            return Err(ReceiveECashError::WrongFederation);
925        }
926
927        if ecash
928            .notes()
929            .iter()
930            .any(|note| note.amount() <= self.cfg.fee_consensus.base_fee())
931        {
932            return Err(ReceiveECashError::UneconomicalDenomination);
933        }
934
935        let input =
936            Self::create_input_bundle(operation_id, ecash.notes(), true, self.cfg.amount_unit);
937        let input = self.client_ctx.make_client_inputs(input);
938        let ec = base32::encode_prefixed(FEDIMINT_PREFIX, &ecash);
939
940        self.client_ctx
941            .finalize_and_submit_transaction(
942                operation_id,
943                MintCommonInit::KIND.as_str(),
944                move |change_outpoint_range| MintOperationMeta::Receive {
945                    change_outpoint_range,
946                    ecash: ec.clone(),
947                    custom_meta: custom_meta.clone(),
948                },
949                TransactionBuilder::new().with_inputs(input),
950            )
951            .await
952            .map_err(|_| ReceiveECashError::InsufficientFunds)?;
953
954        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
955
956        self.client_ctx
957            .log_event(
958                &mut dbtx,
959                ReceivePaymentEvent {
960                    operation_id,
961                    amount: ecash.amount(),
962                },
963            )
964            .await;
965
966        dbtx.commit_tx().await;
967
968        Ok(operation_id)
969    }
970
971    /// Await the final state of the receive operation.
972    pub async fn await_final_receive_operation_state(
973        &self,
974        operation_id: OperationId,
975    ) -> anyhow::Result<FinalReceiveOperationState> {
976        let operation = self.client_ctx.get_operation(operation_id).await?;
977        let mut stream = self.notifier.subscribe(operation_id).await;
978
979        let mut stream = self
980            .client_ctx
981            .outcome_or_updates(operation, operation_id, move || {
982                async_stream::stream! {
983                    loop {
984                        if let Some(MintClientStateMachines::Receive(state)) = stream.next().await {
985                            match state.state {
986                                ReceiveSMState::Pending => {}
987                                ReceiveSMState::Success => {
988                                    yield FinalReceiveOperationState::Success;
989                                    return;
990                                }
991                                ReceiveSMState::Rejected(..) => {
992                                    yield FinalReceiveOperationState::Rejected;
993                                    return;
994                                }
995                            }
996                        }
997                    }
998                }
999            })
1000            .into_stream();
1001
1002        let mut final_state = None;
1003
1004        while let Some(state) = stream.next().await {
1005            final_state = Some(state);
1006        }
1007
1008        Ok(final_state.expect("Stream contains one final state"))
1009    }
1010
1011    async fn remove_spendable_note(
1012        &self,
1013        dbtx: &mut DatabaseTransaction<'_>,
1014        spendable_note: &SpendableNote,
1015    ) {
1016        dbtx.remove_entry(&SpendableNoteKey(spendable_note.clone()))
1017            .await
1018            .expect("Must delete existing spendable note");
1019    }
1020}
1021
1022#[derive(Clone)]
1023struct PeerSelector {
1024    latency: Arc<RwLock<BTreeMap<PeerId, Duration>>>,
1025}
1026
1027impl PeerSelector {
1028    fn new(peers: BTreeSet<PeerId>) -> Self {
1029        let latency = peers
1030            .into_iter()
1031            .map(|peer| (peer, Duration::ZERO))
1032            .collect();
1033
1034        Self {
1035            latency: Arc::new(RwLock::new(latency)),
1036        }
1037    }
1038
1039    /// Pick 2 peers at random, return the one with lower latency
1040    fn choose_peer(&self) -> PeerId {
1041        let latency = self.latency.read().unwrap();
1042
1043        let peer_a = latency.iter().choose(&mut thread_rng()).unwrap();
1044        let peer_b = latency.iter().choose(&mut thread_rng()).unwrap();
1045
1046        if peer_a.1 <= peer_b.1 {
1047            *peer_a.0
1048        } else {
1049            *peer_b.0
1050        }
1051    }
1052
1053    // Update with exponential moving average (α = 0.1)
1054    fn report(&self, peer: PeerId, duration: Duration) {
1055        self.latency
1056            .write()
1057            .unwrap()
1058            .entry(peer)
1059            .and_modify(|latency| *latency = *latency * 9 / 10 + duration * 1 / 10)
1060            .or_insert(duration);
1061    }
1062
1063    fn remove(&self, peer: PeerId) {
1064        self.latency.write().unwrap().remove(&peer);
1065    }
1066}
1067
1068/// Download a slice with hash verification and peer selection
1069async fn download_slice_with_hash(
1070    module_api: DynModuleApi,
1071    peer_selector: PeerSelector,
1072    start: u64,
1073    end: u64,
1074    expected_hash: sha256::Hash,
1075) -> Vec<RecoveryItem> {
1076    const TIMEOUT: Duration = Duration::from_secs(30);
1077
1078    loop {
1079        let peer = peer_selector.choose_peer();
1080        let start_time = fedimint_core::time::now();
1081
1082        if let Ok(data) = module_api
1083            .fetch_recovery_slice(peer, TIMEOUT, start, end)
1084            .await
1085        {
1086            let elapsed = fedimint_core::time::now()
1087                .duration_since(start_time)
1088                .unwrap_or_default();
1089
1090            peer_selector.report(peer, elapsed);
1091
1092            if data.consensus_hash::<sha256::Hash>() == expected_hash {
1093                return data;
1094            }
1095
1096            peer_selector.remove(peer);
1097        } else {
1098            peer_selector.report(peer, TIMEOUT);
1099        }
1100    }
1101}
1102
1103#[derive(Error, Debug, Clone, Eq, PartialEq)]
1104pub enum SendECashError {
1105    #[error("We need to reissue notes but the client is offline")]
1106    Offline,
1107    #[error("The clients balance is insufficient")]
1108    InsufficientBalance,
1109    #[error("A non-recoverable error has occurred")]
1110    Failure,
1111}
1112
1113#[derive(Error, Debug, Clone, Eq, PartialEq)]
1114pub enum ReceiveECashError {
1115    #[error("The ECash is from a different federation")]
1116    WrongFederation,
1117    #[error("ECash contains an uneconomical denomination")]
1118    UneconomicalDenomination,
1119    #[error("Receiving ecash requires additional funds")]
1120    InsufficientFunds,
1121}
1122
1123#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
1124pub enum FinalReceiveOperationState {
1125    // The ecash notes have been reissued
1126    Success,
1127    // The ecash notes were already spent
1128    Rejected,
1129}
1130
1131#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1132pub enum MintClientStateMachines {
1133    Input(InputStateMachine),
1134    Output(MintOutputStateMachine),
1135    Receive(ReceiveStateMachine),
1136}
1137
/// Allows the typed mint state machines to be converted into a [`DynState`].
impl IntoDynInstance for MintClientStateMachines {
    type DynType = DynState;

    /// Wrap `self` in a [`DynState`] associated with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1145
1146impl State for MintClientStateMachines {
1147    type ModuleContext = MintClientContext;
1148
1149    fn transitions(
1150        &self,
1151        context: &Self::ModuleContext,
1152        global_context: &DynGlobalClientContext,
1153    ) -> Vec<StateTransition<Self>> {
1154        match self {
1155            MintClientStateMachines::Input(redemption_state) => {
1156                sm_enum_variant_translation!(
1157                    redemption_state.transitions(context, global_context),
1158                    MintClientStateMachines::Input
1159                )
1160            }
1161            MintClientStateMachines::Output(issuance_state) => {
1162                sm_enum_variant_translation!(
1163                    issuance_state.transitions(context, global_context),
1164                    MintClientStateMachines::Output
1165                )
1166            }
1167            MintClientStateMachines::Receive(receive_state) => {
1168                sm_enum_variant_translation!(
1169                    receive_state.transitions(context, global_context),
1170                    MintClientStateMachines::Receive
1171                )
1172            }
1173        }
1174    }
1175
1176    fn operation_id(&self) -> OperationId {
1177        match self {
1178            MintClientStateMachines::Input(redemption_state) => redemption_state.operation_id(),
1179            MintClientStateMachines::Output(issuance_state) => issuance_state.operation_id(),
1180            MintClientStateMachines::Receive(receive_state) => receive_state.operation_id(),
1181        }
1182    }
1183}
1184
1185fn round_to_multiple(amount: Amount, min_denomiation: Amount) -> Amount {
1186    Amount::from_msats(amount.msats.next_multiple_of(min_denomiation.msats))
1187}
1188
1189fn represent_amount_with_fees(
1190    mut remaining_amount: Amount,
1191    fee_consensus: &FeeConsensus,
1192) -> Vec<Denomination> {
1193    let mut denominations = Vec::new();
1194
1195    // Add denominations with a greedy algorithm
1196    for denomination in client_denominations().rev() {
1197        let n_add =
1198            remaining_amount / (denomination.amount() + fee_consensus.fee(denomination.amount()));
1199
1200        denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1201
1202        remaining_amount -=
1203            n_add * (denomination.amount() + fee_consensus.fee(denomination.amount()));
1204    }
1205
1206    // We sort the notes by amount to minimize the leaked information.
1207    denominations.sort();
1208
1209    denominations
1210}
1211
1212fn represent_amount(mut remaining_amount: Amount) -> Vec<Denomination> {
1213    let mut denominations = Vec::new();
1214
1215    // Add denominations with a greedy algorithm
1216    for denomination in client_denominations().rev() {
1217        let n_add = remaining_amount / denomination.amount();
1218
1219        denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1220
1221        remaining_amount -= n_add * denomination.amount();
1222    }
1223
1224    denominations
1225}