Skip to main content

fedimint_mintv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7#![allow(clippy::return_self_not_must_use)]
8#![allow(clippy::too_many_lines)]
9
10pub use fedimint_mintv2_common as common;
11
12mod api;
13#[cfg(feature = "cli")]
14mod cli;
15mod client_db;
16mod ecash;
17mod events;
18mod input;
19pub mod issuance;
20mod output;
21mod receive;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::convert::Infallible;
25use std::sync::{Arc, RwLock};
26use std::time::Duration;
27
28use anyhow::{Context as _, anyhow};
29use bitcoin_hashes::sha256;
30use client_db::{RecoveryState, RecoveryStateKey, SpendableNoteAmountPrefix, SpendableNotePrefix};
31pub use events::*;
32use fedimint_api_client::api::DynModuleApi;
33use fedimint_client::module::ClientModule;
34use fedimint_client::transaction::{
35    ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
36    ClientOutputSM, FeeQuote, FeeQuoteRequest, TransactionBuilder,
37};
38use fedimint_client_module::db::ClientModuleMigrationFn;
39use fedimint_client_module::module::init::{
40    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
41};
42use fedimint_client_module::module::recovery::{NoModuleBackup, RecoveryProgress};
43use fedimint_client_module::module::{
44    ClientContext, OutPointRange, PrimaryModulePriority, PrimaryModuleSupport,
45};
46use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::base32::{self, FEDIMINT_PREFIX};
49use fedimint_core::config::FederationId;
50use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
51use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::module::{
54    AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
55};
56use fedimint_core::secp256k1::rand::{Rng, thread_rng};
57use fedimint_core::secp256k1::{Keypair, PublicKey};
58use fedimint_core::util::{BoxStream, NextOrPending};
59use fedimint_core::{Amount, OutPoint, PeerId, apply, async_trait_maybe_send};
60use fedimint_derive_secret::DerivableSecret;
61use fedimint_mintv2_common::config::{FeeConsensus, MintClientConfig, client_denominations};
62use fedimint_mintv2_common::{
63    Denomination, KIND, MintCommonInit, MintInput, MintModuleTypes, MintOutput, Note, RecoveryItem,
64};
65use futures::{StreamExt, pin_mut};
66use itertools::Itertools;
67use rand::seq::IteratorRandom;
68use serde::{Deserialize, Serialize};
69use serde_json::Value;
70use tbs::AggregatePublicKey;
71use thiserror::Error;
72
73use crate::api::MintV2ModuleApi;
74use crate::client_db::SpendableNoteKey;
75pub use crate::ecash::ECash;
76use crate::input::{InputSMCommon, InputSMState, InputStateMachine};
77use crate::issuance::NoteIssuanceRequest;
78use crate::output::{MintOutputStateMachine, OutputSMCommon, OutputSMState};
79use crate::receive::{ReceiveSMState, ReceiveStateMachine};
80
/// Number of notes the client aims to hold per denomination; used both when
/// selecting funding notes and when rebalancing the wallet.
const TARGET_PER_DENOMINATION: usize = 3;
/// Maximum number of recovery items covered by a single slice request during
/// recovery.
const SLICE_SIZE: u64 = 10000;
/// Upper bound on concurrently pending slice-hash requests during recovery.
const PARALLEL_HASH_REQUESTS: usize = 10;
/// Upper bound on concurrently pending slice downloads during recovery.
const PARALLEL_SLICE_REQUESTS: usize = 10;
85
/// An ecash note held by the client together with the key material required
/// to spend it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encodable, Decodable)]
pub struct SpendableNote {
    /// Denomination of the note; determines the value reported by `amount()`.
    pub denomination: Denomination,
    /// Keypair of the note. Its public key serves as the note's nonce and the
    /// keypair itself is handed to the transaction builder as the signing key
    /// when the note is spent.
    pub keypair: Keypair,
    /// Signature carried over into the `Note` that is submitted as an input
    /// (presumably the federation's threshold blind signature — confirm
    /// against the `tbs` crate).
    pub signature: tbs::Signature,
}
92
93impl SpendableNote {
94    pub fn amount(&self) -> Amount {
95        self.denomination.amount()
96    }
97}
98
99impl SpendableNote {
100    fn nonce(&self) -> PublicKey {
101        self.keypair.public_key()
102    }
103
104    fn note(&self) -> Note {
105        Note {
106            denomination: self.denomination,
107            nonce: self.nonce(),
108            signature: self.signature,
109        }
110    }
111}
112
/// Metadata stored in the operation log for operations created by this
/// module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MintOperationMeta {
    /// Notes were removed from the local wallet and packaged as ecash for a
    /// recipient.
    Send {
        /// The ecash, base32-encoded with the fedimint prefix.
        ecash: String,
        /// Caller-supplied metadata, stored unmodified.
        custom_meta: Value,
    },
    /// A transaction submitted by `send` to mint the denominations needed to
    /// represent the requested amount.
    Reissue {
        /// Out point range reported by the transaction submission for the
        /// change outputs.
        change_outpoint_range: OutPointRange,
        /// The (rounded) amount that `send` was asked to make spendable.
        amount: Amount,
        /// Caller-supplied metadata, stored unmodified.
        custom_meta: Value,
    },
    /// Received ecash being reissued.
    // NOTE(review): this variant is constructed outside of the visible part of
    // the file; the field semantics are assumed to mirror the variants above.
    Receive {
        change_outpoint_range: OutPointRange,
        ecash: String,
        custom_meta: Value,
    },
}
130
/// Stateless initializer for the mint client module; implements `ModuleInit`
/// and `ClientModuleInit` to recover and construct a `MintClientModule`.
#[derive(Debug, Clone)]
pub struct MintClientInit;
133
134impl ModuleInit for MintClientInit {
135    type Common = MintCommonInit;
136
137    async fn dump_database(
138        &self,
139        _dbtx: &mut DatabaseTransaction<'_>,
140        _prefix_names: Vec<String>,
141    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
142        Box::new(BTreeMap::new().into_iter())
143    }
144}
145
146#[apply(async_trait_maybe_send!)]
147impl ClientModuleInit for MintClientInit {
148    type Module = MintClientModule;
149
150    fn supported_api_versions(&self) -> MultiApiVersion {
151        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 1 }])
152            .expect("no version conflicts")
153    }
154
    /// Recover the client's unspent notes by replaying the federation's
    /// recovery items.
    ///
    /// The items are downloaded in slices of at most `SLICE_SIZE`. After each
    /// slice the updated `RecoveryState` is persisted under
    /// `RecoveryStateKey`, so an interrupted recovery continues from the
    /// stored `next_index`. Once every item has been processed, a single
    /// output state machine holding all remaining issuance requests is
    /// created in the same database transaction that stores the final state.
    ///
    /// Returns an error if the recovery item count cannot be fetched or if
    /// the slice stream ends before `total_items` items were processed.
    async fn recover(
        &self,
        args: &ClientModuleRecoverArgs<Self>,
        _snapshot: Option<&NoModuleBackup>,
    ) -> anyhow::Result<()> {
        // Resume from the persisted state of an earlier run if there is one;
        // otherwise start at index zero and ask the federation how many
        // recovery items exist.
        let mut state = if let Some(state) = args
            .db()
            .begin_transaction_nc()
            .await
            .get_value(&RecoveryStateKey)
            .await
        {
            state
        } else {
            RecoveryState {
                next_index: 0,
                total_items: args.module_api().fetch_recovery_count().await?,
                requests: BTreeMap::new(),
                nonces: BTreeSet::new(),
            }
        };

        // Nothing (left) to process.
        if state.next_index == state.total_items {
            return Ok(());
        }

        let peer_selector = PeerSelector::new(args.api().all_peers().clone());

        // Two-stage pipeline: first fetch the hash of every slice, then
        // download the slice itself together with that hash. `buffered` runs
        // up to N futures concurrently but yields their results in stream
        // order, so slices arrive in ascending index order.
        let mut recovery_stream = futures::stream::iter(
            (state.next_index..state.total_items).step_by(SLICE_SIZE as usize),
        )
        .map(|start| {
            let api = args.module_api().clone();
            // The last slice may be shorter than `SLICE_SIZE`.
            let end = std::cmp::min(start + SLICE_SIZE, state.total_items);

            async move { (start, end, api.fetch_recovery_slice_hash(start, end).await) }
        })
        .buffered(PARALLEL_HASH_REQUESTS)
        .map(|(start, end, hash)| {
            // NOTE(review): `download_slice_with_hash` is defined outside of
            // this view; presumably it checks the downloaded items against
            // `hash` — confirm.
            download_slice_with_hash(
                args.module_api().clone(),
                peer_selector.clone(),
                start,
                end,
                hash,
            )
        })
        .buffered(PARALLEL_SLICE_REQUESTS);

        let tweak_filter = issuance::tweak_filter(args.module_root_secret());

        loop {
            let items = recovery_stream
                .next()
                .await
                .context("Recovery stream finished before recovery is complete")?;

            for item in &items {
                match item {
                    RecoveryItem::Output {
                        denomination,
                        nonce_hash,
                        tweak,
                    } => {
                        // Cheap pre-filter: skip outputs whose tweak does not
                        // pass the filter derived from our root secret.
                        if !issuance::check_tweak(*tweak, tweak_filter) {
                            continue;
                        }
                        let output_secret = issuance::output_secret(
                            *denomination,
                            *tweak,
                            args.module_root_secret(),
                        );

                        // Skip outputs whose nonce hash does not match the
                        // secret derived from our root secret.
                        if !issuance::check_nonce(&output_secret, *nonce_hash) {
                            continue;
                        }

                        let computed_nonce_hash = issuance::nonce(&output_secret).consensus_hash();

                        // Ignore possible duplicate nonces
                        if !state.nonces.insert(computed_nonce_hash) {
                            continue;
                        }

                        state.requests.insert(
                            computed_nonce_hash,
                            NoteIssuanceRequest::new(
                                *denomination,
                                *tweak,
                                args.module_root_secret(),
                            ),
                        );
                    }
                    RecoveryItem::Input { nonce_hash } => {
                        // An input with this nonce hash was seen, so drop the
                        // matching pending request (if any).
                        state.requests.remove(nonce_hash);
                        state.nonces.remove(nonce_hash);
                    }
                }
            }

            state.next_index += items.len() as u64;

            let mut dbtx = args.db().begin_transaction().await;

            // Persist the progress so that a restart resumes after this slice.
            dbtx.insert_entry(&RecoveryStateKey, &state).await;

            if state.next_index == state.total_items {
                // All items processed: hand the remaining issuance requests to
                // one output state machine. `range` is `None` because these
                // requests do not belong to a transaction created by this
                // client instance.
                let state_machines = args
                    .context()
                    .map_dyn(vec![MintClientStateMachines::Output(
                        MintOutputStateMachine {
                            common: OutputSMCommon {
                                operation_id: OperationId::new_random(),
                                range: None,
                                issuance_requests: state.requests.into_values().collect(),
                            },
                            state: OutputSMState::Pending,
                        },
                    )])
                    .collect();

                args.context()
                    .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), state_machines)
                    .await
                    .expect("state machine is valid");

                // The final state and the state machine are committed
                // together.
                dbtx.commit_tx().await;

                return Ok(());
            }

            dbtx.commit_tx().await;

            // Progress counters are reported as `u32` and saturate at
            // `u32::MAX`.
            args.update_recovery_progress(RecoveryProgress {
                complete: state.next_index.try_into().unwrap_or(u32::MAX),
                total: state.total_items.try_into().unwrap_or(u32::MAX),
            });
        }
    }
294
295    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
296        let (tweak_sender, tweak_receiver) = async_channel::bounded(50);
297
298        let filter = issuance::tweak_filter(args.module_root_secret());
299
300        // Only ~1/65536 random tweaks pass the filter, so grinding is
301        // CPU-bound. Pre-grind in the background and feed tweaks through a
302        // channel to keep the cost off the spend path. `send` only suspends
303        // once the channel is full, so we yield explicitly on each found tweak
304        // to keep single-threaded (wasm) runtimes responsive while the channel
305        // still has space.
306        fedimint_core::task::spawn("mintv2-tweak-grinder", async move {
307            loop {
308                let tweak: [u8; 16] = thread_rng().r#gen();
309
310                if !issuance::check_tweak(tweak, filter) {
311                    continue;
312                }
313
314                if tweak_sender.send(tweak).await.is_err() {
315                    return;
316                }
317
318                fedimint_core::task::sleep(Duration::ZERO).await;
319            }
320        });
321
322        Ok(MintClientModule {
323            federation_id: *args.federation_id(),
324            cfg: args.cfg().clone(),
325            root_secret: args.module_root_secret().clone(),
326            notifier: args.notifier().clone(),
327            client_ctx: args.context(),
328            balance_update_sender: tokio::sync::watch::channel(()).0,
329            tweak_receiver,
330        })
331    }
332
333    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
334        BTreeMap::new()
335    }
336}
337
/// Client side of the mint module: keeps the wallet of spendable notes and
/// creates the inputs and outputs needed to send, receive and rebalance ecash.
#[derive(Debug)]
pub struct MintClientModule {
    /// Federation this client belongs to; embedded into created ecash and
    /// compared against the federation of received ecash.
    federation_id: FederationId,
    /// Module configuration (fee schedule, amount unit and signing keys).
    cfg: MintClientConfig,
    /// Module root secret from which note issuance requests are derived.
    root_secret: DerivableSecret,
    /// Used to subscribe to state transitions of this module's state machines.
    notifier: ModuleNotifier<MintClientStateMachines>,
    /// Handle to the surrounding client (database, operation log, APIs).
    client_ctx: ClientContext<Self>,
    /// Watch channel signalled on commit whenever the set of spendable notes
    /// was changed by this module.
    balance_update_sender: tokio::sync::watch::Sender<()>,
    /// Receiving end of the channel fed by the background tweak grinder that
    /// is spawned during module initialization.
    tweak_receiver: async_channel::Receiver<[u8; 16]>,
}
348
/// Context handed to the module's state machines; built by
/// `MintClientModule::context` from the module's own fields.
#[derive(Debug, Clone)]
pub struct MintClientContext {
    /// Handle to the surrounding client.
    client_ctx: ClientContext<MintClientModule>,
    /// Aggregate public key per denomination, copied from the module config.
    tbs_agg_pks: BTreeMap<Denomination, AggregatePublicKey>,
    /// Public key share of every peer per denomination, copied from the
    /// module config.
    tbs_pks: BTreeMap<Denomination, BTreeMap<PeerId, tbs::PublicKeyShare>>,
    /// Clone of the module's balance watch channel sender.
    pub balance_update_sender: tokio::sync::watch::Sender<()>,
}
356
impl Context for MintClientContext {
    // The `KIND` on the right-hand side is the module kind constant imported
    // from `fedimint_mintv2_common`, not this associated constant.
    const KIND: Option<ModuleKind> = Some(KIND);
}
360
361#[apply(async_trait_maybe_send!)]
362impl ClientModule for MintClientModule {
363    type Init = MintClientInit;
364    type Common = MintModuleTypes;
365    type Backup = NoModuleBackup;
366    type ModuleStateMachineContext = MintClientContext;
367    type States = MintClientStateMachines;
368
369    fn context(&self) -> Self::ModuleStateMachineContext {
370        MintClientContext {
371            client_ctx: self.client_ctx.clone(),
372            tbs_agg_pks: self.cfg.tbs_agg_pks.clone(),
373            tbs_pks: self.cfg.tbs_pks.clone(),
374            balance_update_sender: self.balance_update_sender.clone(),
375        }
376    }
377
378    fn input_fee(
379        &self,
380        amounts: &Amounts,
381        _input: &<Self::Common as ModuleCommon>::Input,
382    ) -> Option<Amounts> {
383        let unit = self.cfg.amount_unit;
384        let amount = amounts.get(&unit).copied().unwrap_or_default();
385        let fee = self.cfg.fee_consensus.fee(amount);
386
387        Some(Amounts::new_custom(unit, fee))
388    }
389
390    fn output_fee(
391        &self,
392        amounts: &Amounts,
393        _output: &<Self::Common as ModuleCommon>::Output,
394    ) -> Option<Amounts> {
395        let unit = self.cfg.amount_unit;
396        let amount = amounts.get(&unit).copied().unwrap_or_default();
397        let fee = self.cfg.fee_consensus.fee(amount);
398
399        Some(Amounts::new_custom(unit, fee))
400    }
401
    /// Run a module-specific CLI command by delegating to the `cli` module.
    /// Only compiled when the `cli` feature is enabled.
    #[cfg(feature = "cli")]
    async fn handle_cli_command(
        &self,
        args: &[std::ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        cli::handle_cli_command(self, args).await
    }
409
410    fn supports_being_primary(&self) -> PrimaryModuleSupport {
411        PrimaryModuleSupport::selected(PrimaryModulePriority::HIGH, [self.cfg.amount_unit])
412    }
413
    /// Balance a transaction with notes from the wallet.
    ///
    /// Given the amounts already contributed by other inputs and outputs,
    /// this selects notes to cover any shortfall, optionally adds further
    /// notes and outputs to move the wallet towards
    /// `TARGET_PER_DENOMINATION` notes per denomination, and issues the
    /// remaining surplus as change. All selected notes are removed from the
    /// database within `dbtx`.
    ///
    /// Returns the input bundle spending the selected notes and the output
    /// bundle issuing the new notes. Fails if `unit` is not the module's
    /// configured unit or if the wallet cannot cover the shortfall.
    ///
    /// Panics if the internal accounting ever yields more outputs than
    /// inputs.
    async fn create_final_inputs_and_outputs(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        unit: AmountUnit,
        mut input_amount: Amount,
        mut output_amount: Amount,
    ) -> anyhow::Result<(
        ClientInputBundle<MintInput, MintClientStateMachines>,
        ClientOutputBundle<MintOutput, MintClientStateMachines>,
    )> {
        if unit != self.cfg.amount_unit {
            anyhow::bail!("Module can only handle its configured amount unit");
        }

        // Pick notes that cover the shortfall (zero if the inputs already
        // exceed the outputs).
        let funding_notes = self
            .select_funding_input(dbtx, output_amount.saturating_sub(input_amount))
            .await
            .context("Insufficient funds")?;

        for note in &funding_notes {
            self.remove_spendable_note(dbtx, note).await;
        }

        input_amount += funding_notes.iter().map(SpendableNote::amount).sum();

        // Spending a note costs a fee, which is accounted for on the output
        // side.
        output_amount += funding_notes
            .iter()
            .map(|input| self.cfg.fee_consensus.fee(input.amount()))
            .sum();

        assert!(output_amount <= input_amount);

        // Use the surplus to top up denominations the wallet is short of;
        // this may pull in further notes to be split.
        let (input_notes, output_amounts) = self
            .rebalance(dbtx, &self.cfg.fee_consensus, input_amount - output_amount)
            .await;

        for note in &input_notes {
            self.remove_spendable_note(dbtx, note).await;
        }

        input_amount += input_notes.iter().map(SpendableNote::amount).sum();

        // Input fees of the additional notes ...
        output_amount += input_notes
            .iter()
            .map(|note| self.cfg.fee_consensus.fee(note.amount()))
            .sum();

        // ... plus value and output fee of every rebalancing output.
        output_amount += output_amounts
            .iter()
            .map(|denomination| {
                denomination.amount() + self.cfg.fee_consensus.fee(denomination.amount())
            })
            .sum();

        // The inputs must still cover all outputs and fees.
        assert!(output_amount <= input_amount);

        let mut spendable_notes = funding_notes
            .into_iter()
            .chain(input_notes)
            .collect::<Vec<SpendableNote>>();

        // We sort the notes by denomination to minimize the leaked information.
        spendable_notes.sort_by_key(|note| note.denomination);

        let input_bundle =
            Self::create_input_bundle(operation_id, spendable_notes, false, self.cfg.amount_unit);

        // Whatever surplus remains is issued as change, next to the
        // rebalancing outputs.
        let mut denominations = represent_amount_with_fees(
            input_amount.saturating_sub(output_amount),
            &self.cfg.fee_consensus,
        )
        .into_iter()
        .chain(output_amounts)
        .collect::<Vec<Denomination>>();

        // We sort the amounts to minimize the leaked information.
        denominations.sort();

        let output_bundle = self.create_output_bundle(operation_id, denominations).await;

        // Notify balance subscribers once the note removals are committed.
        let sender = self.balance_update_sender.clone();
        dbtx.on_commit(move || sender.send_replace(()));

        Ok((input_bundle, output_bundle))
    }
500
    /// Wait until the output state machine covering `outpoint` in the given
    /// operation has finished; returns an error if it was aborted or failed.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        outpoint: OutPoint,
    ) -> anyhow::Result<()> {
        self.await_output_sm_success(operation_id, outpoint).await
    }
508
509    async fn get_balance(&self, dbtx: &mut DatabaseTransaction<'_>, unit: AmountUnit) -> Amount {
510        if unit != self.cfg.amount_unit {
511            return Amount::ZERO;
512        }
513
514        self.get_count_by_denomination_dbtx(dbtx)
515            .await
516            .into_iter()
517            .map(|(denomination, count)| denomination.amount().mul_u64(count))
518            .sum()
519    }
520
521    async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
522        Box::pin(tokio_stream::wrappers::WatchStream::new(
523            self.balance_update_sender.subscribe(),
524        ))
525    }
526}
527
528impl MintClientModule {
    /// Select notes whose value net of input fees covers `excess_output`.
    ///
    /// Per denomination the first `TARGET_PER_DENOMINATION` notes are treated
    /// as the reserve the wallet wants to keep. Notes beyond that are spent
    /// first; the reserve is only touched if they do not suffice. If a
    /// denomination holds more than twice the target, all of its notes
    /// beyond the reserve are selected unconditionally, which consolidates
    /// the surplus.
    ///
    /// Returns `None` if all notes together cannot cover `excess_output`.
    /// The notes are only read here; removing them is left to the caller.
    async fn select_funding_input(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        mut excess_output: Amount,
    ) -> Option<Vec<SpendableNote>> {
        let mut selected_notes = Vec::new();
        let mut target_notes = Vec::new();
        let mut excess_notes = Vec::new();

        // Denominations are visited in reverse order of
        // `client_denominations()` (presumably largest first — confirm).
        for amount in client_denominations().rev() {
            let notes_amount = dbtx
                .find_by_prefix(&SpendableNoteAmountPrefix(amount))
                .await
                .map(|entry| entry.0.0)
                .collect::<Vec<SpendableNote>>()
                .await;

            // The reserve: only used as a last resort below.
            target_notes.extend(notes_amount.iter().take(TARGET_PER_DENOMINATION).cloned());

            if notes_amount.len() > 2 * TARGET_PER_DENOMINATION {
                // Far too many notes of this denomination: always spend
                // everything beyond the reserve.
                for note in notes_amount.into_iter().skip(TARGET_PER_DENOMINATION) {
                    let note_fee = self.cfg.fee_consensus.fee(note.amount());

                    // A note only contributes its value net of the input fee.
                    let note_value = note
                        .amount()
                        .checked_sub(note_fee)
                        .expect("All our notes are economical");

                    excess_output = excess_output.saturating_sub(note_value);

                    selected_notes.push(note);
                }
            } else {
                // Notes beyond the reserve are spent on demand.
                excess_notes.extend(notes_amount.into_iter().skip(TARGET_PER_DENOMINATION));
            }
        }

        // The unconditionally selected notes may already be enough.
        if excess_output == Amount::ZERO {
            return Some(selected_notes);
        }

        // Otherwise add surplus notes first and reserve notes last until the
        // remaining amount is covered.
        for note in excess_notes.into_iter().chain(target_notes) {
            let note_amount = note.amount();
            let note_value = note_amount
                .checked_sub(self.cfg.fee_consensus.fee(note_amount))
                .expect("All our notes are economical");

            excess_output = excess_output.saturating_sub(note_value);

            selected_notes.push(note);

            if excess_output == Amount::ZERO {
                return Some(selected_notes);
            }
        }

        // Even spending every note is not enough.
        None
    }
587
588    async fn rebalance(
589        &self,
590        dbtx: &mut DatabaseTransaction<'_>,
591        fee: &FeeConsensus,
592        mut excess_input: Amount,
593    ) -> (Vec<SpendableNote>, Vec<Denomination>) {
594        let n_denominations = self.get_count_by_denomination_dbtx(dbtx).await;
595
596        let mut notes = dbtx
597            .find_by_prefix_sorted_descending(&SpendableNotePrefix)
598            .await
599            .map(|entry| entry.0.0)
600            .fuse();
601
602        let mut input_notes = Vec::new();
603        let mut output_denominations = Vec::new();
604
605        for d in client_denominations() {
606            let n_denomination = n_denominations.get(&d).copied().unwrap_or(0);
607
608            let n_missing = TARGET_PER_DENOMINATION.saturating_sub(n_denomination as usize);
609
610            for _ in 0..n_missing {
611                match excess_input.checked_sub(d.amount() + fee.fee(d.amount())) {
612                    Some(remaining_excess) => excess_input = remaining_excess,
613                    None => match notes.next().await {
614                        Some(note) => {
615                            if note.amount() <= d.amount() + fee.fee(d.amount()) {
616                                break;
617                            }
618
619                            excess_input += note.amount() - (d.amount() + fee.fee(d.amount()));
620
621                            input_notes.push(note);
622                        }
623                        None => break,
624                    },
625                }
626
627                output_denominations.push(d);
628            }
629        }
630
631        (input_notes, output_denominations)
632    }
633
634    fn create_input_bundle(
635        operation_id: OperationId,
636        notes: Vec<SpendableNote>,
637        include_receive_sm: bool,
638        amount_unit: AmountUnit,
639    ) -> ClientInputBundle<MintInput, MintClientStateMachines> {
640        let inputs = notes
641            .iter()
642            .map(|spendable_note| ClientInput {
643                input: MintInput::new_v0(spendable_note.note()),
644                keys: vec![spendable_note.keypair],
645                amounts: Amounts::new_custom(amount_unit, spendable_note.amount()),
646            })
647            .collect();
648
649        let input_sms = vec![ClientInputSM {
650            state_machines: Arc::new(move |range: OutPointRange| {
651                let mut sms = vec![MintClientStateMachines::Input(InputStateMachine {
652                    common: InputSMCommon {
653                        operation_id,
654                        txid: range.txid(),
655                        spendable_notes: notes.clone(),
656                    },
657                    state: InputSMState::Pending,
658                })];
659
660                if include_receive_sm {
661                    sms.push(MintClientStateMachines::Receive(ReceiveStateMachine {
662                        common: crate::receive::ReceiveSMCommon {
663                            operation_id,
664                            txid: range.txid(),
665                        },
666                        state: crate::receive::ReceiveSMState::Pending,
667                    }));
668                }
669
670                sms
671            }),
672        }];
673
674        ClientInputBundle::new(inputs, input_sms)
675    }
676
677    async fn create_output_bundle(
678        &self,
679        operation_id: OperationId,
680        requested_denominations: Vec<Denomination>,
681    ) -> ClientOutputBundle<MintOutput, MintClientStateMachines> {
682        let issuance_requests = futures::stream::iter(requested_denominations)
683            .zip(self.tweak_receiver.clone())
684            .map(|(d, tweak)| NoteIssuanceRequest::new(d, tweak, &self.root_secret))
685            .collect::<Vec<NoteIssuanceRequest>>()
686            .await;
687
688        let amount_unit = self.cfg.amount_unit;
689        let outputs = issuance_requests
690            .iter()
691            .map(|request| ClientOutput {
692                output: request.output(),
693                amounts: Amounts::new_custom(amount_unit, request.denomination.amount()),
694            })
695            .collect();
696
697        let output_sms = vec![ClientOutputSM {
698            state_machines: Arc::new(move |range: OutPointRange| {
699                vec![MintClientStateMachines::Output(MintOutputStateMachine {
700                    common: OutputSMCommon {
701                        operation_id,
702                        range: Some(range),
703                        issuance_requests: issuance_requests.clone(),
704                    },
705                    state: OutputSMState::Pending,
706                })]
707            }),
708        }];
709
710        ClientOutputBundle::new(outputs, output_sms)
711    }
712
713    async fn await_output_sm_success(
714        &self,
715        operation_id: OperationId,
716        outpoint: OutPoint,
717    ) -> anyhow::Result<()> {
718        let stream = self
719            .notifier
720            .subscribe(operation_id)
721            .await
722            .filter_map(|state| async {
723                let MintClientStateMachines::Output(state) = state else {
724                    return None;
725                };
726
727                if !state.common.range?.into_iter().contains(&outpoint) {
728                    return None;
729                }
730
731                match state.state {
732                    OutputSMState::Pending => None,
733                    OutputSMState::Success => Some(Ok(())),
734                    OutputSMState::Aborted => Some(Err(anyhow!("Transaction was rejected"))),
735                    OutputSMState::Failure => Some(Err(anyhow!("Failed to finalize notes",))),
736                }
737            });
738
739        pin_mut!(stream);
740
741        stream.next_or_pending().await
742    }
743
    /// Count the `ECash` notes in the client's database by denomination.
    ///
    /// Denominations the client holds no notes of are absent from the
    /// returned map.
    pub async fn get_count_by_denomination(&self) -> BTreeMap<Denomination, u64> {
        // A non-committable transaction is sufficient for this read-only
        // query.
        self.get_count_by_denomination_dbtx(
            &mut self.client_ctx.module_db().begin_transaction_nc().await,
        )
        .await
    }
751
752    async fn get_count_by_denomination_dbtx(
753        &self,
754        dbtx: &mut DatabaseTransaction<'_>,
755    ) -> BTreeMap<Denomination, u64> {
756        dbtx.find_by_prefix(&SpendableNotePrefix)
757            .await
758            .fold(BTreeMap::new(), |mut acc, entry| async move {
759                acc.entry(entry.0.0.denomination)
760                    .and_modify(|count| *count += 1)
761                    .or_insert(1);
762
763                acc
764            })
765            .await
766    }
767
    /// Send `ECash` for the given amount and return the send operation ID. The
    /// amount will be rounded up to a multiple of 512 msats which is the
    /// smallest denomination used throughout the client. If the rounded
    /// amount cannot be covered with the ecash notes in the client's
    /// database the client will create a transaction to reissue the
    /// required denominations. It is safe to cancel the send method call
    /// before the reissue is complete in which case the reissued notes are
    /// returned to the regular balance. To cancel a successful ecash send
    /// simply receive it yourself.
    ///
    /// If `include_invite` is set, the federation's invite code is embedded in
    /// the returned ecash so a recipient that has not joined the federation can
    /// do so directly from the received ecash.
    ///
    /// Fails with `SendECashError::Offline` if the federation cannot be
    /// reached before reissuing, with `SendECashError::InsufficientBalance`
    /// if the reissue transaction cannot be created, and with
    /// `SendECashError::Failure` if one of its outputs does not complete
    /// successfully.
    pub async fn send(
        &self,
        amount: Amount,
        custom_meta: Value,
        include_invite: bool,
    ) -> Result<(OperationId, ECash), SendECashError> {
        // Round to a multiple of the first denomination yielded by
        // `client_denominations()`.
        let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());

        // Fast path: the notes in the wallet can represent the amount
        // exactly. The selection is retried on database commit conflicts.
        if let Some((operation_id, ecash)) = self
            .client_ctx
            .module_db()
            .autocommit(
                |dbtx, _| {
                    Box::pin(self.send_ecash_dbtx(
                        dbtx,
                        amount,
                        custom_meta.clone(),
                        include_invite,
                    ))
                },
                Some(100),
            )
            .await
            .expect("Failed to commit dbtx after 100 retries")
        {
            return Ok((operation_id, ecash));
        }

        // A reissue requires the federation; check that it is reachable
        // before creating a transaction.
        self.client_ctx
            .global_api()
            .session_count()
            .await
            .map_err(|_| SendECashError::Offline)?;

        let operation_id = OperationId::new_random();

        // Request exactly the denominations that represent the amount; the
        // primary module funds the transaction.
        let output = self
            .create_output_bundle(operation_id, represent_amount(amount))
            .await;
        let output = self.client_ctx.make_client_outputs(output);
        let cm = custom_meta.clone();

        let range = self
            .client_ctx
            .finalize_and_submit_transaction(
                operation_id,
                MintCommonInit::KIND.as_str(),
                move |change_outpoint_range| MintOperationMeta::Reissue {
                    change_outpoint_range,
                    amount,
                    custom_meta: cm.clone(),
                },
                TransactionBuilder::new().with_outputs(output),
            )
            .await
            .map_err(|_| SendECashError::InsufficientBalance)?;

        // Wait until every output of the reissue transaction has completed.
        for outpoint in range {
            self.await_output_sm_success(operation_id, outpoint)
                .await
                .map_err(|_| SendECashError::Failure)?;
        }

        // Try again now that the required denominations should be available.
        // The recursive future is boxed because an `async fn` cannot await
        // itself directly.
        Box::pin(self.send(amount, custom_meta, include_invite)).await
    }
846
847    async fn send_ecash_dbtx(
848        &self,
849        dbtx: &mut DatabaseTransaction<'_>,
850        remaining_amount: Amount,
851        custom_meta: Value,
852        include_invite: bool,
853    ) -> Result<Option<(OperationId, ECash)>, Infallible> {
854        let Some(notes) = Self::select_exact_change(&mut dbtx.to_ref_nc(), remaining_amount).await
855        else {
856            return Ok(None);
857        };
858
859        for spendable_note in &notes {
860            self.remove_spendable_note(dbtx, spendable_note).await;
861        }
862
863        let ecash = if include_invite {
864            let invite = self.client_ctx.get_invite_code().await;
865            ECash::new_with_invite(notes, &invite)
866        } else {
867            ECash::new(self.federation_id, notes)
868        };
869        let amount = ecash.amount();
870        let operation_id = OperationId::new_random();
871
872        self.client_ctx
873            .add_operation_log_entry_dbtx(
874                dbtx,
875                operation_id,
876                MintCommonInit::KIND.as_str(),
877                MintOperationMeta::Send {
878                    ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
879                    custom_meta,
880                },
881            )
882            .await;
883
884        self.client_ctx
885            .log_event(
886                dbtx,
887                SendPaymentEvent {
888                    operation_id,
889                    amount,
890                    ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
891                },
892            )
893            .await;
894
895        let sender = self.balance_update_sender.clone();
896        dbtx.on_commit(move || sender.send_replace(()));
897
898        Ok(Some((operation_id, ecash)))
899    }
900
901    /// Receive the `ECash` by reissuing the notes and return the total amount
902    /// of the ecash reissued. This method is idempotent.
903    pub async fn receive(
904        &self,
905        ecash: ECash,
906        custom_meta: Value,
907    ) -> Result<OperationId, ReceiveECashError> {
908        let operation_id = OperationId::from_encodable(&ecash);
909
910        if self.client_ctx.operation_exists(operation_id).await {
911            return Ok(operation_id);
912        }
913
914        if ecash.mint() != Some(self.federation_id) {
915            return Err(ReceiveECashError::WrongFederation);
916        }
917
918        if ecash
919            .notes()
920            .iter()
921            .any(|note| note.amount() <= self.cfg.fee_consensus.base_fee())
922        {
923            return Err(ReceiveECashError::UneconomicalDenomination);
924        }
925
926        let input =
927            Self::create_input_bundle(operation_id, ecash.notes(), true, self.cfg.amount_unit);
928        let input = self.client_ctx.make_client_inputs(input);
929        let ec = base32::encode_prefixed(FEDIMINT_PREFIX, &ecash);
930
931        self.client_ctx
932            .finalize_and_submit_transaction(
933                operation_id,
934                MintCommonInit::KIND.as_str(),
935                move |change_outpoint_range| MintOperationMeta::Receive {
936                    change_outpoint_range,
937                    ecash: ec.clone(),
938                    custom_meta: custom_meta.clone(),
939                },
940                TransactionBuilder::new().with_inputs(input),
941            )
942            .await
943            .map_err(|_| ReceiveECashError::InsufficientFunds)?;
944
945        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
946
947        self.client_ctx
948            .log_event(
949                &mut dbtx,
950                ReceivePaymentEvent {
951                    operation_id,
952                    amount: ecash.amount(),
953                },
954            )
955            .await;
956
957        dbtx.commit_tx().await;
958
959        Ok(operation_id)
960    }
961
962    /// Computes the exact fee a `receive(ecash)` would incur given the client's
963    /// current note inventory, without submitting anything.
964    ///
965    /// This runs the same change generation the real receive does
966    /// (`create_final_inputs_and_outputs`, including funding selection and
967    /// rebalancing) against a non-committable transaction that is dropped
968    /// rather than committed, so the client's notes are read but left
969    /// untouched. The quote is point-in-time: it depends on the current
970    /// inventory and can move as notes change.
971    pub async fn receive_fee_quote(&self, ecash: &ECash) -> anyhow::Result<FeeQuote> {
972        // A receive submits the ecash notes as explicit inputs and no explicit
973        // outputs; the shared, module-agnostic fee quote runs the primary-module
974        // balancing (rebalancing + minting change) over the real inventory.
975        let notes = ecash.notes();
976        let input_amount: Amount = notes.iter().map(SpendableNote::amount).sum();
977        let input_fee: Amount = notes
978            .iter()
979            .map(|note| self.cfg.fee_consensus.fee(note.amount()))
980            .sum();
981
982        self.client_ctx
983            .fee_quote(
984                OperationId::new_random(),
985                FeeQuoteRequest {
986                    input_amount: Amounts::new_custom(self.cfg.amount_unit, input_amount),
987                    output_amount: Amounts::ZERO,
988                    input_fee: Amounts::new_custom(self.cfg.amount_unit, input_fee),
989                    output_fee: Amounts::ZERO,
990                },
991            )
992            .await
993    }
994
995    /// Computes the fee a `send(amount)` would incur given the client's current
996    /// note inventory, without sending anything.
997    ///
998    /// A send is free when the client's existing notes can cover the (rounded)
999    /// amount exactly — it just hands those notes out. Otherwise the send first
1000    /// reissues itself the right denominations, and that self-reissue
1001    /// transaction is the only thing a send ever pays a fee for. This quote
1002    /// mirrors that: it returns [`FeeQuote::ZERO`] when exact change is
1003    /// available, and otherwise quotes the reissue the same way the real send
1004    /// submits it (explicit outputs `represent_amount(amount)`, no explicit
1005    /// inputs) via the shared, module-agnostic fee quote over the real
1006    /// inventory. The quote is point-in-time: it depends on the current
1007    /// inventory and can move as notes change.
1008    pub async fn send_fee_quote(&self, amount: Amount) -> anyhow::Result<FeeQuote> {
1009        let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());
1010
1011        // Exact-change path: handing out existing notes never costs a fee.
1012        if self.can_make_exact_change(amount).await {
1013            return Ok(FeeQuote::ZERO);
1014        }
1015
1016        // Reissue path: the send mints itself `represent_amount(amount)` as
1017        // explicit outputs (no explicit inputs) and the primary module funds and
1018        // balances it. Quote that exact transaction.
1019        let denominations = represent_amount(amount);
1020        let output_amount: Amount = denominations.iter().map(|d| d.amount()).sum();
1021        let output_fee: Amount = denominations
1022            .iter()
1023            .map(|d| self.cfg.fee_consensus.fee(d.amount()))
1024            .sum();
1025
1026        self.client_ctx
1027            .fee_quote(
1028                OperationId::new_random(),
1029                FeeQuoteRequest {
1030                    input_amount: Amounts::ZERO,
1031                    output_amount: Amounts::new_custom(self.cfg.amount_unit, output_amount),
1032                    input_fee: Amounts::ZERO,
1033                    output_fee: Amounts::new_custom(self.cfg.amount_unit, output_fee),
1034                },
1035            )
1036            .await
1037    }
1038
1039    /// Returns whether the client's current notes can be handed out to cover
1040    /// `amount` exactly — the free path in [`Self::send`] — without modifying
1041    /// the inventory. Shares its greedy selection with
1042    /// [`Self::send_ecash_dbtx`] via [`Self::select_exact_change`].
1043    async fn can_make_exact_change(&self, remaining_amount: Amount) -> bool {
1044        let mut dbtx = self.client_ctx.module_db().begin_transaction_nc().await;
1045
1046        Self::select_exact_change(&mut dbtx, remaining_amount)
1047            .await
1048            .is_some()
1049    }
1050
1051    /// Greedily selects spendable notes (largest-first) summing to *exactly*
1052    /// `remaining_amount`, or `None` if the inventory can't make exact change.
1053    /// Reads the DB but does not modify it. Single source of truth for the free
1054    /// (hand-out-existing-notes) selection shared by [`Self::send_ecash_dbtx`]
1055    /// and [`Self::can_make_exact_change`].
1056    async fn select_exact_change(
1057        dbtx: &mut DatabaseTransaction<'_>,
1058        mut remaining_amount: Amount,
1059    ) -> Option<Vec<SpendableNote>> {
1060        let mut stream = dbtx
1061            .find_by_prefix_sorted_descending(&SpendableNotePrefix)
1062            .await
1063            .map(|entry| entry.0.0);
1064
1065        let mut notes = vec![];
1066
1067        while let Some(spendable_note) = stream.next().await {
1068            remaining_amount = match remaining_amount.checked_sub(spendable_note.amount()) {
1069                Some(amount) => amount,
1070                None => continue,
1071            };
1072
1073            notes.push(spendable_note);
1074
1075            if remaining_amount == Amount::ZERO {
1076                break;
1077            }
1078        }
1079
1080        (remaining_amount == Amount::ZERO).then_some(notes)
1081    }
1082
1083    /// Await the final state of the receive operation.
1084    pub async fn await_final_receive_operation_state(
1085        &self,
1086        operation_id: OperationId,
1087    ) -> anyhow::Result<FinalReceiveOperationState> {
1088        let operation = self.client_ctx.get_operation(operation_id).await?;
1089        let mut stream = self.notifier.subscribe(operation_id).await;
1090
1091        let mut stream = self
1092            .client_ctx
1093            .outcome_or_updates(operation, operation_id, move || {
1094                async_stream::stream! {
1095                    loop {
1096                        if let Some(MintClientStateMachines::Receive(state)) = stream.next().await {
1097                            match state.state {
1098                                ReceiveSMState::Pending => {}
1099                                ReceiveSMState::Success => {
1100                                    yield FinalReceiveOperationState::Success;
1101                                    return;
1102                                }
1103                                ReceiveSMState::Rejected(..) => {
1104                                    yield FinalReceiveOperationState::Rejected;
1105                                    return;
1106                                }
1107                            }
1108                        }
1109                    }
1110                }
1111            })
1112            .into_stream();
1113
1114        let mut final_state = None;
1115
1116        while let Some(state) = stream.next().await {
1117            final_state = Some(state);
1118        }
1119
1120        Ok(final_state.expect("Stream contains one final state"))
1121    }
1122
1123    async fn remove_spendable_note(
1124        &self,
1125        dbtx: &mut DatabaseTransaction<'_>,
1126        spendable_note: &SpendableNote,
1127    ) {
1128        dbtx.remove_entry(&SpendableNoteKey(spendable_note.clone()))
1129            .await
1130            .expect("Must delete existing spendable note");
1131    }
1132}
1133
/// Tracks a latency estimate per peer and picks peers to download from.
///
/// Cloning is cheap and shares state: every clone points at the same
/// lock-protected latency table.
#[derive(Clone)]
struct PeerSelector {
    /// Latency estimate for every peer that is still eligible for selection.
    latency: Arc<RwLock<BTreeMap<PeerId, Duration>>>,
}
1138
1139impl PeerSelector {
1140    fn new(peers: BTreeSet<PeerId>) -> Self {
1141        let latency = peers
1142            .into_iter()
1143            .map(|peer| (peer, Duration::ZERO))
1144            .collect();
1145
1146        Self {
1147            latency: Arc::new(RwLock::new(latency)),
1148        }
1149    }
1150
1151    /// Pick 2 peers at random, return the one with lower latency
1152    fn choose_peer(&self) -> PeerId {
1153        let latency = self.latency.read().unwrap();
1154
1155        let peer_a = latency.iter().choose(&mut thread_rng()).unwrap();
1156        let peer_b = latency.iter().choose(&mut thread_rng()).unwrap();
1157
1158        if peer_a.1 <= peer_b.1 {
1159            *peer_a.0
1160        } else {
1161            *peer_b.0
1162        }
1163    }
1164
1165    // Update with exponential moving average (α = 0.1)
1166    fn report(&self, peer: PeerId, duration: Duration) {
1167        self.latency
1168            .write()
1169            .unwrap()
1170            .entry(peer)
1171            .and_modify(|latency| *latency = *latency * 9 / 10 + duration * 1 / 10)
1172            .or_insert(duration);
1173    }
1174
1175    fn remove(&self, peer: PeerId) {
1176        self.latency.write().unwrap().remove(&peer);
1177    }
1178}
1179
1180/// Download a slice with hash verification and peer selection
1181async fn download_slice_with_hash(
1182    module_api: DynModuleApi,
1183    peer_selector: PeerSelector,
1184    start: u64,
1185    end: u64,
1186    expected_hash: sha256::Hash,
1187) -> Vec<RecoveryItem> {
1188    const TIMEOUT: Duration = Duration::from_secs(30);
1189
1190    loop {
1191        let peer = peer_selector.choose_peer();
1192        let start_time = fedimint_core::time::now();
1193
1194        if let Ok(data) = module_api
1195            .fetch_recovery_slice(peer, TIMEOUT, start, end)
1196            .await
1197        {
1198            let elapsed = fedimint_core::time::now()
1199                .duration_since(start_time)
1200                .unwrap_or_default();
1201
1202            peer_selector.report(peer, elapsed);
1203
1204            if data.consensus_hash::<sha256::Hash>() == expected_hash {
1205                return data;
1206            }
1207
1208            peer_selector.remove(peer);
1209        } else {
1210            peer_selector.report(peer, TIMEOUT);
1211        }
1212    }
1213}
1214
/// Errors returned when sending ecash.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendECashError {
    /// Notes have to be reissued first, but the federation could not be
    /// reached (the session count request failed).
    #[error("We need to reissue notes but the client is offline")]
    Offline,
    /// The reissue transaction could not be finalized and submitted.
    #[error("The clients balance is insufficient")]
    InsufficientBalance,
    /// Waiting for an output of the reissue transaction to succeed failed.
    #[error("A non-recoverable error has occurred")]
    Failure,
}
1224
/// Errors returned when receiving ecash.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveECashError {
    /// The ecash does not name this client's federation as its mint.
    #[error("The ECash is from a different federation")]
    WrongFederation,
    /// At least one note has an amount less than or equal to the base fee.
    #[error("ECash contains an uneconomical denomination")]
    UneconomicalDenomination,
    /// The receive transaction could not be finalized and submitted.
    #[error("Receiving ecash requires additional funds")]
    InsufficientFunds,
}
1234
/// Final outcome of a receive operation.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// The ecash notes have been reissued
    Success,
    /// The ecash notes were already spent
    Rejected,
}
1242
/// All state machines run by the mint client module, one variant per kind.
// NOTE(review): this type derives `Encodable`/`Decodable`, so the variant
// order is presumably part of the persisted encoding — confirm before
// reordering or inserting variants.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum MintClientStateMachines {
    /// State machine tracking a mint input.
    Input(InputStateMachine),
    /// State machine tracking a mint output.
    Output(MintOutputStateMachine),
    /// State machine tracking a receive operation.
    Receive(ReceiveStateMachine),
}
1249
/// Allows the typed state machine enum to be converted into the
/// type-erased [`DynState`].
impl IntoDynInstance for MintClientStateMachines {
    type DynType = DynState;

    /// Wraps `self` into a [`DynState`] tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1257
1258impl State for MintClientStateMachines {
1259    type ModuleContext = MintClientContext;
1260
1261    fn transitions(
1262        &self,
1263        context: &Self::ModuleContext,
1264        global_context: &DynGlobalClientContext,
1265    ) -> Vec<StateTransition<Self>> {
1266        match self {
1267            MintClientStateMachines::Input(redemption_state) => {
1268                sm_enum_variant_translation!(
1269                    redemption_state.transitions(context, global_context),
1270                    MintClientStateMachines::Input
1271                )
1272            }
1273            MintClientStateMachines::Output(issuance_state) => {
1274                sm_enum_variant_translation!(
1275                    issuance_state.transitions(context, global_context),
1276                    MintClientStateMachines::Output
1277                )
1278            }
1279            MintClientStateMachines::Receive(receive_state) => {
1280                sm_enum_variant_translation!(
1281                    receive_state.transitions(context, global_context),
1282                    MintClientStateMachines::Receive
1283                )
1284            }
1285        }
1286    }
1287
1288    fn operation_id(&self) -> OperationId {
1289        match self {
1290            MintClientStateMachines::Input(redemption_state) => redemption_state.operation_id(),
1291            MintClientStateMachines::Output(issuance_state) => issuance_state.operation_id(),
1292            MintClientStateMachines::Receive(receive_state) => receive_state.operation_id(),
1293        }
1294    }
1295}
1296
1297fn round_to_multiple(amount: Amount, min_denomiation: Amount) -> Amount {
1298    Amount::from_msats(amount.msats.next_multiple_of(min_denomiation.msats))
1299}
1300
1301fn represent_amount_with_fees(
1302    mut remaining_amount: Amount,
1303    fee_consensus: &FeeConsensus,
1304) -> Vec<Denomination> {
1305    let mut denominations = Vec::new();
1306
1307    // Add denominations with a greedy algorithm
1308    for denomination in client_denominations().rev() {
1309        let n_add =
1310            remaining_amount / (denomination.amount() + fee_consensus.fee(denomination.amount()));
1311
1312        denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1313
1314        remaining_amount -=
1315            n_add * (denomination.amount() + fee_consensus.fee(denomination.amount()));
1316    }
1317
1318    // We sort the notes by amount to minimize the leaked information.
1319    denominations.sort();
1320
1321    denominations
1322}
1323
1324fn represent_amount(mut remaining_amount: Amount) -> Vec<Denomination> {
1325    let mut denominations = Vec::new();
1326
1327    // Add denominations with a greedy algorithm
1328    for denomination in client_denominations().rev() {
1329        let n_add = remaining_amount / denomination.amount();
1330
1331        denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1332
1333        remaining_amount -= n_add * denomination.amount();
1334    }
1335
1336    denominations
1337}