1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7#![allow(clippy::return_self_not_must_use)]
8#![allow(clippy::too_many_lines)]
9
10pub use fedimint_mintv2_common as common;
11
12mod api;
13#[cfg(feature = "cli")]
14mod cli;
15mod client_db;
16mod ecash;
17mod events;
18mod input;
19pub mod issuance;
20mod output;
21mod receive;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::convert::Infallible;
25use std::sync::{Arc, RwLock};
26use std::time::Duration;
27
28use anyhow::{Context as _, anyhow};
29use bitcoin_hashes::sha256;
30use client_db::{RecoveryState, RecoveryStateKey, SpendableNoteAmountPrefix, SpendableNotePrefix};
31pub use events::*;
32use fedimint_api_client::api::DynModuleApi;
33use fedimint_client::module::ClientModule;
34use fedimint_client::transaction::{
35 ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
36 ClientOutputSM, TransactionBuilder,
37};
38use fedimint_client_module::db::ClientModuleMigrationFn;
39use fedimint_client_module::module::init::{
40 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
41};
42use fedimint_client_module::module::recovery::{NoModuleBackup, RecoveryProgress};
43use fedimint_client_module::module::{
44 ClientContext, OutPointRange, PrimaryModulePriority, PrimaryModuleSupport,
45};
46use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::base32::{self, FEDIMINT_PREFIX};
49use fedimint_core::config::FederationId;
50use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
51use fedimint_core::db::{DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::module::{
54 AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
55};
56use fedimint_core::secp256k1::rand::{Rng, thread_rng};
57use fedimint_core::secp256k1::{Keypair, PublicKey};
58use fedimint_core::util::{BoxStream, NextOrPending};
59use fedimint_core::{Amount, OutPoint, PeerId, apply, async_trait_maybe_send};
60use fedimint_derive_secret::DerivableSecret;
61use fedimint_mintv2_common::config::{FeeConsensus, MintClientConfig, client_denominations};
62use fedimint_mintv2_common::{
63 Denomination, KIND, MintCommonInit, MintInput, MintModuleTypes, MintOutput, Note, RecoveryItem,
64};
65use futures::{StreamExt, pin_mut};
66use itertools::Itertools;
67use rand::seq::IteratorRandom;
68use serde::{Deserialize, Serialize};
69use serde_json::Value;
70use tbs::AggregatePublicKey;
71use thiserror::Error;
72
73use crate::api::MintV2ModuleApi;
74use crate::client_db::SpendableNoteKey;
75pub use crate::ecash::ECash;
76use crate::input::{InputSMCommon, InputSMState, InputStateMachine};
77use crate::issuance::NoteIssuanceRequest;
78use crate::output::{MintOutputStateMachine, OutputSMCommon, OutputSMState};
79use crate::receive::{ReceiveSMState, ReceiveStateMachine};
80
81const TARGET_PER_DENOMINATION: usize = 3;
82const SLICE_SIZE: u64 = 10000;
83const PARALLEL_HASH_REQUESTS: usize = 10;
84const PARALLEL_SLICE_REQUESTS: usize = 10;
85
86#[derive(Debug, Clone, PartialEq, Eq, Hash, Encodable, Decodable)]
87pub struct SpendableNote {
88 pub denomination: Denomination,
89 pub keypair: Keypair,
90 pub signature: tbs::Signature,
91}
92
93impl SpendableNote {
94 pub fn amount(&self) -> Amount {
95 self.denomination.amount()
96 }
97}
98
99impl SpendableNote {
100 fn nonce(&self) -> PublicKey {
101 self.keypair.public_key()
102 }
103
104 fn note(&self) -> Note {
105 Note {
106 denomination: self.denomination,
107 nonce: self.nonce(),
108 signature: self.signature,
109 }
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub enum MintOperationMeta {
115 Send {
116 ecash: String,
117 custom_meta: Value,
118 },
119 Reissue {
120 change_outpoint_range: OutPointRange,
121 amount: Amount,
122 custom_meta: Value,
123 },
124 Receive {
125 change_outpoint_range: OutPointRange,
126 ecash: String,
127 custom_meta: Value,
128 },
129}
130
131#[derive(Debug, Clone)]
132pub struct MintClientInit;
133
134impl ModuleInit for MintClientInit {
135 type Common = MintCommonInit;
136
137 async fn dump_database(
138 &self,
139 _dbtx: &mut DatabaseTransaction<'_>,
140 _prefix_names: Vec<String>,
141 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
142 Box::new(BTreeMap::new().into_iter())
143 }
144}
145
146#[apply(async_trait_maybe_send!)]
147impl ClientModuleInit for MintClientInit {
148 type Module = MintClientModule;
149
150 fn supported_api_versions(&self) -> MultiApiVersion {
151 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 1 }])
152 .expect("no version conflicts")
153 }
154
155 async fn recover(
156 &self,
157 args: &ClientModuleRecoverArgs<Self>,
158 _snapshot: Option<&NoModuleBackup>,
159 ) -> anyhow::Result<()> {
160 let mut state = if let Some(state) = args
161 .db()
162 .begin_transaction_nc()
163 .await
164 .get_value(&RecoveryStateKey)
165 .await
166 {
167 state
168 } else {
169 RecoveryState {
170 next_index: 0,
171 total_items: args.module_api().fetch_recovery_count().await?,
172 requests: BTreeMap::new(),
173 nonces: BTreeSet::new(),
174 }
175 };
176
177 if state.next_index == state.total_items {
178 return Ok(());
179 }
180
181 let peer_selector = PeerSelector::new(args.api().all_peers().clone());
182
183 let mut recovery_stream = futures::stream::iter(
184 (state.next_index..state.total_items).step_by(SLICE_SIZE as usize),
185 )
186 .map(|start| {
187 let api = args.module_api().clone();
188 let end = std::cmp::min(start + SLICE_SIZE, state.total_items);
189
190 async move { (start, end, api.fetch_recovery_slice_hash(start, end).await) }
191 })
192 .buffered(PARALLEL_HASH_REQUESTS)
193 .map(|(start, end, hash)| {
194 download_slice_with_hash(
195 args.module_api().clone(),
196 peer_selector.clone(),
197 start,
198 end,
199 hash,
200 )
201 })
202 .buffered(PARALLEL_SLICE_REQUESTS);
203
204 let tweak_filter = issuance::tweak_filter(args.module_root_secret());
205
206 loop {
207 let items = recovery_stream
208 .next()
209 .await
210 .context("Recovery stream finished before recovery is complete")?;
211
212 for item in &items {
213 match item {
214 RecoveryItem::Output {
215 denomination,
216 nonce_hash,
217 tweak,
218 } => {
219 if !issuance::check_tweak(*tweak, tweak_filter) {
220 continue;
221 }
222 let output_secret = issuance::output_secret(
223 *denomination,
224 *tweak,
225 args.module_root_secret(),
226 );
227
228 if !issuance::check_nonce(&output_secret, *nonce_hash) {
229 continue;
230 }
231
232 let computed_nonce_hash = issuance::nonce(&output_secret).consensus_hash();
233
234 if !state.nonces.insert(computed_nonce_hash) {
236 continue;
237 }
238
239 state.requests.insert(
240 computed_nonce_hash,
241 NoteIssuanceRequest::new(
242 *denomination,
243 *tweak,
244 args.module_root_secret(),
245 ),
246 );
247 }
248 RecoveryItem::Input { nonce_hash } => {
249 state.requests.remove(nonce_hash);
250 state.nonces.remove(nonce_hash);
251 }
252 }
253 }
254
255 state.next_index += items.len() as u64;
256
257 let mut dbtx = args.db().begin_transaction().await;
258
259 dbtx.insert_entry(&RecoveryStateKey, &state).await;
260
261 if state.next_index == state.total_items {
262 let state_machines = args
263 .context()
264 .map_dyn(vec![MintClientStateMachines::Output(
265 MintOutputStateMachine {
266 common: OutputSMCommon {
267 operation_id: OperationId::new_random(),
268 range: None,
269 issuance_requests: state.requests.into_values().collect(),
270 },
271 state: OutputSMState::Pending,
272 },
273 )])
274 .collect();
275
276 args.context()
277 .add_state_machines_dbtx(&mut dbtx.to_ref_nc(), state_machines)
278 .await
279 .expect("state machine is valid");
280
281 dbtx.commit_tx().await;
282
283 return Ok(());
284 }
285
286 dbtx.commit_tx().await;
287
288 args.update_recovery_progress(RecoveryProgress {
289 complete: state.next_index.try_into().unwrap_or(u32::MAX),
290 total: state.total_items.try_into().unwrap_or(u32::MAX),
291 });
292 }
293 }
294
295 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
296 let (tweak_sender, tweak_receiver) = async_channel::bounded(50);
297
298 let filter = issuance::tweak_filter(args.module_root_secret());
299
300 tokio::task::spawn_blocking(move || {
301 loop {
302 let tweak: [u8; 16] = thread_rng().r#gen();
303
304 if !issuance::check_tweak(tweak, filter) {
305 continue;
306 }
307
308 if tweak_sender.send_blocking(tweak).is_err() {
309 return;
310 }
311 }
312 });
313
314 Ok(MintClientModule {
315 federation_id: *args.federation_id(),
316 cfg: args.cfg().clone(),
317 root_secret: args.module_root_secret().clone(),
318 notifier: args.notifier().clone(),
319 client_ctx: args.context(),
320 balance_update_sender: tokio::sync::watch::channel(()).0,
321 tweak_receiver,
322 })
323 }
324
325 fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
326 BTreeMap::new()
327 }
328}
329
330#[derive(Debug)]
331pub struct MintClientModule {
332 federation_id: FederationId,
333 cfg: MintClientConfig,
334 root_secret: DerivableSecret,
335 notifier: ModuleNotifier<MintClientStateMachines>,
336 client_ctx: ClientContext<Self>,
337 balance_update_sender: tokio::sync::watch::Sender<()>,
338 tweak_receiver: async_channel::Receiver<[u8; 16]>,
339}
340
341#[derive(Debug, Clone)]
342pub struct MintClientContext {
343 client_ctx: ClientContext<MintClientModule>,
344 tbs_agg_pks: BTreeMap<Denomination, AggregatePublicKey>,
345 tbs_pks: BTreeMap<Denomination, BTreeMap<PeerId, tbs::PublicKeyShare>>,
346 pub balance_update_sender: tokio::sync::watch::Sender<()>,
347}
348
349impl Context for MintClientContext {
350 const KIND: Option<ModuleKind> = Some(KIND);
351}
352
353#[apply(async_trait_maybe_send!)]
354impl ClientModule for MintClientModule {
355 type Init = MintClientInit;
356 type Common = MintModuleTypes;
357 type Backup = NoModuleBackup;
358 type ModuleStateMachineContext = MintClientContext;
359 type States = MintClientStateMachines;
360
361 fn context(&self) -> Self::ModuleStateMachineContext {
362 MintClientContext {
363 client_ctx: self.client_ctx.clone(),
364 tbs_agg_pks: self.cfg.tbs_agg_pks.clone(),
365 tbs_pks: self.cfg.tbs_pks.clone(),
366 balance_update_sender: self.balance_update_sender.clone(),
367 }
368 }
369
370 fn input_fee(
371 &self,
372 amounts: &Amounts,
373 _input: &<Self::Common as ModuleCommon>::Input,
374 ) -> Option<Amounts> {
375 let unit = self.cfg.amount_unit;
376 let amount = amounts.get(&unit).copied().unwrap_or_default();
377 let fee = self.cfg.fee_consensus.fee(amount);
378
379 Some(Amounts::new_custom(unit, fee))
380 }
381
382 fn output_fee(
383 &self,
384 amounts: &Amounts,
385 _output: &<Self::Common as ModuleCommon>::Output,
386 ) -> Option<Amounts> {
387 let unit = self.cfg.amount_unit;
388 let amount = amounts.get(&unit).copied().unwrap_or_default();
389 let fee = self.cfg.fee_consensus.fee(amount);
390
391 Some(Amounts::new_custom(unit, fee))
392 }
393
394 #[cfg(feature = "cli")]
395 async fn handle_cli_command(
396 &self,
397 args: &[std::ffi::OsString],
398 ) -> anyhow::Result<serde_json::Value> {
399 cli::handle_cli_command(self, args).await
400 }
401
402 fn supports_being_primary(&self) -> PrimaryModuleSupport {
403 PrimaryModuleSupport::selected(PrimaryModulePriority::HIGH, [self.cfg.amount_unit])
404 }
405
406 async fn create_final_inputs_and_outputs(
407 &self,
408 dbtx: &mut DatabaseTransaction<'_>,
409 operation_id: OperationId,
410 unit: AmountUnit,
411 mut input_amount: Amount,
412 mut output_amount: Amount,
413 ) -> anyhow::Result<(
414 ClientInputBundle<MintInput, MintClientStateMachines>,
415 ClientOutputBundle<MintOutput, MintClientStateMachines>,
416 )> {
417 if unit != self.cfg.amount_unit {
418 anyhow::bail!("Module can only handle its configured amount unit");
419 }
420
421 let funding_notes = self
422 .select_funding_input(dbtx, output_amount.saturating_sub(input_amount))
423 .await
424 .context("Insufficient funds")?;
425
426 for note in &funding_notes {
427 self.remove_spendable_note(dbtx, note).await;
428 }
429
430 input_amount += funding_notes.iter().map(SpendableNote::amount).sum();
431
432 output_amount += funding_notes
433 .iter()
434 .map(|input| self.cfg.fee_consensus.fee(input.amount()))
435 .sum();
436
437 assert!(output_amount <= input_amount);
438
439 let (input_notes, output_amounts) = self
440 .rebalance(dbtx, &self.cfg.fee_consensus, input_amount - output_amount)
441 .await;
442
443 for note in &input_notes {
444 self.remove_spendable_note(dbtx, note).await;
445 }
446
447 input_amount += input_notes.iter().map(SpendableNote::amount).sum();
448
449 output_amount += input_notes
450 .iter()
451 .map(|note| self.cfg.fee_consensus.fee(note.amount()))
452 .sum();
453
454 output_amount += output_amounts
455 .iter()
456 .map(|denomination| {
457 denomination.amount() + self.cfg.fee_consensus.fee(denomination.amount())
458 })
459 .sum();
460
461 assert!(output_amount <= input_amount);
462
463 let mut spendable_notes = funding_notes
464 .into_iter()
465 .chain(input_notes)
466 .collect::<Vec<SpendableNote>>();
467
468 spendable_notes.sort_by_key(|note| note.denomination);
470
471 let input_bundle =
472 Self::create_input_bundle(operation_id, spendable_notes, false, self.cfg.amount_unit);
473
474 let mut denominations = represent_amount_with_fees(
475 input_amount.saturating_sub(output_amount),
476 &self.cfg.fee_consensus,
477 )
478 .into_iter()
479 .chain(output_amounts)
480 .collect::<Vec<Denomination>>();
481
482 denominations.sort();
484
485 let output_bundle = self.create_output_bundle(operation_id, denominations).await;
486
487 let sender = self.balance_update_sender.clone();
488 dbtx.on_commit(move || sender.send_replace(()));
489
490 Ok((input_bundle, output_bundle))
491 }
492
493 async fn await_primary_module_output(
494 &self,
495 operation_id: OperationId,
496 outpoint: OutPoint,
497 ) -> anyhow::Result<()> {
498 self.await_output_sm_success(operation_id, outpoint).await
499 }
500
501 async fn get_balance(&self, dbtx: &mut DatabaseTransaction<'_>, unit: AmountUnit) -> Amount {
502 if unit != self.cfg.amount_unit {
503 return Amount::ZERO;
504 }
505
506 self.get_count_by_denomination_dbtx(dbtx)
507 .await
508 .into_iter()
509 .map(|(denomination, count)| denomination.amount().mul_u64(count))
510 .sum()
511 }
512
513 async fn subscribe_balance_changes(&self) -> BoxStream<'static, ()> {
514 Box::pin(tokio_stream::wrappers::WatchStream::new(
515 self.balance_update_sender.subscribe(),
516 ))
517 }
518}
519
520impl MintClientModule {
521 async fn select_funding_input(
522 &self,
523 dbtx: &mut DatabaseTransaction<'_>,
524 mut excess_output: Amount,
525 ) -> Option<Vec<SpendableNote>> {
526 let mut selected_notes = Vec::new();
527 let mut target_notes = Vec::new();
528 let mut excess_notes = Vec::new();
529
530 for amount in client_denominations().rev() {
531 let notes_amount = dbtx
532 .find_by_prefix(&SpendableNoteAmountPrefix(amount))
533 .await
534 .map(|entry| entry.0.0)
535 .collect::<Vec<SpendableNote>>()
536 .await;
537
538 target_notes.extend(notes_amount.iter().take(TARGET_PER_DENOMINATION).cloned());
539
540 if notes_amount.len() > 2 * TARGET_PER_DENOMINATION {
541 for note in notes_amount.into_iter().skip(TARGET_PER_DENOMINATION) {
542 let note_fee = self.cfg.fee_consensus.fee(note.amount());
543
544 let note_value = note
545 .amount()
546 .checked_sub(note_fee)
547 .expect("All our notes are economical");
548
549 excess_output = excess_output.saturating_sub(note_value);
550
551 selected_notes.push(note);
552 }
553 } else {
554 excess_notes.extend(notes_amount.into_iter().skip(TARGET_PER_DENOMINATION));
555 }
556 }
557
558 if excess_output == Amount::ZERO {
559 return Some(selected_notes);
560 }
561
562 for note in excess_notes.into_iter().chain(target_notes) {
563 let note_amount = note.amount();
564 let note_value = note_amount
565 .checked_sub(self.cfg.fee_consensus.fee(note_amount))
566 .expect("All our notes are economical");
567
568 excess_output = excess_output.saturating_sub(note_value);
569
570 selected_notes.push(note);
571
572 if excess_output == Amount::ZERO {
573 return Some(selected_notes);
574 }
575 }
576
577 None
578 }
579
580 async fn rebalance(
581 &self,
582 dbtx: &mut DatabaseTransaction<'_>,
583 fee: &FeeConsensus,
584 mut excess_input: Amount,
585 ) -> (Vec<SpendableNote>, Vec<Denomination>) {
586 let n_denominations = self.get_count_by_denomination_dbtx(dbtx).await;
587
588 let mut notes = dbtx
589 .find_by_prefix_sorted_descending(&SpendableNotePrefix)
590 .await
591 .map(|entry| entry.0.0)
592 .fuse();
593
594 let mut input_notes = Vec::new();
595 let mut output_denominations = Vec::new();
596
597 for d in client_denominations() {
598 let n_denomination = n_denominations.get(&d).copied().unwrap_or(0);
599
600 let n_missing = TARGET_PER_DENOMINATION.saturating_sub(n_denomination as usize);
601
602 for _ in 0..n_missing {
603 match excess_input.checked_sub(d.amount() + fee.fee(d.amount())) {
604 Some(remaining_excess) => excess_input = remaining_excess,
605 None => match notes.next().await {
606 Some(note) => {
607 if note.amount() <= d.amount() + fee.fee(d.amount()) {
608 break;
609 }
610
611 excess_input += note.amount() - (d.amount() + fee.fee(d.amount()));
612
613 input_notes.push(note);
614 }
615 None => break,
616 },
617 }
618
619 output_denominations.push(d);
620 }
621 }
622
623 (input_notes, output_denominations)
624 }
625
626 fn create_input_bundle(
627 operation_id: OperationId,
628 notes: Vec<SpendableNote>,
629 include_receive_sm: bool,
630 amount_unit: AmountUnit,
631 ) -> ClientInputBundle<MintInput, MintClientStateMachines> {
632 let inputs = notes
633 .iter()
634 .map(|spendable_note| ClientInput {
635 input: MintInput::new_v0(spendable_note.note()),
636 keys: vec![spendable_note.keypair],
637 amounts: Amounts::new_custom(amount_unit, spendable_note.amount()),
638 })
639 .collect();
640
641 let input_sms = vec![ClientInputSM {
642 state_machines: Arc::new(move |range: OutPointRange| {
643 let mut sms = vec![MintClientStateMachines::Input(InputStateMachine {
644 common: InputSMCommon {
645 operation_id,
646 txid: range.txid(),
647 spendable_notes: notes.clone(),
648 },
649 state: InputSMState::Pending,
650 })];
651
652 if include_receive_sm {
653 sms.push(MintClientStateMachines::Receive(ReceiveStateMachine {
654 common: crate::receive::ReceiveSMCommon {
655 operation_id,
656 txid: range.txid(),
657 },
658 state: crate::receive::ReceiveSMState::Pending,
659 }));
660 }
661
662 sms
663 }),
664 }];
665
666 ClientInputBundle::new(inputs, input_sms)
667 }
668
669 async fn create_output_bundle(
670 &self,
671 operation_id: OperationId,
672 requested_denominations: Vec<Denomination>,
673 ) -> ClientOutputBundle<MintOutput, MintClientStateMachines> {
674 let issuance_requests = futures::stream::iter(requested_denominations)
675 .zip(self.tweak_receiver.clone())
676 .map(|(d, tweak)| NoteIssuanceRequest::new(d, tweak, &self.root_secret))
677 .collect::<Vec<NoteIssuanceRequest>>()
678 .await;
679
680 let amount_unit = self.cfg.amount_unit;
681 let outputs = issuance_requests
682 .iter()
683 .map(|request| ClientOutput {
684 output: request.output(),
685 amounts: Amounts::new_custom(amount_unit, request.denomination.amount()),
686 })
687 .collect();
688
689 let output_sms = vec![ClientOutputSM {
690 state_machines: Arc::new(move |range: OutPointRange| {
691 vec![MintClientStateMachines::Output(MintOutputStateMachine {
692 common: OutputSMCommon {
693 operation_id,
694 range: Some(range),
695 issuance_requests: issuance_requests.clone(),
696 },
697 state: OutputSMState::Pending,
698 })]
699 }),
700 }];
701
702 ClientOutputBundle::new(outputs, output_sms)
703 }
704
705 async fn await_output_sm_success(
706 &self,
707 operation_id: OperationId,
708 outpoint: OutPoint,
709 ) -> anyhow::Result<()> {
710 let stream = self
711 .notifier
712 .subscribe(operation_id)
713 .await
714 .filter_map(|state| async {
715 let MintClientStateMachines::Output(state) = state else {
716 return None;
717 };
718
719 if !state.common.range?.into_iter().contains(&outpoint) {
720 return None;
721 }
722
723 match state.state {
724 OutputSMState::Pending => None,
725 OutputSMState::Success => Some(Ok(())),
726 OutputSMState::Aborted => Some(Err(anyhow!("Transaction was rejected"))),
727 OutputSMState::Failure => Some(Err(anyhow!("Failed to finalize notes",))),
728 }
729 });
730
731 pin_mut!(stream);
732
733 stream.next_or_pending().await
734 }
735
736 pub async fn get_count_by_denomination(&self) -> BTreeMap<Denomination, u64> {
738 self.get_count_by_denomination_dbtx(
739 &mut self.client_ctx.module_db().begin_transaction_nc().await,
740 )
741 .await
742 }
743
744 async fn get_count_by_denomination_dbtx(
745 &self,
746 dbtx: &mut DatabaseTransaction<'_>,
747 ) -> BTreeMap<Denomination, u64> {
748 dbtx.find_by_prefix(&SpendableNotePrefix)
749 .await
750 .fold(BTreeMap::new(), |mut acc, entry| async move {
751 acc.entry(entry.0.0.denomination)
752 .and_modify(|count| *count += 1)
753 .or_insert(1);
754
755 acc
756 })
757 .await
758 }
759
760 pub async fn send(
774 &self,
775 amount: Amount,
776 custom_meta: Value,
777 include_invite: bool,
778 ) -> Result<ECash, SendECashError> {
779 let amount = round_to_multiple(amount, client_denominations().next().unwrap().amount());
780
781 if let Some(ecash) = self
782 .client_ctx
783 .module_db()
784 .autocommit(
785 |dbtx, _| {
786 Box::pin(self.send_ecash_dbtx(
787 dbtx,
788 amount,
789 custom_meta.clone(),
790 include_invite,
791 ))
792 },
793 Some(100),
794 )
795 .await
796 .expect("Failed to commit dbtx after 100 retries")
797 {
798 return Ok(ecash);
799 }
800
801 self.client_ctx
802 .global_api()
803 .session_count()
804 .await
805 .map_err(|_| SendECashError::Offline)?;
806
807 let operation_id = OperationId::new_random();
808
809 let output = self
810 .create_output_bundle(operation_id, represent_amount(amount))
811 .await;
812 let output = self.client_ctx.make_client_outputs(output);
813 let cm = custom_meta.clone();
814
815 let range = self
816 .client_ctx
817 .finalize_and_submit_transaction(
818 operation_id,
819 MintCommonInit::KIND.as_str(),
820 move |change_outpoint_range| MintOperationMeta::Reissue {
821 change_outpoint_range,
822 amount,
823 custom_meta: cm.clone(),
824 },
825 TransactionBuilder::new().with_outputs(output),
826 )
827 .await
828 .map_err(|_| SendECashError::InsufficientBalance)?;
829
830 for outpoint in range {
831 self.await_output_sm_success(operation_id, outpoint)
832 .await
833 .map_err(|_| SendECashError::Failure)?;
834 }
835
836 Box::pin(self.send(amount, custom_meta, include_invite)).await
837 }
838
839 async fn send_ecash_dbtx(
840 &self,
841 dbtx: &mut DatabaseTransaction<'_>,
842 mut remaining_amount: Amount,
843 custom_meta: Value,
844 include_invite: bool,
845 ) -> Result<Option<ECash>, Infallible> {
846 let mut stream = dbtx
847 .find_by_prefix_sorted_descending(&SpendableNotePrefix)
848 .await
849 .map(|entry| entry.0.0);
850
851 let mut notes = vec![];
852
853 while let Some(spendable_note) = stream.next().await {
854 remaining_amount = match remaining_amount.checked_sub(spendable_note.amount()) {
855 Some(amount) => amount,
856 None => continue,
857 };
858
859 notes.push(spendable_note);
860 }
861
862 drop(stream);
863
864 if remaining_amount != Amount::ZERO {
865 return Ok(None);
866 }
867
868 for spendable_note in ¬es {
869 self.remove_spendable_note(dbtx, spendable_note).await;
870 }
871
872 let ecash = if include_invite {
873 let invite = self.client_ctx.get_invite_code().await;
874 ECash::new_with_invite(notes, &invite)
875 } else {
876 ECash::new(self.federation_id, notes)
877 };
878 let amount = ecash.amount();
879 let operation_id = OperationId::new_random();
880
881 self.client_ctx
882 .add_operation_log_entry_dbtx(
883 dbtx,
884 operation_id,
885 MintCommonInit::KIND.as_str(),
886 MintOperationMeta::Send {
887 ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
888 custom_meta,
889 },
890 )
891 .await;
892
893 self.client_ctx
894 .log_event(
895 dbtx,
896 SendPaymentEvent {
897 operation_id,
898 amount,
899 ecash: base32::encode_prefixed(FEDIMINT_PREFIX, &ecash),
900 },
901 )
902 .await;
903
904 let sender = self.balance_update_sender.clone();
905 dbtx.on_commit(move || sender.send_replace(()));
906
907 Ok(Some(ecash))
908 }
909
910 pub async fn receive(
913 &self,
914 ecash: ECash,
915 custom_meta: Value,
916 ) -> Result<OperationId, ReceiveECashError> {
917 let operation_id = OperationId::from_encodable(&ecash);
918
919 if self.client_ctx.operation_exists(operation_id).await {
920 return Ok(operation_id);
921 }
922
923 if ecash.mint() != Some(self.federation_id) {
924 return Err(ReceiveECashError::WrongFederation);
925 }
926
927 if ecash
928 .notes()
929 .iter()
930 .any(|note| note.amount() <= self.cfg.fee_consensus.base_fee())
931 {
932 return Err(ReceiveECashError::UneconomicalDenomination);
933 }
934
935 let input =
936 Self::create_input_bundle(operation_id, ecash.notes(), true, self.cfg.amount_unit);
937 let input = self.client_ctx.make_client_inputs(input);
938 let ec = base32::encode_prefixed(FEDIMINT_PREFIX, &ecash);
939
940 self.client_ctx
941 .finalize_and_submit_transaction(
942 operation_id,
943 MintCommonInit::KIND.as_str(),
944 move |change_outpoint_range| MintOperationMeta::Receive {
945 change_outpoint_range,
946 ecash: ec.clone(),
947 custom_meta: custom_meta.clone(),
948 },
949 TransactionBuilder::new().with_inputs(input),
950 )
951 .await
952 .map_err(|_| ReceiveECashError::InsufficientFunds)?;
953
954 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
955
956 self.client_ctx
957 .log_event(
958 &mut dbtx,
959 ReceivePaymentEvent {
960 operation_id,
961 amount: ecash.amount(),
962 },
963 )
964 .await;
965
966 dbtx.commit_tx().await;
967
968 Ok(operation_id)
969 }
970
971 pub async fn await_final_receive_operation_state(
973 &self,
974 operation_id: OperationId,
975 ) -> anyhow::Result<FinalReceiveOperationState> {
976 let operation = self.client_ctx.get_operation(operation_id).await?;
977 let mut stream = self.notifier.subscribe(operation_id).await;
978
979 let mut stream = self
980 .client_ctx
981 .outcome_or_updates(operation, operation_id, move || {
982 async_stream::stream! {
983 loop {
984 if let Some(MintClientStateMachines::Receive(state)) = stream.next().await {
985 match state.state {
986 ReceiveSMState::Pending => {}
987 ReceiveSMState::Success => {
988 yield FinalReceiveOperationState::Success;
989 return;
990 }
991 ReceiveSMState::Rejected(..) => {
992 yield FinalReceiveOperationState::Rejected;
993 return;
994 }
995 }
996 }
997 }
998 }
999 })
1000 .into_stream();
1001
1002 let mut final_state = None;
1003
1004 while let Some(state) = stream.next().await {
1005 final_state = Some(state);
1006 }
1007
1008 Ok(final_state.expect("Stream contains one final state"))
1009 }
1010
1011 async fn remove_spendable_note(
1012 &self,
1013 dbtx: &mut DatabaseTransaction<'_>,
1014 spendable_note: &SpendableNote,
1015 ) {
1016 dbtx.remove_entry(&SpendableNoteKey(spendable_note.clone()))
1017 .await
1018 .expect("Must delete existing spendable note");
1019 }
1020}
1021
1022#[derive(Clone)]
1023struct PeerSelector {
1024 latency: Arc<RwLock<BTreeMap<PeerId, Duration>>>,
1025}
1026
1027impl PeerSelector {
1028 fn new(peers: BTreeSet<PeerId>) -> Self {
1029 let latency = peers
1030 .into_iter()
1031 .map(|peer| (peer, Duration::ZERO))
1032 .collect();
1033
1034 Self {
1035 latency: Arc::new(RwLock::new(latency)),
1036 }
1037 }
1038
1039 fn choose_peer(&self) -> PeerId {
1041 let latency = self.latency.read().unwrap();
1042
1043 let peer_a = latency.iter().choose(&mut thread_rng()).unwrap();
1044 let peer_b = latency.iter().choose(&mut thread_rng()).unwrap();
1045
1046 if peer_a.1 <= peer_b.1 {
1047 *peer_a.0
1048 } else {
1049 *peer_b.0
1050 }
1051 }
1052
1053 fn report(&self, peer: PeerId, duration: Duration) {
1055 self.latency
1056 .write()
1057 .unwrap()
1058 .entry(peer)
1059 .and_modify(|latency| *latency = *latency * 9 / 10 + duration * 1 / 10)
1060 .or_insert(duration);
1061 }
1062
1063 fn remove(&self, peer: PeerId) {
1064 self.latency.write().unwrap().remove(&peer);
1065 }
1066}
1067
1068async fn download_slice_with_hash(
1070 module_api: DynModuleApi,
1071 peer_selector: PeerSelector,
1072 start: u64,
1073 end: u64,
1074 expected_hash: sha256::Hash,
1075) -> Vec<RecoveryItem> {
1076 const TIMEOUT: Duration = Duration::from_secs(30);
1077
1078 loop {
1079 let peer = peer_selector.choose_peer();
1080 let start_time = fedimint_core::time::now();
1081
1082 if let Ok(data) = module_api
1083 .fetch_recovery_slice(peer, TIMEOUT, start, end)
1084 .await
1085 {
1086 let elapsed = fedimint_core::time::now()
1087 .duration_since(start_time)
1088 .unwrap_or_default();
1089
1090 peer_selector.report(peer, elapsed);
1091
1092 if data.consensus_hash::<sha256::Hash>() == expected_hash {
1093 return data;
1094 }
1095
1096 peer_selector.remove(peer);
1097 } else {
1098 peer_selector.report(peer, TIMEOUT);
1099 }
1100 }
1101}
1102
1103#[derive(Error, Debug, Clone, Eq, PartialEq)]
1104pub enum SendECashError {
1105 #[error("We need to reissue notes but the client is offline")]
1106 Offline,
1107 #[error("The clients balance is insufficient")]
1108 InsufficientBalance,
1109 #[error("A non-recoverable error has occurred")]
1110 Failure,
1111}
1112
1113#[derive(Error, Debug, Clone, Eq, PartialEq)]
1114pub enum ReceiveECashError {
1115 #[error("The ECash is from a different federation")]
1116 WrongFederation,
1117 #[error("ECash contains an uneconomical denomination")]
1118 UneconomicalDenomination,
1119 #[error("Receiving ecash requires additional funds")]
1120 InsufficientFunds,
1121}
1122
1123#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
1124pub enum FinalReceiveOperationState {
1125 Success,
1127 Rejected,
1129}
1130
1131#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1132pub enum MintClientStateMachines {
1133 Input(InputStateMachine),
1134 Output(MintOutputStateMachine),
1135 Receive(ReceiveStateMachine),
1136}
1137
1138impl IntoDynInstance for MintClientStateMachines {
1139 type DynType = DynState;
1140
1141 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1142 DynState::from_typed(instance_id, self)
1143 }
1144}
1145
1146impl State for MintClientStateMachines {
1147 type ModuleContext = MintClientContext;
1148
1149 fn transitions(
1150 &self,
1151 context: &Self::ModuleContext,
1152 global_context: &DynGlobalClientContext,
1153 ) -> Vec<StateTransition<Self>> {
1154 match self {
1155 MintClientStateMachines::Input(redemption_state) => {
1156 sm_enum_variant_translation!(
1157 redemption_state.transitions(context, global_context),
1158 MintClientStateMachines::Input
1159 )
1160 }
1161 MintClientStateMachines::Output(issuance_state) => {
1162 sm_enum_variant_translation!(
1163 issuance_state.transitions(context, global_context),
1164 MintClientStateMachines::Output
1165 )
1166 }
1167 MintClientStateMachines::Receive(receive_state) => {
1168 sm_enum_variant_translation!(
1169 receive_state.transitions(context, global_context),
1170 MintClientStateMachines::Receive
1171 )
1172 }
1173 }
1174 }
1175
1176 fn operation_id(&self) -> OperationId {
1177 match self {
1178 MintClientStateMachines::Input(redemption_state) => redemption_state.operation_id(),
1179 MintClientStateMachines::Output(issuance_state) => issuance_state.operation_id(),
1180 MintClientStateMachines::Receive(receive_state) => receive_state.operation_id(),
1181 }
1182 }
1183}
1184
1185fn round_to_multiple(amount: Amount, min_denomiation: Amount) -> Amount {
1186 Amount::from_msats(amount.msats.next_multiple_of(min_denomiation.msats))
1187}
1188
1189fn represent_amount_with_fees(
1190 mut remaining_amount: Amount,
1191 fee_consensus: &FeeConsensus,
1192) -> Vec<Denomination> {
1193 let mut denominations = Vec::new();
1194
1195 for denomination in client_denominations().rev() {
1197 let n_add =
1198 remaining_amount / (denomination.amount() + fee_consensus.fee(denomination.amount()));
1199
1200 denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1201
1202 remaining_amount -=
1203 n_add * (denomination.amount() + fee_consensus.fee(denomination.amount()));
1204 }
1205
1206 denominations.sort();
1208
1209 denominations
1210}
1211
1212fn represent_amount(mut remaining_amount: Amount) -> Vec<Denomination> {
1213 let mut denominations = Vec::new();
1214
1215 for denomination in client_denominations().rev() {
1217 let n_add = remaining_amount / denomination.amount();
1218
1219 denominations.extend(std::iter::repeat_n(denomination, n_add as usize));
1220
1221 remaining_amount -= n_add * denomination.amount();
1222 }
1223
1224 denominations
1225}