1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::module_name_repetitions)]
6
7pub use fedimint_walletv2_common as common;
8
9mod api;
10#[cfg(feature = "cli")]
11mod cli;
12mod db;
13pub mod events;
14mod receive_sm;
15mod send_sm;
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::anyhow;
22use api::WalletFederationApi;
23use bitcoin::address::NetworkUnchecked;
24use bitcoin::{Address, ScriptBuf};
25use db::{NextOutputIndexKey, ValidAddressIndexKey, ValidAddressIndexPrefix};
26use events::{ReceivePaymentEvent, SendPaymentEvent};
27use fedimint_api_client::api::{DynModuleApi, FederationResult};
28use fedimint_client::DynGlobalClientContext;
29use fedimint_client::transaction::{
30 ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
31 ClientOutputSM, TransactionBuilder,
32};
33use fedimint_client_module::db::ClientModuleMigrationFn;
34use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
35use fedimint_client_module::module::recovery::NoModuleBackup;
36use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
37use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
38use fedimint_client_module::sm_enum_variant_translation;
39use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
42};
43use fedimint_core::encoding::{Decodable, Encodable};
44use fedimint_core::module::{
45 AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
46};
47use fedimint_core::task::{TaskGroup, block_in_place, sleep};
48use fedimint_core::{Amount, OutPoint, TransactionId, apply, async_trait_maybe_send};
49use fedimint_derive_secret::{ChildId, DerivableSecret};
50use fedimint_logging::LOG_CLIENT_MODULE_WALLETV2;
51use fedimint_walletv2_common::config::WalletClientConfig;
52use fedimint_walletv2_common::{
53 KIND, StandardScript, TxInfo, WalletCommonInit, WalletInput, WalletInputV0, WalletModuleTypes,
54 WalletOutput, WalletOutputV0, descriptor, is_potential_receive,
55};
56use futures::StreamExt;
57use receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
58use secp256k1::Keypair;
59use send_sm::{SendSMCommon, SendSMState, SendStateMachine};
60use serde::{Deserialize, Serialize};
61use strum::IntoEnumIterator as _;
62use thiserror::Error;
63use tracing::{debug, warn};
64
65const SLICE_SIZE: u64 = 1000;
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub enum WalletOperationMeta {
70 Send(SendMeta),
71 Receive(ReceiveMeta),
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct SendMeta {
76 pub change_outpoint_range: OutPointRange,
77 pub address: Address<NetworkUnchecked>,
78 pub value: bitcoin::Amount,
79 pub fee: bitcoin::Amount,
80 #[serde(default)]
81 pub custom_meta: serde_json::Value,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ReceiveMeta {
86 pub change_outpoint_range: OutPointRange,
87 pub value: bitcoin::Amount,
88 pub fee: bitcoin::Amount,
89 pub address: Option<Address<NetworkUnchecked>>,
90 pub outpoint: Option<bitcoin::OutPoint>,
91}
92
93#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
95pub enum FinalSendOperationState {
96 Success(bitcoin::Txid),
98 Aborted,
100 Failure,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
106pub enum FinalReceiveOperationState {
107 Success,
109 Aborted,
111}
112
113#[derive(Debug, Clone)]
114pub struct WalletClientModule {
115 root_secret: DerivableSecret,
116 cfg: WalletClientConfig,
117 notifier: ModuleNotifier<WalletClientStateMachines>,
118 client_ctx: ClientContext<Self>,
119 db: Database,
120 module_api: DynModuleApi,
121}
122
123#[derive(Debug, Clone)]
124pub struct WalletClientContext {
125 pub client_ctx: ClientContext<WalletClientModule>,
126}
127
128impl Context for WalletClientContext {
129 const KIND: Option<ModuleKind> = Some(KIND);
130}
131
132#[apply(async_trait_maybe_send!)]
133impl ClientModule for WalletClientModule {
134 type Init = WalletClientInit;
135 type Common = WalletModuleTypes;
136 type Backup = NoModuleBackup;
137 type ModuleStateMachineContext = WalletClientContext;
138 type States = WalletClientStateMachines;
139
140 fn context(&self) -> Self::ModuleStateMachineContext {
141 WalletClientContext {
142 client_ctx: self.client_ctx.clone(),
143 }
144 }
145
146 fn input_fee(
147 &self,
148 amount: &Amounts,
149 _input: &<Self::Common as ModuleCommon>::Input,
150 ) -> Option<Amounts> {
151 amount
152 .get(&AmountUnit::BITCOIN)
153 .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
154 }
155
156 fn output_fee(
157 &self,
158 amount: &Amounts,
159 _output: &<Self::Common as ModuleCommon>::Output,
160 ) -> Option<Amounts> {
161 amount
162 .get(&AmountUnit::BITCOIN)
163 .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
164 }
165
166 #[cfg(feature = "cli")]
167 async fn handle_cli_command(
168 &self,
169 args: &[std::ffi::OsString],
170 ) -> anyhow::Result<serde_json::Value> {
171 cli::handle_cli_command(self, args).await
172 }
173}
174
175#[derive(Debug, Clone, Default)]
176pub struct WalletClientInit;
177
178impl ModuleInit for WalletClientInit {
179 type Common = WalletCommonInit;
180
181 async fn dump_database(
182 &self,
183 _dbtx: &mut DatabaseTransaction<'_>,
184 _prefix_names: Vec<String>,
185 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
186 Box::new(BTreeMap::new().into_iter())
187 }
188}
189
190#[apply(async_trait_maybe_send!)]
191impl ClientModuleInit for WalletClientInit {
192 type Module = WalletClientModule;
193
194 fn supported_api_versions(&self) -> MultiApiVersion {
195 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
196 .expect("no version conflicts")
197 }
198
199 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
200 let module = WalletClientModule {
201 root_secret: args.module_root_secret().clone(),
202 cfg: args.cfg().clone(),
203 notifier: args.notifier().clone(),
204 client_ctx: args.context(),
205 db: args.db().clone(),
206 module_api: args.module_api().clone(),
207 };
208
209 module.spawn_output_scanner(args.task_group(), args.client_span());
210
211 Ok(module)
212 }
213
214 fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
215 BTreeMap::new()
216 }
217
218 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
219 Some(db::DbKeyPrefix::iter().map(|p| p as u8).collect())
220 }
221}
222
223impl WalletClientModule {
224 pub fn get_network(&self) -> bitcoin::Network {
226 self.cfg.network
227 }
228
229 pub async fn total_value(&self) -> FederationResult<bitcoin::Amount> {
231 self.module_api
232 .federation_wallet()
233 .await
234 .map(|tx_out| tx_out.map_or(bitcoin::Amount::ZERO, |tx_out| tx_out.value))
235 }
236
237 pub async fn block_count(&self) -> FederationResult<u64> {
239 self.module_api.consensus_block_count().await
240 }
241
242 pub async fn feerate(&self) -> FederationResult<Option<u64>> {
244 self.module_api.consensus_feerate().await
245 }
246
247 pub async fn pending_tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
249 self.module_api.pending_tx_chain().await
250 }
251
252 pub async fn tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
254 self.module_api.tx_chain().await
255 }
256
257 pub async fn send_fee(&self) -> Result<bitcoin::Amount, SendError> {
259 self.module_api
260 .send_fee()
261 .await
262 .map_err(|e| SendError::FederationError(e.to_string()))?
263 .ok_or(SendError::NoConsensusFeerateAvailable)
264 }
265
266 pub async fn receive_fee(&self) -> anyhow::Result<bitcoin::Amount> {
268 self.module_api
269 .receive_fee()
270 .await?
271 .ok_or_else(|| anyhow!("No consensus feerate is available"))
272 }
273
274 pub async fn send(
276 &self,
277 address: Address<NetworkUnchecked>,
278 value: bitcoin::Amount,
279 fee: Option<bitcoin::Amount>,
280 custom_meta: serde_json::Value,
281 ) -> Result<OperationId, SendError> {
282 if !address.is_valid_for_network(self.cfg.network) {
283 return Err(SendError::WrongNetwork);
284 }
285
286 if value < self.cfg.dust_limit {
287 return Err(SendError::DustValue);
288 }
289
290 let fee = match fee {
291 Some(value) => value,
292 None => self
293 .module_api
294 .send_fee()
295 .await
296 .map_err(|e| SendError::FederationError(e.to_string()))?
297 .ok_or(SendError::NoConsensusFeerateAvailable)?,
298 };
299
300 let operation_id = OperationId::new_random();
301
302 let destination = StandardScript::from_address(&address.clone().assume_checked())
303 .ok_or(SendError::UnsupportedAddress)?;
304
305 let client_output = ClientOutput::<WalletOutput> {
306 output: WalletOutput::V0(WalletOutputV0 {
307 destination,
308 value,
309 fee,
310 }),
311 amounts: Amounts::new_bitcoin(Amount::from_sats((value + fee).to_sat())),
312 };
313
314 let client_output_sm = ClientOutputSM::<WalletClientStateMachines> {
315 state_machines: Arc::new(move |range: OutPointRange| {
316 vec![WalletClientStateMachines::Send(SendStateMachine {
317 common: SendSMCommon {
318 operation_id,
319 outpoint: OutPoint {
320 txid: range.txid(),
321 out_idx: 0,
322 },
323 value,
324 fee,
325 },
326 state: SendSMState::Funding,
327 })]
328 }),
329 };
330
331 let client_output_bundle = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
332 vec![client_output],
333 vec![client_output_sm],
334 ));
335
336 let address_clone = address.clone();
337
338 self.client_ctx
339 .finalize_and_submit_transaction(
340 operation_id,
341 WalletCommonInit::KIND.as_str(),
342 move |change_outpoint_range| {
343 WalletOperationMeta::Send(SendMeta {
344 change_outpoint_range,
345 address: address_clone.clone(),
346 value,
347 fee,
348 custom_meta: custom_meta.clone(),
349 })
350 },
351 TransactionBuilder::new().with_outputs(client_output_bundle),
352 )
353 .await
354 .map_err(|_| SendError::InsufficientFunds)?;
355
356 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
357
358 self.client_ctx
359 .log_event(
360 &mut dbtx,
361 SendPaymentEvent {
362 operation_id,
363 address,
364 value,
365 fee,
366 },
367 )
368 .await;
369
370 dbtx.commit_tx().await;
371
372 Ok(operation_id)
373 }
374
375 pub async fn await_final_send_operation_state(
377 &self,
378 operation_id: OperationId,
379 ) -> anyhow::Result<FinalSendOperationState> {
380 let operation = self.client_ctx.get_operation(operation_id).await?;
381 let mut stream = self.notifier.subscribe(operation_id).await;
382
383 let mut stream = self
384 .client_ctx
385 .outcome_or_updates(operation, operation_id, move || {
386 async_stream::stream! {
387 loop {
388 if let Some(WalletClientStateMachines::Send(state)) = stream.next().await {
389 match state.state {
390 SendSMState::Funding => {}
391 SendSMState::Success(txid) => {
392 yield FinalSendOperationState::Success(txid);
393 return;
394 }
395 SendSMState::Aborted(..) => {
396 yield FinalSendOperationState::Aborted;
397 return;
398 }
399 SendSMState::Failure => {
400 yield FinalSendOperationState::Failure;
401 return;
402 }
403 }
404 }
405 }
406 }
407 })
408 .into_stream();
409
410 let mut final_state = None;
411
412 while let Some(state) = stream.next().await {
413 final_state = Some(state);
414 }
415
416 Ok(final_state.expect("Stream contains one final state"))
417 }
418
419 pub async fn await_final_receive_operation_state(
421 &self,
422 operation_id: OperationId,
423 ) -> anyhow::Result<FinalReceiveOperationState> {
424 let operation = self.client_ctx.get_operation(operation_id).await?;
425 let mut stream = self.notifier.subscribe(operation_id).await;
426
427 let mut stream = self
428 .client_ctx
429 .outcome_or_updates(operation, operation_id, move || {
430 async_stream::stream! {
431 loop {
432 if let Some(WalletClientStateMachines::Receive(state)) = stream.next().await {
433 match state.state {
434 ReceiveSMState::Funding => {}
435 ReceiveSMState::Success => {
436 yield FinalReceiveOperationState::Success;
437 return;
438 }
439 ReceiveSMState::Aborted(..) => {
440 yield FinalReceiveOperationState::Aborted;
441 return;
442 }
443 }
444 }
445 }
446 }
447 })
448 .into_stream();
449
450 let mut final_state = None;
451
452 while let Some(state) = stream.next().await {
453 final_state = Some(state);
454 }
455
456 Ok(final_state.expect("Stream contains one final state"))
457 }
458
459 pub async fn receive(&self) -> Address {
462 loop {
463 if let Some(entry) = self
464 .db
465 .begin_transaction_nc()
466 .await
467 .find_by_prefix_sorted_descending(&ValidAddressIndexPrefix)
468 .await
469 .next()
470 .await
471 {
472 return self.derive_address(entry.0.0);
473 }
474
475 sleep(Duration::from_secs(1)).await;
476 }
477 }
478
479 fn derive_address(&self, index: u64) -> Address {
480 descriptor(
481 &self.cfg.bitcoin_pks,
482 &self.derive_tweak(index).public_key().consensus_hash(),
483 )
484 .address(self.cfg.network)
485 }
486
487 fn derive_tweak(&self, index: u64) -> Keypair {
488 self.root_secret
489 .child_key(ChildId(index))
490 .to_secp_key(secp256k1::SECP256K1)
491 }
492
493 #[allow(clippy::maybe_infinite_iter)]
495 fn next_valid_index(&self, start_index: u64) -> u64 {
496 let pks_hash = self.cfg.bitcoin_pks.consensus_hash();
497
498 block_in_place(|| {
499 (start_index..)
500 .find(|i| is_potential_receive(&self.derive_address(*i).script_pubkey(), &pks_hash))
501 .expect("Will always find a valid index")
502 })
503 }
504
505 async fn receive_output(
507 &self,
508 output_index: u64,
509 value: bitcoin::Amount,
510 address_index: u64,
511 fee: bitcoin::Amount,
512 outpoint: Option<bitcoin::OutPoint>,
513 ) -> (OperationId, TransactionId) {
514 let operation_id = OperationId::new_random();
515
516 let client_input = ClientInput::<WalletInput> {
517 input: WalletInput::V0(WalletInputV0 {
518 output_index,
519 fee,
520 tweak: self.derive_tweak(address_index).public_key(),
521 }),
522 keys: vec![self.derive_tweak(address_index)],
523 amounts: Amounts::new_bitcoin(Amount::from_sats((value - fee).to_sat())),
524 };
525
526 let client_input_sm = ClientInputSM::<WalletClientStateMachines> {
527 state_machines: Arc::new(move |range: OutPointRange| {
528 vec![WalletClientStateMachines::Receive(ReceiveStateMachine {
529 common: ReceiveSMCommon {
530 operation_id,
531 txid: range.txid(),
532 value,
533 fee,
534 },
535 state: ReceiveSMState::Funding,
536 })]
537 }),
538 };
539
540 let client_input_bundle = self.client_ctx.make_client_inputs(ClientInputBundle::new(
541 vec![client_input],
542 vec![client_input_sm],
543 ));
544
545 let address = self.derive_address(address_index).as_unchecked().clone();
546
547 let meta_address = address.clone();
548 let range = self
549 .client_ctx
550 .finalize_and_submit_transaction(
551 operation_id,
552 WalletCommonInit::KIND.as_str(),
553 move |change_outpoint_range| {
554 WalletOperationMeta::Receive(ReceiveMeta {
555 change_outpoint_range,
556 value,
557 fee,
558 address: Some(meta_address.clone()),
559 outpoint,
560 })
561 },
562 TransactionBuilder::new().with_inputs(client_input_bundle),
563 )
564 .await
565 .expect("Input amount is sufficient to finalize transaction");
566
567 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
568
569 self.client_ctx
570 .log_event(
571 &mut dbtx,
572 ReceivePaymentEvent {
573 operation_id,
574 value,
575 fee,
576 address,
577 outpoint,
578 },
579 )
580 .await;
581
582 dbtx.commit_tx().await;
583
584 (operation_id, range.txid())
585 }
586
587 fn spawn_output_scanner(&self, task_group: &TaskGroup, client_span: &tracing::Span) {
588 let module = self.clone();
589
590 task_group.spawn_cancellable_with_span(client_span.clone(), "output-scanner", async move {
591 let mut dbtx = module.db.begin_transaction().await;
592
593 if dbtx
594 .find_by_prefix(&ValidAddressIndexPrefix)
595 .await
596 .next()
597 .await
598 .is_none()
599 {
600 dbtx.insert_new_entry(&ValidAddressIndexKey(module.next_valid_index(0)), &())
601 .await;
602 }
603
604 dbtx.commit_tx().await;
605
606 loop {
607 match module.check_outputs().await {
608 Ok(skip_wait) => {
609 if skip_wait {
610 continue;
611 }
612 }
613 Err(e) => {
614 warn!(target: LOG_CLIENT_MODULE_WALLETV2, "Failed to fetch outputs: {e}");
615 }
616 }
617
618 sleep(fedimint_walletv2_common::sleep_duration()).await;
619 }
620 });
621 }
622
623 async fn check_outputs(&self) -> anyhow::Result<bool> {
624 let mut dbtx = self.db.begin_transaction_nc().await;
625
626 let next_output_index = dbtx.get_value(&NextOutputIndexKey).await.unwrap_or(0);
627
628 let mut valid_indices: Vec<u64> = dbtx
629 .find_by_prefix(&ValidAddressIndexPrefix)
630 .await
631 .map(|entry| entry.0.0)
632 .collect()
633 .await;
634
635 let mut address_map: BTreeMap<ScriptBuf, u64> = valid_indices
636 .iter()
637 .map(|&i| (self.derive_address(i).script_pubkey(), i))
638 .collect();
639
640 let outputs = self
641 .module_api
642 .output_info_slice(next_output_index, next_output_index + SLICE_SIZE)
643 .await?;
644
645 let returned_num = outputs.len();
646 let mut matched_num: usize = 0;
647
648 for output in &outputs {
649 if let Some(&address_index) = address_map.get(&output.script) {
650 matched_num += 1;
651 let next_address_index = valid_indices
652 .last()
653 .copied()
654 .expect("we have at least one address index");
655
656 if address_index == next_address_index {
658 let index = self.next_valid_index(next_address_index + 1);
659
660 let mut dbtx = self.db.begin_transaction().await;
661
662 dbtx.insert_entry(&ValidAddressIndexKey(index), &()).await;
663
664 dbtx.commit_tx_result().await?;
665
666 valid_indices.push(index);
667
668 address_map.insert(self.derive_address(index).script_pubkey(), index);
669 }
670
671 if !output.spent {
672 if self.module_api.pending_tx_chain().await?.len() >= 3 {
675 return Ok(false);
676 }
677
678 let receive_fee = self
679 .module_api
680 .receive_fee()
681 .await?
682 .ok_or(anyhow!("No consensus feerate is available"))?;
683
684 if output.value > receive_fee {
685 let (operation_id, txid) = self
686 .receive_output(
687 output.index,
688 output.value,
689 address_index,
690 receive_fee,
691 output.outpoint,
692 )
693 .await;
694
695 self.client_ctx
696 .transaction_updates(operation_id)
697 .await
698 .await_tx_accepted(txid)
699 .await
700 .map_err(|e| anyhow!("Claim transaction was rejected: {e}"))?;
701 }
702 }
703 }
704
705 let mut dbtx = self.db.begin_transaction().await;
706
707 dbtx.insert_entry(&NextOutputIndexKey, &(output.index + 1))
708 .await;
709
710 dbtx.commit_tx_result().await?;
711 }
712
713 debug!(
714 target: LOG_CLIENT_MODULE_WALLETV2,
715 next_output_index,
716 returned_num,
717 matched_num,
718 valid_indices_num = valid_indices.len(),
719 "Scanning for outputs"
720 );
721
722 Ok(!outputs.is_empty())
723 }
724}
725
726#[derive(Error, Debug, Clone, Eq, PartialEq)]
727pub enum SendError {
728 #[error("Address is from a different network than the federation.")]
729 WrongNetwork,
730 #[error("The value is too small")]
731 DustValue,
732 #[error("Federation returned an error: {0}")]
733 FederationError(String),
734 #[error("No consensus feerate is available at this time")]
735 NoConsensusFeerateAvailable,
736 #[error("The client does not have sufficient funds to send the payment")]
737 InsufficientFunds,
738 #[error("Unsupported address type")]
739 UnsupportedAddress,
740}
741
742#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
743pub enum WalletClientStateMachines {
744 Send(send_sm::SendStateMachine),
745 Receive(receive_sm::ReceiveStateMachine),
746}
747
748impl State for WalletClientStateMachines {
749 type ModuleContext = WalletClientContext;
750
751 fn transitions(
752 &self,
753 context: &Self::ModuleContext,
754 global_context: &DynGlobalClientContext,
755 ) -> Vec<StateTransition<Self>> {
756 match self {
757 WalletClientStateMachines::Send(sm) => sm_enum_variant_translation!(
758 sm.transitions(context, global_context),
759 WalletClientStateMachines::Send
760 ),
761 WalletClientStateMachines::Receive(sm) => sm_enum_variant_translation!(
762 sm.transitions(context, global_context),
763 WalletClientStateMachines::Receive
764 ),
765 }
766 }
767
768 fn operation_id(&self) -> OperationId {
769 match self {
770 WalletClientStateMachines::Send(sm) => sm.operation_id(),
771 WalletClientStateMachines::Receive(sm) => sm.operation_id(),
772 }
773 }
774}
775
776impl IntoDynInstance for WalletClientStateMachines {
777 type DynType = DynState;
778
779 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
780 DynState::from_typed(instance_id, self)
781 }
782}