1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7#![allow(clippy::return_self_not_must_use)]
8#![allow(clippy::too_many_lines)]
9
10pub use fedimint_mintv2_common as common;
11
12mod api;
13#[cfg(feature = "cli")]
14mod cli;
15mod client_db;
16mod ecash;
17mod events;
18mod input;
19pub mod issuance;
20mod output;
21mod receive;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::convert::Infallible;
25use std::sync::{Arc, RwLock};
26use std::time::Duration;
27
28use anyhow::{Context as _, anyhow};
29use bitcoin_hashes::sha256;
30use client_db::{RecoveryState, RecoveryStateKey, SpendableNoteAmountPrefix, SpendableNotePrefix};
31pub use events::*;
32use fedimint_api_client::api::DynModuleApi;
33use fedimint_client::module::ClientModule;
34use fedimint_client::transaction::{
35 ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
36 ClientOutputSM, FeeQuote, FeeQuoteRequest, TransactionBuilder,
37};
38use fedimint_client_module::db::ClientModuleMigrationFn;
39use fedimint_client_module::module::init::{
40 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
41};
42use fedimint_client_module::module::recovery::{NoModuleBackup, RecoveryProgress};
43use fedimint_client_module::module::{
44 ClientContext, OutPointRange, PrimaryModulePriority, PrimaryModuleSupport,
45};
46use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::base32::{self, FEDIMINT_PREFIX};
49use fedimint_core::config::FederationId;
50use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
51use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::module::{
54 AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
55};
56use fedimint_core::secp256k1::rand::{Rng, thread_rng};
57use fedimint_core::secp256k1::{Keypair, PublicKey};
58use fedimint_core::util::{BoxStream, NextOrPending};
59use fedimint_core::{Amount, OutPoint, PeerId, apply, async_trait_maybe_send};
60use fedimint_derive_secret::DerivableSecret;
61use fedimint_mintv2_common::config::{FeeConsensus, MintClientConfig, client_denominations};
62use fedimint_mintv2_common::{
63 Denomination, KIND, MintCommonInit, MintInput, MintModuleTypes, MintOutput, Note, RecoveryItem,
64};
65use futures::{StreamExt, pin_mut};
66use itertools::Itertools;
67use rand::seq::IteratorRandom;
68use serde::{Deserialize, Serialize};
69use serde_json::Value;
70use tbs::AggregatePublicKey;
71use thiserror::Error;
72
73use crate::api::MintV2ModuleApi;
74use crate::client_db::SpendableNoteKey;
75pub use crate::ecash::ECash;
76use crate::input::{InputSMCommon, InputSMState, InputStateMachine};
77use crate::issuance::NoteIssuanceRequest;
78use crate::output::{MintOutputStateMachine, OutputSMCommon, OutputSMState};
79use crate::receive::{ReceiveSMState, ReceiveStateMachine};
80
/// Number of notes per denomination that `rebalance` tries to top up to and
/// that `select_funding_input` only spends as a last resort.
const TARGET_PER_DENOMINATION: usize = 3;
/// Number of recovery items covered by one slice request during recovery.
const SLICE_SIZE: u64 = 10000;
/// Number of slice-hash requests buffered concurrently during recovery.
const PARALLEL_HASH_REQUESTS: usize = 10;
/// Number of slice downloads buffered concurrently during recovery.
const PARALLEL_SLICE_REQUESTS: usize = 10;
85
/// A note held by this client together with the key material required to
/// spend it.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Encodable, Decodable)]
pub struct SpendableNote {
    /// Denomination of the note; determines its amount.
    pub denomination: Denomination,
    /// Keypair whose public key is used as the note's nonce and which is
    /// attached as the signing key when the note is spent as an input.
    pub keypair: Keypair,
    /// Signature copied into the [`Note`] built from this value.
    pub signature: tbs::Signature,
}
92
93impl SpendableNote {
94 pub fn amount(&self) -> Amount {
95 self.denomination.amount()
96 }
97}
98
99impl SpendableNote {
100 fn nonce(&self) -> PublicKey {
101 self.keypair.public_key()
102 }
103
104 fn note(&self) -> Note {
105 Note {
106 denomination: self.denomination,
107 nonce: self.nonce(),
108 signature: self.signature,
109 }
110 }
111}
112
/// Metadata stored in the operation log for operations created by this
/// module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MintOperationMeta {
    /// Ecash was taken out of the local note store to be handed to someone
    /// else.
    Send {
        /// The ecash, base32 encoded with the fedimint prefix.
        ecash: String,
        /// Caller-supplied metadata.
        custom_meta: Value,
    },
    /// Notes were reissued so that a requested amount can be represented
    /// exactly.
    Reissue {
        /// Outpoints of the change outputs of the reissuance transaction.
        change_outpoint_range: OutPointRange,
        /// The amount that triggered the reissuance.
        amount: Amount,
        /// Caller-supplied metadata.
        custom_meta: Value,
    },
    /// Ecash received from someone else was submitted for reissuance.
    Receive {
        /// Outpoints of the change outputs of the receive transaction.
        change_outpoint_range: OutPointRange,
        /// The received ecash, base32 encoded with the fedimint prefix.
        ecash: String,
        /// Caller-supplied metadata.
        custom_meta: Value,
    },
}
130
/// Initializer for the mintv2 client module; constructs [`MintClientModule`]
/// and drives its recovery.
#[derive(Debug, Clone)]
pub struct MintClientInit;
133
134impl ModuleInit for MintClientInit {
135 type Common = MintCommonInit;
136
137 async fn dump_database(
138 &self,
139 _dbtx: &mut DatabaseTransaction<'_>,
140 _prefix_names: Vec<String>,
141 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
142 Box::new(BTreeMap::new().into_iter())
143 }
144}
145
146#[apply(async_trait_maybe_send!)]
147impl ClientModuleInit for MintClientInit {
148 type Module = MintClientModule;
149
150 fn supported_api_versions(&self) -> MultiApiVersion {
151 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 1 }])
152 .expect("no version conflicts")
153 }
154
    /// Rebuilds this client's outstanding note issuance requests from the
    /// federation's recovery items.
    ///
    /// Progress is stored under `RecoveryStateKey` after every processed
    /// slice, so a restarted recovery continues from the stored `next_index`.
    /// When the last item has been processed, a single output state machine
    /// holding the remaining issuance requests is added in the same database
    /// transaction that stores the final recovery state.
    ///
    /// Returns an error if the recovery item count cannot be fetched or if
    /// the slice stream ends before `total_items` items were processed.
    async fn recover(
        &self,
        args: &ClientModuleRecoverArgs<Self>,
        _snapshot: Option<&NoModuleBackup>,
    ) -> anyhow::Result<()> {
        // Resume from persisted state if there is one, otherwise start at
        // index zero with the item count reported by the federation.
        let mut state = if let Some(state) = args
            .db()
            .begin_transaction_nc()
            .await
            .get_value(&RecoveryStateKey)
            .await
        {
            state
        } else {
            RecoveryState {
                next_index: 0,
                total_items: args.module_api().fetch_recovery_count().await?,
                requests: BTreeMap::new(),
                nonces: BTreeSet::new(),
            }
        };

        // Nothing (left) to process.
        if state.next_index == state.total_items {
            return Ok(());
        }

        let peer_selector = PeerSelector::new(args.api().all_peers().clone());

        // Two-stage pipeline: first fetch the expected hash of each slice,
        // then download the slice itself and verify it against that hash.
        // `buffered` keeps the results in slice order.
        let mut recovery_stream = futures::stream::iter(
            (state.next_index..state.total_items).step_by(SLICE_SIZE as usize),
        )
        .map(|start| {
            let api = args.module_api().clone();
            let end = std::cmp::min(start + SLICE_SIZE, state.total_items);

            async move { (start, end, api.fetch_recovery_slice_hash(start, end).await) }
        })
        .buffered(PARALLEL_HASH_REQUESTS)
        .map(|(start, end, hash)| {
            download_slice_with_hash(
                args.module_api().clone(),
                peer_selector.clone(),
                start,
                end,
                hash,
            )
        })
        .buffered(PARALLEL_SLICE_REQUESTS);

        let tweak_filter = issuance::tweak_filter(args.module_root_secret());

        loop {
            let items = recovery_stream
                .next()
                .await
                .context("Recovery stream finished before recovery is complete")?;

            for item in &items {
                match item {
                    RecoveryItem::Output {
                        denomination,
                        nonce_hash,
                        tweak,
                    } => {
                        // Skip outputs whose tweak does not pass our filter
                        // before deriving any secrets.
                        if !issuance::check_tweak(*tweak, tweak_filter) {
                            continue;
                        }

                        let output_secret = issuance::output_secret(
                            *denomination,
                            *tweak,
                            args.module_root_secret(),
                        );

                        // Skip the output unless the secret derived from our
                        // root secret matches the recorded nonce hash.
                        if !issuance::check_nonce(&output_secret, *nonce_hash) {
                            continue;
                        }

                        let computed_nonce_hash = issuance::nonce(&output_secret).consensus_hash();

                        // Only track the first output seen for a nonce.
                        if !state.nonces.insert(computed_nonce_hash) {
                            continue;
                        }

                        state.requests.insert(
                            computed_nonce_hash,
                            NoteIssuanceRequest::new(
                                *denomination,
                                *tweak,
                                args.module_root_secret(),
                            ),
                        );
                    }
                    // An input with this nonce hash was seen, so stop
                    // tracking the corresponding request.
                    RecoveryItem::Input { nonce_hash } => {
                        state.requests.remove(nonce_hash);
                        state.nonces.remove(nonce_hash);
                    }
                }
            }

            state.next_index += items.len() as u64;

            let mut dbtx = args.db().begin_transaction().await;

            dbtx.insert_entry(&RecoveryStateKey, &state).await;

            if state.next_index == state.total_items {
                // All items processed: hand the remaining requests to one
                // output state machine. `range` is `None` because these
                // requests do not belong to a transaction created here.
                let state_machines = args
                    .context()
                    .map_dyn(vec![MintClientStateMachines::Output(
                        MintOutputStateMachine {
                            common: OutputSMCommon {
                                operation_id: OperationId::new_random(),
                                range: None,
                                issuance_requests: state.requests.into_values().collect(),
                            },
                            state: OutputSMState::Pending,
                        },
                    )])
                    .collect();

                args.context()
                    .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), state_machines)
                    .await
                    .expect("state machine is valid");

                dbtx.commit_tx().await;

                return Ok(());
            }

            dbtx.commit_tx().await;

            // Progress counters are `u32`; saturate instead of failing when
            // the item indices do not fit.
            args.update_recovery_progress(RecoveryProgress {
                complete: state.next_index.try_into().unwrap_or(u32::MAX),
                total: state.total_items.try_into().unwrap_or(u32::MAX),
            });
        }
    }
294
295 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
296 let (tweak_sender, tweak_receiver) = async_channel::bounded(50);
297
298 let filter = issuance::tweak_filter(args.module_root_secret());
299
300 fedimint_core::task::spawn("mintv2-tweak-grinder", async move {
307 loop {
308 let tweak: [u8; 16] = thread_rng().r#gen();
309
310 if !issuance::check_tweak(tweak, filter) {
311 continue;
312 }
313
314 if tweak_sender.send(tweak).await.is_err() {
315 return;
316 }
317
318 fedimint_core::task::sleep(Duration::ZERO).await;
319 }
320 });
321
322 Ok(MintClientModule {
323 federation_id: *args.federation_id(),
324 cfg: args.cfg().clone(),
325 root_secret: args.module_root_secret().clone(),
326 notifier: args.notifier().clone(),
327 client_ctx: args.context(),
328 balance_update_sender: tokio::sync::watch::channel(()).0,
329 tweak_receiver,
330 })
331 }
332
    /// Returns the module's database migrations; none are defined.
    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
        BTreeMap::new()
    }
336}
337
/// Client side of the mintv2 module: selects notes for spending, creates
/// issuance requests and sends/receives ecash.
#[derive(Debug)]
pub struct MintClientModule {
    /// Federation this client belongs to; stamped on sent ecash and checked
    /// on received ecash.
    federation_id: FederationId,
    /// Module client config (amount unit, fee consensus, tbs keys).
    cfg: MintClientConfig,
    /// Secret from which note issuance requests are derived.
    root_secret: DerivableSecret,
    /// Source of state machine updates for this module.
    notifier: ModuleNotifier<MintClientStateMachines>,
    /// Handle to client functionality (database, operation log, submitting
    /// transactions).
    client_ctx: ClientContext<Self>,
    /// Notified on commit of database transactions that change the set of
    /// spendable notes.
    balance_update_sender: tokio::sync::watch::Sender<()>,
    /// Tweaks pre-computed by the background task spawned in `init`.
    tweak_receiver: async_channel::Receiver<[u8; 16]>,
}
348
/// Context handed to this module's state machines.
#[derive(Debug, Clone)]
pub struct MintClientContext {
    /// Handle to client functionality.
    client_ctx: ClientContext<MintClientModule>,
    /// Aggregate threshold public key per denomination, copied from the
    /// module config.
    tbs_agg_pks: BTreeMap<Denomination, AggregatePublicKey>,
    /// Per-peer public key shares per denomination, copied from the module
    /// config.
    tbs_pks: BTreeMap<Denomination, BTreeMap<PeerId, tbs::PublicKeyShare>>,
    /// Sender used to signal balance changes to subscribers.
    pub balance_update_sender: tokio::sync::watch::Sender<()>,
}
356
impl Context for MintClientContext {
    /// Ties this context to the mintv2 module kind.
    const KIND: Option<ModuleKind> = Some(KIND);
}
360
361#[apply(async_trait_maybe_send!)]
362impl ClientModule for MintClientModule {
363 type Init = MintClientInit;
364 type Common = MintModuleTypes;
365 type Backup = NoModuleBackup;
366 type ModuleStateMachineContext = MintClientContext;
367 type States = MintClientStateMachines;
368
369 fn context(&self) -> Self::ModuleStateMachineContext {
370 MintClientContext {
371 client_ctx: self.client_ctx.clone(),
372 tbs_agg_pks: self.cfg.tbs_agg_pks.clone(),
373 tbs_pks: self.cfg.tbs_pks.clone(),
374 balance_update_sender: self.balance_update_sender.clone(),
375 }
376 }
377
378 fn input_fee(
379 &self,
380 amounts: &Amounts,
381 _input: &<Self::Common as ModuleCommon>::Input,
382 ) -> Option<Amounts> {
383 let unit = self.cfg.amount_unit;
384 let amount = amounts.get(&unit).copied().unwrap_or_default();
385 let fee = self.cfg.fee_consensus.fee(amount);
386
387 Some(Amounts::new_custom(unit, fee))
388 }
389
390 fn output_fee(
391 &self,
392 amounts: &Amounts,
393 _output: &<Self::Common as ModuleCommon>::Output,
394 ) -> Option<Amounts> {
395 let unit = self.cfg.amount_unit;
396 let amount = amounts.get(&unit).copied().unwrap_or_default();
397 let fee = self.cfg.fee_consensus.fee(amount);
398
399 Some(Amounts::new_custom(unit, fee))
400 }
401
    /// Forwards a CLI invocation to the `cli` module; only compiled with the
    /// `cli` feature.
    #[cfg(feature = "cli")]
    async fn handle_cli_command(
        &self,
        args: &[std::ffi::OsString],
    ) -> anyhow::Result<serde_json::Value> {
        cli::handle_cli_command(self, args).await
    }
409
    /// Declares that this module can act as primary module, with high
    /// priority, for its configured amount unit only.
    fn supports_being_primary(&self) -> PrimaryModuleSupport {
        PrimaryModuleSupport::selected(PrimaryModulePriority::HIGH, [self.cfg.amount_unit])
    }
413
414 async fn create_final_inputs_and_outputs(
415 &self,
416 dbtx: &mut DatabaseTransaction<'_>,
417 operation_id: OperationId,
418 unit: AmountUnit,
419 mut input_amount: Amount,
420 mut output_amount: Amount,
421 ) -> anyhow::Result<(
422 ClientInputBundle<MintInput, MintClientStateMachines>,
423 ClientOutputBundle<MintOutput, MintClientStateMachines>,
424 )> {
425 if unit != self.cfg.amount_unit {
426 anyhow::bail!("Module can only handle its configured amount unit");
427 }
428
429 let funding_notes = self
430 .select_funding_input(dbtx, output_amount.saturating_sub(input_amount))
431 .await
432 .context("Insufficient funds")?;
433
434 for note in &funding_notes {
435 self.remove_spendable_note(dbtx, note).await;
436 }
437
438 input_amount += funding_notes.iter().map(SpendableNote::amount).sum();
439
440 output_amount += funding_notes
441 .iter()
442 .map(|input| self.cfg.fee_consensus.fee(input.amount()))
443 .sum();
444
445 assert!(output_amount <= input_amount);
446
447 let (input_notes, output_amounts) = self
448 .rebalance(dbtx, &self.cfg.fee_consensus, input_amount - output_amount)
449 .await;
450
451 for note in &input_notes {
452 self.remove_spendable_note(dbtx, note).await;
453 }
454
455 input_amount += input_notes.iter().map(SpendableNote::amount).sum();
456
457 output_amount += input_notes
458 .iter()
459 .map(|note| self.cfg.fee_consensus.fee(note.amount()))
460 .sum();
461
462 output_amount += output_amounts
463 .iter()
464 .map(|denomination| {
465 denomination.amount() + self.cfg.fee_consensus.fee(denomination.amount())
466 })
467 .sum();
468
469 assert!(output_amount <= input_amount);
470
471 let mut spendable_notes = funding_notes
472 .into_iter()
473 .chain(input_notes)
474 .collect::<Vec<SpendableNote>>();
475
476 spendable_notes.sort_by_key(|note| note.denomination);
478
479 let input_bundle =
480 Self::create_input_bundle(operation_id, spendable_notes, false, self.cfg.amount_unit);
481
482 let mut denominations = represent_amount_with_fees(
483 input_amount.saturating_sub(output_amount),
484 &self.cfg.fee_consensus,
485 )
486 .into_iter()
487 .chain(output_amounts)
488 .collect::<Vec<Denomination>>();
489
490 denominations.sort();
492
493 let output_bundle = self.create_output_bundle(operation_id, denominations).await;
494
495 let sender = self.balance_update_sender.clone();
496 dbtx.on_commit(move || sender.send_replace(()));
497
498 Ok((input_bundle, output_bundle))
499 }
500
    /// Waits until the output state machine covering `outpoint` in
    /// `operation_id` reaches a final state; errors if it was aborted or
    /// failed.
    async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        outpoint: OutPoint,
    ) -> anyhow::Result<()> {
        self.await_output_sm_success(operation_id, outpoint).await
    }
508
509 async fn get_balance(&self, dbtx: &mut DatabaseTransaction<'_>, unit: AmountUnit) -> Amount {
510 if unit != self.cfg.amount_unit {
511 return Amount::ZERO;
512 }
513
514 self.get_count_by_denomination_dbtx(dbtx)
515 .await
516 .into_iter()
517 .map(|(denomination, count)| denomination.amount().mul_u64(count))
518 .sum()
519 }
520
521 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
522 Box::pin(tokio_stream::wrappers::WatchStream::new(
523 self.balance_update_sender.subscribe(),
524 ))
525 }
526}
527
528impl MintClientModule {
529 async fn select_funding_input(
530 &self,
531 dbtx: &mut DatabaseTransaction<'_>,
532 mut excess_output: Amount,
533 ) -> Option<Vec<SpendableNote>> {
534 let mut selected_notes = Vec::new();
535 let mut target_notes = Vec::new();
536 let mut excess_notes = Vec::new();
537
538 for amount in client_denominations().rev() {
539 let notes_amount = dbtx
540 .find_by_prefix(&SpendableNoteAmountPrefix(amount))
541 .await
542 .map(|entry| entry.0.0)
543 .collect::<Vec<SpendableNote>>()
544 .await;
545
546 target_notes.extend(notes_amount.iter().take(TARGET_PER_DENOMINATION).cloned());
547
548 if notes_amount.len() > 2 * TARGET_PER_DENOMINATION {
549 for note in notes_amount.into_iter().skip(TARGET_PER_DENOMINATION) {
550 let note_fee = self.cfg.fee_consensus.fee(note.amount());
551
552 let note_value = note
553 .amount()
554 .checked_sub(note_fee)
555 .expect("All our notes are economical");
556
557 excess_output = excess_output.saturating_sub(note_value);
558
559 selected_notes.push(note);
560 }
561 } else {
562 excess_notes.extend(notes_amount.into_iter().skip(TARGET_PER_DENOMINATION));
563 }
564 }
565
566 if excess_output == Amount::ZERO {
567 return Some(selected_notes);
568 }
569
570 for note in excess_notes.into_iter().chain(target_notes) {
571 let note_amount = note.amount();
572 let note_value = note_amount
573 .checked_sub(self.cfg.fee_consensus.fee(note_amount))
574 .expect("All our notes are economical");
575
576 excess_output = excess_output.saturating_sub(note_value);
577
578 selected_notes.push(note);
579
580 if excess_output == Amount::ZERO {
581 return Some(selected_notes);
582 }
583 }
584
585 None
586 }
587
588 async fn rebalance(
589 &self,
590 dbtx: &mut DatabaseTransaction<'_>,
591 fee: &FeeConsensus,
592 mut excess_input: Amount,
593 ) -> (Vec<SpendableNote>, Vec<Denomination>) {
594 let n_denominations = self.get_count_by_denomination_dbtx(dbtx).await;
595
596 let mut notes = dbtx
597 .find_by_prefix_sorted_descending(&SpendableNotePrefix)
598 .await
599 .map(|entry| entry.0.0)
600 .fuse();
601
602 let mut input_notes = Vec::new();
603 let mut output_denominations = Vec::new();
604
605 for d in client_denominations() {
606 let n_denomination = n_denominations.get(&d).copied().unwrap_or(0);
607
608 let n_missing = TARGET_PER_DENOMINATION.saturating_sub(n_denomination as usize);
609
610 for _ in 0..n_missing {
611 match excess_input.checked_sub(d.amount() + fee.fee(d.amount())) {
612 Some(remaining_excess) => excess_input = remaining_excess,
613 None => match notes.next().await {
614 Some(note) => {
615 if note.amount() <= d.amount() + fee.fee(d.amount()) {
616 break;
617 }
618
619 excess_input += note.amount() - (d.amount() + fee.fee(d.amount()));
620
621 input_notes.push(note);
622 }
623 None => break,
624 },
625 }
626
627 output_denominations.push(d);
628 }
629 }
630
631 (input_notes, output_denominations)
632 }
633
634 fn create_input_bundle(
635 operation_id: OperationId,
636 notes: Vec<SpendableNote>,
637 include_receive_sm: bool,
638 amount_unit: AmountUnit,
639 ) -> ClientInputBundle<MintInput, MintClientStateMachines> {
640 let inputs = notes
641 .iter()
642 .map(|spendable_note| ClientInput {
643 input: MintInput::new_v0(spendable_note.note()),
644 keys: vec![spendable_note.keypair],
645 amounts: Amounts::new_custom(amount_unit, spendable_note.amount()),
646 })
647 .collect();
648
649 let input_sms = vec![ClientInputSM {
650 state_machines: Arc::new(move |range: OutPointRange| {
651 let mut sms = vec![MintClientStateMachines::Input(InputStateMachine {
652 common: InputSMCommon {
653 operation_id,
654 txid: range.txid(),
655 spendable_notes: notes.clone(),
656 },
657 state: InputSMState::Pending,
658 })];
659
660 if include_receive_sm {
661 sms.push(MintClientStateMachines::Receive(ReceiveStateMachine {
662 common: crate::receive::ReceiveSMCommon {
663 operation_id,
664 txid: range.txid(),
665 },
666 state: crate::receive::ReceiveSMState::Pending,
667 }));
668 }
669
670 sms
671 }),
672 }];
673
674 ClientInputBundle::new(inputs, input_sms)
675 }
676
677 async fn create_output_bundle(
678 &self,
679 operation_id: OperationId,
680 requested_denominations: Vec<Denomination>,
681 ) -> ClientOutputBundle<MintOutput, MintClientStateMachines> {
682 let issuance_requests = futures::stream::iter(requested_denominations)
683 .zip(self.tweak_receiver.clone())
684 .map(|(d, tweak)| NoteIssuanceRequest::new(d, tweak, &self.root_secret))
685 .collect::<Vec<NoteIssuanceRequest>>()
686 .await;
687
688 let amount_unit = self.cfg.amount_unit;
689 let outputs = issuance_requests
690 .iter()
691 .map(|request| ClientOutput {
692 output: request.output(),
693 amounts: Amounts::new_custom(amount_unit, request.denomination.amount()),
694 })
695 .collect();
696
697 let output_sms = vec![ClientOutputSM {
698 state_machines: Arc::new(move |range: OutPointRange| {
699 vec![MintClientStateMachines::Output(MintOutputStateMachine {
700 common: OutputSMCommon {
701 operation_id,
702 range: Some(range),
703 issuance_requests: issuance_requests.clone(),
704 },
705 state: OutputSMState::Pending,
706 })]
707 }),
708 }];
709
710 ClientOutputBundle::new(outputs, output_sms)
711 }
712
713 async fn await_output_sm_success(
714 &self,
715 operation_id: OperationId,
716 outpoint: OutPoint,
717 ) -> anyhow::Result<()> {
718 let stream = self
719 .notifier
720 .subscribe(operation_id)
721 .await
722 .filter_map(|state| async {
723 let MintClientStateMachines::Output(state) = state else {
724 return None;
725 };
726
727 if !state.common.range?.into_iter().contains(&outpoint) {
728 return None;
729 }
730
731 match state.state {
732 OutputSMState::Pending => None,
733 OutputSMState::Success => Some(Ok(())),
734 OutputSMState::Aborted => Some(Err(anyhow!("Transaction was rejected"))),
735 OutputSMState::Failure => Some(Err(anyhow!("Failed to finalize notes",))),
736 }
737 });
738
739 pin_mut!(stream);
740
741 stream.next_or_pending().await
742 }
743
744 pub async fn get_count_by_denomination(&self) -> BTreeMap<Denomination, u64> {
746 self.get_count_by_denomination_dbtx(
747 &mut self.client_ctx.module_db().begin_transaction_nc().await,
748 )
749 .await
750 }
751
752 async fn get_count_by_denomination_dbtx(
753 &self,
754 dbtx: &mut DatabaseTransaction<'_>,
755 ) -> BTreeMap<Denomination, u64> {
756 dbtx.find_by_prefix(&SpendableNotePrefix)
757 .await
758 .fold(BTreeMap::new(), |mut acc, entry| async move {
759 acc.entry(entry.0.0.denomination)
760 .and_modify(|count| *count += 1)
761 .or_insert(1);
762
763 acc
764 })
765 .await
766 }
767
768 pub async fn send(
782 &self,
783 amount: Amount,
784 custom_meta: Value,
785 include_invite: bool,
786 ) -> Result<(OperationId, ECash), SendECashError> {
787 let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());
788
789 if let Some((operation_id, ecash)) = self
790 .client_ctx
791 .module_db()
792 .autocommit(
793 |dbtx, _| {
794 Box::pin(self.send_ecash_dbtx(
795 dbtx,
796 amount,
797 custom_meta.clone(),
798 include_invite,
799 ))
800 },
801 Some(100),
802 )
803 .await
804 .expect("Failed to commit dbtx after 100 retries")
805 {
806 return Ok((operation_id, ecash));
807 }
808
809 self.client_ctx
810 .global_api()
811 .session_count()
812 .await
813 .map_err(|_| SendECashError::Offline)?;
814
815 let operation_id = OperationId::new_random();
816
817 let output = self
818 .create_output_bundle(operation_id, represent_amount(amount))
819 .await;
820 let output = self.client_ctx.make_client_outputs(output);
821 let cm = custom_meta.clone();
822
823 let range = self
824 .client_ctx
825 .finalize_and_submit_transaction(
826 operation_id,
827 MintCommonInit::KIND.as_str(),
828 move |change_outpoint_range| MintOperationMeta::Reissue {
829 change_outpoint_range,
830 amount,
831 custom_meta: cm.clone(),
832 },
833 TransactionBuilder::new().with_outputs(output),
834 )
835 .await
836 .map_err(|_| SendECashError::InsufficientBalance)?;
837
838 for outpoint in range {
839 self.await_output_sm_success(operation_id, outpoint)
840 .await
841 .map_err(|_| SendECashError::Failure)?;
842 }
843
844 Box::pin(self.send(amount, custom_meta, include_invite)).await
845 }
846
847 async fn send_ecash_dbtx(
848 &self,
849 dbtx: &mut DatabaseTransaction<'_>,
850 remaining_amount: Amount,
851 custom_meta: Value,
852 include_invite: bool,
853 ) -> Result<Option<(OperationId, ECash)>, Infallible> {
854 let Some(notes) = Self::select_exact_change(&mut dbtx.to_ref_nc(), remaining_amount).await
855 else {
856 return Ok(None);
857 };
858
859 for spendable_note in ¬es {
860 self.remove_spendable_note(dbtx, spendable_note).await;
861 }
862
863 let ecash = if include_invite {
864 let invite = self.client_ctx.get_invite_code().await;
865 ECash::new_with_invite(notes, &invite)
866 } else {
867 ECash::new(self.federation_id, notes)
868 };
869 let amount = ecash.amount();
870 let operation_id = OperationId::new_random();
871
872 self.client_ctx
873 .add_operation_log_entry_dbtx(
874 dbtx,
875 operation_id,
876 MintCommonInit::KIND.as_str(),
877 MintOperationMeta::Send {
878 ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
879 custom_meta,
880 },
881 )
882 .await;
883
884 self.client_ctx
885 .log_event(
886 dbtx,
887 SendPaymentEvent {
888 operation_id,
889 amount,
890 ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
891 },
892 )
893 .await;
894
895 let sender = self.balance_update_sender.clone();
896 dbtx.on_commit(move || sender.send_replace(()));
897
898 Ok(Some((operation_id, ecash)))
899 }
900
901 pub async fn receive(
904 &self,
905 ecash: ECash,
906 custom_meta: Value,
907 ) -> Result<OperationId, ReceiveECashError> {
908 let operation_id = OperationId::from_encodable(&ecash);
909
910 if self.client_ctx.operation_exists(operation_id).await {
911 return Ok(operation_id);
912 }
913
914 if ecash.mint() != Some(self.federation_id) {
915 return Err(ReceiveECashError::WrongFederation);
916 }
917
918 if ecash
919 .notes()
920 .iter()
921 .any(|note| note.amount() <= self.cfg.fee_consensus.base_fee())
922 {
923 return Err(ReceiveECashError::UneconomicalDenomination);
924 }
925
926 let input =
927 Self::create_input_bundle(operation_id, ecash.notes(), true, self.cfg.amount_unit);
928 let input = self.client_ctx.make_client_inputs(input);
929 let ec = base32::encode_prefixed(FEDIMINT_PREFIX, &ecash);
930
931 self.client_ctx
932 .finalize_and_submit_transaction(
933 operation_id,
934 MintCommonInit::KIND.as_str(),
935 move |change_outpoint_range| MintOperationMeta::Receive {
936 change_outpoint_range,
937 ecash: ec.clone(),
938 custom_meta: custom_meta.clone(),
939 },
940 TransactionBuilder::new().with_inputs(input),
941 )
942 .await
943 .map_err(|_| ReceiveECashError::InsufficientFunds)?;
944
945 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
946
947 self.client_ctx
948 .log_event(
949 &mut dbtx,
950 ReceivePaymentEvent {
951 operation_id,
952 amount: ecash.amount(),
953 },
954 )
955 .await;
956
957 dbtx.commit_tx().await;
958
959 Ok(operation_id)
960 }
961
962 pub async fn receive_fee_quote(&self, ecash: &ECash) -> anyhow::Result<FeeQuote> {
972 let notes = ecash.notes();
976 let input_amount: Amount = notes.iter().map(SpendableNote::amount).sum();
977 let input_fee: Amount = notes
978 .iter()
979 .map(|note| self.cfg.fee_consensus.fee(note.amount()))
980 .sum();
981
982 self.client_ctx
983 .fee_quote(
984 OperationId::new_random(),
985 FeeQuoteRequest {
986 input_amount: Amounts::new_custom(self.cfg.amount_unit, input_amount),
987 output_amount: Amounts::ZERO,
988 input_fee: Amounts::new_custom(self.cfg.amount_unit, input_fee),
989 output_fee: Amounts::ZERO,
990 },
991 )
992 .await
993 }
994
995 pub async fn send_fee_quote(&self, amount: Amount) -> anyhow::Result<FeeQuote> {
1009 let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());
1010
1011 if self.can_make_exact_change(amount).await {
1013 return Ok(FeeQuote::ZERO);
1014 }
1015
1016 let denominations = represent_amount(amount);
1020 let output_amount: Amount = denominations.iter().map(|d| d.amount()).sum();
1021 let output_fee: Amount = denominations
1022 .iter()
1023 .map(|d| self.cfg.fee_consensus.fee(d.amount()))
1024 .sum();
1025
1026 self.client_ctx
1027 .fee_quote(
1028 OperationId::new_random(),
1029 FeeQuoteRequest {
1030 input_amount: Amounts::ZERO,
1031 output_amount: Amounts::new_custom(self.cfg.amount_unit, output_amount),
1032 input_fee: Amounts::ZERO,
1033 output_fee: Amounts::new_custom(self.cfg.amount_unit, output_fee),
1034 },
1035 )
1036 .await
1037 }
1038
1039 async fn can_make_exact_change(&self, remaining_amount: Amount) -> bool {
1044 let mut dbtx = self.client_ctx.module_db().begin_transaction_nc().await;
1045
1046 Self::select_exact_change(&mut dbtx, remaining_amount)
1047 .await
1048 .is_some()
1049 }
1050
1051 async fn select_exact_change(
1057 dbtx: &mut DatabaseTransaction<'_>,
1058 mut remaining_amount: Amount,
1059 ) -> Option<Vec<SpendableNote>> {
1060 let mut stream = dbtx
1061 .find_by_prefix_sorted_descending(&SpendableNotePrefix)
1062 .await
1063 .map(|entry| entry.0.0);
1064
1065 let mut notes = vec![];
1066
1067 while let Some(spendable_note) = stream.next().await {
1068 remaining_amount = match remaining_amount.checked_sub(spendable_note.amount()) {
1069 Some(amount) => amount,
1070 None => continue,
1071 };
1072
1073 notes.push(spendable_note);
1074
1075 if remaining_amount == Amount::ZERO {
1076 break;
1077 }
1078 }
1079
1080 (remaining_amount == Amount::ZERO).then_some(notes)
1081 }
1082
1083 pub async fn await_final_receive_operation_state(
1085 &self,
1086 operation_id: OperationId,
1087 ) -> anyhow::Result<FinalReceiveOperationState> {
1088 let operation = self.client_ctx.get_operation(operation_id).await?;
1089 let mut stream = self.notifier.subscribe(operation_id).await;
1090
1091 let mut stream = self
1092 .client_ctx
1093 .outcome_or_updates(operation, operation_id, move || {
1094 async_stream::stream! {
1095 loop {
1096 if let Some(MintClientStateMachines::Receive(state)) = stream.next().await {
1097 match state.state {
1098 ReceiveSMState::Pending => {}
1099 ReceiveSMState::Success => {
1100 yield FinalReceiveOperationState::Success;
1101 return;
1102 }
1103 ReceiveSMState::Rejected(..) => {
1104 yield FinalReceiveOperationState::Rejected;
1105 return;
1106 }
1107 }
1108 }
1109 }
1110 }
1111 })
1112 .into_stream();
1113
1114 let mut final_state = None;
1115
1116 while let Some(state) = stream.next().await {
1117 final_state = Some(state);
1118 }
1119
1120 Ok(final_state.expect("Stream contains one final state"))
1121 }
1122
1123 async fn remove_spendable_note(
1124 &self,
1125 dbtx: &mut DatabaseTransaction<'_>,
1126 spendable_note: &SpendableNote,
1127 ) {
1128 dbtx.remove_entry(&SpendableNoteKey(spendable_note.clone()))
1129 .await
1130 .expect("Must delete existing spendable note");
1131 }
1132}
1133
/// Tracks a latency estimate per peer and picks peers for slice downloads.
/// Clones share the same table.
#[derive(Clone)]
struct PeerSelector {
    /// Latency estimate per peer that is still eligible for selection.
    latency: Arc<RwLock<BTreeMap<PeerId, Duration>>>,
}
1138
1139impl PeerSelector {
1140 fn new(peers: BTreeSet<PeerId>) -> Self {
1141 let latency = peers
1142 .into_iter()
1143 .map(|peer| (peer, Duration::ZERO))
1144 .collect();
1145
1146 Self {
1147 latency: Arc::new(RwLock::new(latency)),
1148 }
1149 }
1150
1151 fn choose_peer(&self) -> PeerId {
1153 let latency = self.latency.read().unwrap();
1154
1155 let peer_a = latency.iter().choose(&mut thread_rng()).unwrap();
1156 let peer_b = latency.iter().choose(&mut thread_rng()).unwrap();
1157
1158 if peer_a.1 <= peer_b.1 {
1159 *peer_a.0
1160 } else {
1161 *peer_b.0
1162 }
1163 }
1164
1165 fn report(&self, peer: PeerId, duration: Duration) {
1167 self.latency
1168 .write()
1169 .unwrap()
1170 .entry(peer)
1171 .and_modify(|latency| *latency = *latency * 9 / 10 + duration * 1 / 10)
1172 .or_insert(duration);
1173 }
1174
1175 fn remove(&self, peer: PeerId) {
1176 self.latency.write().unwrap().remove(&peer);
1177 }
1178}
1179
1180async fn download_slice_with_hash(
1182 module_api: DynModuleApi,
1183 peer_selector: PeerSelector,
1184 start: u64,
1185 end: u64,
1186 expected_hash: sha256::Hash,
1187) -> Vec<RecoveryItem> {
1188 const TIMEOUT: Duration = Duration::from_secs(30);
1189
1190 loop {
1191 let peer = peer_selector.choose_peer();
1192 let start_time = fedimint_core::time::now();
1193
1194 if let Ok(data) = module_api
1195 .fetch_recovery_slice(peer, TIMEOUT, start, end)
1196 .await
1197 {
1198 let elapsed = fedimint_core::time::now()
1199 .duration_since(start_time)
1200 .unwrap_or_default();
1201
1202 peer_selector.report(peer, elapsed);
1203
1204 if data.consensus_hash::<sha256::Hash>() == expected_hash {
1205 return data;
1206 }
1207
1208 peer_selector.remove(peer);
1209 } else {
1210 peer_selector.report(peer, TIMEOUT);
1211 }
1212 }
1213}
1214
/// Errors returned by [`MintClientModule::send`].
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendECashError {
    /// Exact change is unavailable and the federation could not be reached
    /// to reissue notes.
    #[error("We need to reissue notes but the client is offline")]
    Offline,
    /// The reissuance transaction could not be created.
    #[error("The clients balance is insufficient")]
    InsufficientBalance,
    /// An output of the reissuance transaction did not finalize.
    #[error("A non-recoverable error has occurred")]
    Failure,
}
1224
/// Errors returned by [`MintClientModule::receive`].
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveECashError {
    /// The ecash does not name this client's federation.
    #[error("The ECash is from a different federation")]
    WrongFederation,
    /// At least one note is worth no more than the base fee.
    #[error("ECash contains an uneconomical denomination")]
    UneconomicalDenomination,
    /// The receive transaction could not be created.
    #[error("Receiving ecash requires additional funds")]
    InsufficientFunds,
}
1234
/// Final outcome of a receive operation, as reported by
/// [`MintClientModule::await_final_receive_operation_state`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// The receive state machine reached its success state.
    Success,
    /// The receive state machine reached its rejected state.
    Rejected,
}
1242
/// All state machines run by the mintv2 client module.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum MintClientStateMachines {
    /// Tracks notes spent as inputs of a transaction.
    Input(InputStateMachine),
    /// Tracks note issuance requests of a transaction's outputs, or those
    /// found during recovery.
    Output(MintOutputStateMachine),
    /// Tracks the outcome of a receive operation.
    Receive(ReceiveStateMachine),
}
1249
impl IntoDynInstance for MintClientStateMachines {
    type DynType = DynState;

    /// Wraps the state machine in a `DynState` bound to `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1257
1258impl State for MintClientStateMachines {
1259 type ModuleContext = MintClientContext;
1260
1261 fn transitions(
1262 &self,
1263 context: &Self::ModuleContext,
1264 global_context: &DynGlobalClientContext,
1265 ) -> Vec<StateTransition<Self>> {
1266 match self {
1267 MintClientStateMachines::Input(redemption_state) => {
1268 sm_enum_variant_translation!(
1269 redemption_state.transitions(context, global_context),
1270 MintClientStateMachines::Input
1271 )
1272 }
1273 MintClientStateMachines::Output(issuance_state) => {
1274 sm_enum_variant_translation!(
1275 issuance_state.transitions(context, global_context),
1276 MintClientStateMachines::Output
1277 )
1278 }
1279 MintClientStateMachines::Receive(receive_state) => {
1280 sm_enum_variant_translation!(
1281 receive_state.transitions(context, global_context),
1282 MintClientStateMachines::Receive
1283 )
1284 }
1285 }
1286 }
1287
1288 fn operation_id(&self) -> OperationId {
1289 match self {
1290 MintClientStateMachines::Input(redemption_state) => redemption_state.operation_id(),
1291 MintClientStateMachines::Output(issuance_state) => issuance_state.operation_id(),
1292 MintClientStateMachines::Receive(receive_state) => receive_state.operation_id(),
1293 }
1294 }
1295}
1296
1297fn round_to_multiple(amount: Amount, min_denomiation: Amount) -> Amount {
1298 Amount::from_msats(amount.msats.next_multiple_of(min_denomiation.msats))
1299}
1300
1301fn represent_amount_with_fees(
1302 mut remaining_amount: Amount,
1303 fee_consensus: &FeeConsensus,
1304) -> Vec<Denomination> {
1305 let mut denominations = Vec::new();
1306
1307 for denomination in client_denominations().rev() {
1309 let n_add =
1310 remaining_amount / (denomination.amount() + fee_consensus.fee(denomination.amount()));
1311
1312 denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1313
1314 remaining_amount -=
1315 n_add * (denomination.amount() + fee_consensus.fee(denomination.amount()));
1316 }
1317
1318 denominations.sort();
1320
1321 denominations
1322}
1323
1324fn represent_amount(mut remaining_amount: Amount) -> Vec<Denomination> {
1325 let mut denominations = Vec::new();
1326
1327 for denomination in client_denominations().rev() {
1329 let n_add = remaining_amount / denomination.amount();
1330
1331 denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1332
1333 remaining_amount -= n_add * denomination.amount();
1334 }
1335
1336 denominations
1337}