Skip to main content

fedimint_walletv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::module_name_repetitions)]
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::sync::Arc;
9
10use anyhow::anyhow;
11use api::WalletFederationApi;
12use bitcoin::address::NetworkUnchecked;
13use bitcoin::{Address, ScriptBuf};
14use db::{NextDepositIndexKey, ValidAddressIndexKey, ValidAddressIndexPrefix};
15use fedimint_api_client::api::{DynModuleApi, FederationResult};
16use fedimint_client::DynGlobalClientContext;
17use fedimint_client::transaction::{
18    ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
19    ClientOutputSM, TransactionBuilder,
20};
21use fedimint_client_module::db::ClientModuleMigrationFn;
22use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
23use fedimint_client_module::module::recovery::NoModuleBackup;
24use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
25use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
26use fedimint_client_module::sm_enum_variant_translation;
27use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
28use fedimint_core::db::{
29    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
30};
31use fedimint_core::encoding::{Decodable, Encodable};
32use fedimint_core::module::{
33    AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
34};
35use fedimint_core::task::{TaskGroup, sleep};
36use fedimint_core::{Amount, TransactionId, apply, async_trait_maybe_send};
37use fedimint_derive_secret::{ChildId, DerivableSecret};
38use fedimint_logging::LOG_CLIENT_MODULE_WALLETV2;
39use fedimint_walletv2_common::config::WalletClientConfig;
40use fedimint_walletv2_common::{
41    KIND, StandardScript, TxInfo, WalletCommonInit, WalletInput, WalletInputV0, WalletModuleTypes,
42    WalletOutput, WalletOutputV0, descriptor, is_potential_receive,
43};
44use futures::StreamExt;
45use secp256k1::Keypair;
46use serde::{Deserialize, Serialize};
47use strum::IntoEnumIterator as _;
48use thiserror::Error;
49use tracing::{info, warn};
50
51mod api;
52#[cfg(feature = "cli")]
53mod cli;
54mod db;
55pub mod events;
56mod receive_sm;
57mod send_sm;
58
59use events::{ReceivePaymentEvent, SendPaymentEvent};
60use receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
61use send_sm::{SendSMCommon, SendSMState, SendStateMachine};
62
/// Number of deposit log entries to scan per batch.
///
/// `check_deposits` requests this many entries from the federation at a time
/// and reports that more work may be pending when a full batch was returned.
const DEPOSIT_RANGE_SIZE: u64 = 1000;
65
/// Operation metadata recorded when a wallet transaction is submitted via
/// `finalize_and_submit_transaction`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalletOperationMeta {
    /// Metadata of an on-chain send operation.
    Send(SendMeta),
    /// Metadata of an operation claiming a deposit.
    Receive(ReceiveMeta),
}
71
/// Metadata describing an on-chain send operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendMeta {
    /// Outpoint range handed to the metadata closure when the transaction is
    /// finalized.
    pub change_outpoint_range: OutPointRange,
    /// Destination address as supplied by the caller.
    pub address: Address<NetworkUnchecked>,
    /// Amount sent to the destination address.
    pub amount: bitcoin::Amount,
    /// Fee paid on top of `amount`.
    pub fee: bitcoin::Amount,
}
79
/// Metadata describing an operation that claims a deposit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiveMeta {
    /// Outpoint range handed to the metadata closure when the transaction is
    /// finalized.
    pub change_outpoint_range: OutPointRange,
    /// Full value of the claimed deposit, before the fee is deducted.
    pub amount: bitcoin::Amount,
    /// Fee deducted from `amount` when claiming the deposit.
    pub fee: bitcoin::Amount,
}
86
/// The final state of an operation sending bitcoin on-chain.
///
/// Returned by `WalletClientModule::await_final_send_operation_state` once the
/// send state machine leaves its `Funding` state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FinalSendOperationState {
    /// The transaction was successful; carries the txid reported by the send
    /// state machine.
    Success(bitcoin::Txid),
    /// The funding transaction was aborted.
    Aborted,
    /// A programming error has occurred or the federation is malicious.
    Failure,
}
97
/// Client side of the wallet module: sends on-chain payments and scans the
/// federation's deposit log for deposits to addresses derived by this client.
#[derive(Debug, Clone)]
pub struct WalletClientModule {
    /// Secret from which the per-address tweak keys are derived.
    root_secret: DerivableSecret,
    /// Client configuration of the module (network, dust limit, fee consensus
    /// and the federation's bitcoin public keys are read from it).
    cfg: WalletClientConfig,
    /// Used to subscribe to state machine updates of an operation.
    notifier: ModuleNotifier<WalletClientStateMachines>,
    /// Handle used to build and submit transactions and to log events.
    client_ctx: ClientContext<Self>,
    /// Database holding the valid address indices and the next deposit index
    /// to scan.
    db: Database,
    /// API client used to query the federation.
    module_api: DynModuleApi,
}
107
/// Context made available to the wallet state machines.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Clone of the module's client context.
    pub client_ctx: ClientContext<WalletClientModule>,
}
112
impl Context for WalletClientContext {
    /// Ties this context to the wallet module kind.
    const KIND: Option<ModuleKind> = Some(KIND);
}
116
117#[apply(async_trait_maybe_send!)]
118impl ClientModule for WalletClientModule {
119    type Init = WalletClientInit;
120    type Common = WalletModuleTypes;
121    type Backup = NoModuleBackup;
122    type ModuleStateMachineContext = WalletClientContext;
123    type States = WalletClientStateMachines;
124
125    fn context(&self) -> Self::ModuleStateMachineContext {
126        WalletClientContext {
127            client_ctx: self.client_ctx.clone(),
128        }
129    }
130
131    fn input_fee(
132        &self,
133        amount: &Amounts,
134        _input: &<Self::Common as ModuleCommon>::Input,
135    ) -> Option<Amounts> {
136        amount
137            .get(&AmountUnit::BITCOIN)
138            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
139    }
140
141    fn output_fee(
142        &self,
143        amount: &Amounts,
144        _output: &<Self::Common as ModuleCommon>::Output,
145    ) -> Option<Amounts> {
146        amount
147            .get(&AmountUnit::BITCOIN)
148            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
149    }
150
151    #[cfg(feature = "cli")]
152    async fn handle_cli_command(
153        &self,
154        args: &[std::ffi::OsString],
155    ) -> anyhow::Result<serde_json::Value> {
156        cli::handle_cli_command(self, args).await
157    }
158}
159
/// Initializer that constructs [`WalletClientModule`] instances.
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit;
162
163impl ModuleInit for WalletClientInit {
164    type Common = WalletCommonInit;
165
166    async fn dump_database(
167        &self,
168        _dbtx: &mut DatabaseTransaction<'_>,
169        _prefix_names: Vec<String>,
170    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
171        Box::new(BTreeMap::new().into_iter())
172    }
173}
174
175#[apply(async_trait_maybe_send!)]
176impl ClientModuleInit for WalletClientInit {
177    type Module = WalletClientModule;
178
179    fn supported_api_versions(&self) -> MultiApiVersion {
180        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
181            .expect("no version conflicts")
182    }
183
184    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
185        let module = WalletClientModule {
186            root_secret: args.module_root_secret().clone(),
187            cfg: args.cfg().clone(),
188            notifier: args.notifier().clone(),
189            client_ctx: args.context(),
190            db: args.db().clone(),
191            module_api: args.module_api().clone(),
192        };
193
194        module.spawn_deposit_scanner(args.task_group());
195
196        Ok(module)
197    }
198
199    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
200        BTreeMap::new()
201    }
202
203    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
204        Some(db::DbKeyPrefix::iter().map(|p| p as u8).collect())
205    }
206}
207
208impl WalletClientModule {
    /// Returns the Bitcoin network for this federation, as stored in the
    /// module's client config.
    pub fn get_network(&self) -> bitcoin::Network {
        self.cfg.network
    }
213
214    /// Fetch the total value of bitcoin controlled by the federation.
215    pub async fn total_value(&self) -> FederationResult<bitcoin::Amount> {
216        self.module_api
217            .federation_wallet()
218            .await
219            .map(|tx_out| tx_out.map_or(bitcoin::Amount::ZERO, |tx_out| tx_out.value))
220    }
221
    /// Fetch the consensus block count of the federation.
    ///
    /// Thin wrapper around the module API's `consensus_block_count` call.
    pub async fn block_count(&self) -> FederationResult<u64> {
        self.module_api.consensus_block_count().await
    }
226
    /// Fetch the current consensus feerate.
    ///
    /// Thin wrapper around the module API's `consensus_feerate` call.
    /// NOTE(review): `None` presumably means the federation has not agreed on a
    /// feerate yet — confirm against the API definition.
    pub async fn feerate(&self) -> FederationResult<Option<u64>> {
        self.module_api.consensus_feerate().await
    }
231
    /// Fetch information on the chain of pending bitcoin transactions.
    ///
    /// Thin wrapper around the module API's `pending_tx_chain` call.
    async fn pending_tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
        self.module_api.pending_tx_chain().await
    }
236
    /// Fetch the federation's log of bitcoin transactions.
    ///
    /// `n` is forwarded unchanged to the module API's `tx_chain` call.
    /// NOTE(review): presumably `n` limits the number of returned entries —
    /// confirm against the API definition.
    async fn tx_chain(&self, n: usize) -> FederationResult<Vec<TxInfo>> {
        self.module_api.tx_chain(n).await
    }
241
242    /// Fetch the current fee required to send an on-chain payment.
243    pub async fn send_fee(&self) -> Result<bitcoin::Amount, SendError> {
244        self.module_api
245            .send_fee()
246            .await
247            .map_err(|e| SendError::FederationError(e.to_string()))?
248            .ok_or(SendError::NoConsensusFeerateAvailable)
249    }
250
251    /// Send an on-chain payment with the given fee.
252    pub async fn send(
253        &self,
254        address: Address<NetworkUnchecked>,
255        amount: bitcoin::Amount,
256        fee: Option<bitcoin::Amount>,
257    ) -> Result<OperationId, SendError> {
258        if !address.is_valid_for_network(self.cfg.network) {
259            return Err(SendError::WrongNetwork);
260        }
261
262        if amount < self.cfg.dust_limit {
263            return Err(SendError::DustAmount);
264        }
265
266        let fee = match fee {
267            Some(value) => value,
268            None => self
269                .module_api
270                .send_fee()
271                .await
272                .map_err(|e| SendError::FederationError(e.to_string()))?
273                .ok_or(SendError::NoConsensusFeerateAvailable)?,
274        };
275
276        let operation_id = OperationId::new_random();
277
278        let client_output = ClientOutput::<WalletOutput> {
279            output: WalletOutput::V0(WalletOutputV0 {
280                destination: StandardScript::from_address(&address.clone().assume_checked())
281                    .ok_or(SendError::UnsupportedAddress)?,
282                value: amount,
283                fee,
284            }),
285            amounts: Amounts::new_bitcoin(Amount::from_sats((amount + fee).to_sat())),
286        };
287
288        let client_output_sm = ClientOutputSM::<WalletClientStateMachines> {
289            state_machines: Arc::new(move |range: OutPointRange| {
290                vec![WalletClientStateMachines::Send(SendStateMachine {
291                    common: SendSMCommon {
292                        operation_id,
293                        outpoint: range.into_iter().next().expect("must have one output"),
294                        amount,
295                        fee,
296                    },
297                    state: SendSMState::Funding,
298                })]
299            }),
300        };
301
302        let client_output_bundle = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
303            vec![client_output],
304            vec![client_output_sm],
305        ));
306
307        self.client_ctx
308            .finalize_and_submit_transaction(
309                operation_id,
310                WalletCommonInit::KIND.as_str(),
311                move |change_outpoint_range| {
312                    WalletOperationMeta::Send(SendMeta {
313                        change_outpoint_range,
314                        address: address.clone(),
315                        amount,
316                        fee,
317                    })
318                },
319                TransactionBuilder::new().with_outputs(client_output_bundle),
320            )
321            .await
322            .map_err(|_| SendError::InsufficientFunds)?;
323
324        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
325
326        self.client_ctx
327            .log_event(
328                &mut dbtx,
329                SendPaymentEvent {
330                    operation_id,
331                    amount,
332                    fee,
333                },
334            )
335            .await;
336
337        dbtx.commit_tx().await;
338
339        Ok(operation_id)
340    }
341
342    /// Await the final state of the send operation.
343    pub async fn await_final_send_operation_state(
344        &self,
345        operation_id: OperationId,
346    ) -> FinalSendOperationState {
347        let mut stream = self.notifier.subscribe(operation_id).await;
348
349        loop {
350            let Some(WalletClientStateMachines::Send(state)) = stream.next().await else {
351                panic!("stream must produce a terminal send state");
352            };
353
354            match state.state {
355                SendSMState::Funding => {}
356                SendSMState::Success(txid) => return FinalSendOperationState::Success(txid),
357                SendSMState::Aborted(..) => return FinalSendOperationState::Aborted,
358                SendSMState::Failure => return FinalSendOperationState::Failure,
359            }
360        }
361    }
362
363    /// Returns the next unused address.
364    pub async fn receive(&self) -> Address {
365        if let Some(entry) = self
366            .db
367            .begin_transaction_nc()
368            .await
369            .find_by_prefix_sorted_descending(&ValidAddressIndexPrefix)
370            .await
371            .next()
372            .await
373        {
374            self.derive_address(entry.0.0)
375        } else {
376            self.derive_address(self.next_valid_index(0))
377        }
378    }
379
380    fn derive_address(&self, index: u64) -> Address {
381        descriptor(
382            &self.cfg.bitcoin_pks,
383            &self.derive_tweak(index).public_key().consensus_hash(),
384        )
385        .address(self.cfg.network)
386    }
387
388    fn derive_tweak(&self, index: u64) -> Keypair {
389        self.root_secret
390            .child_key(ChildId(index))
391            .to_secp_key(secp256k1::SECP256K1)
392    }
393
394    /// Find the next valid index starting from (and including) `start_index`.
395    #[allow(clippy::maybe_infinite_iter)]
396    fn next_valid_index(&self, start_index: u64) -> u64 {
397        let pks_hash = self.cfg.bitcoin_pks.consensus_hash();
398
399        (start_index..)
400            .find(|i| is_potential_receive(&self.derive_address(*i).script_pubkey(), &pks_hash))
401            .expect("Will always find a valid index")
402    }
403
404    /// Issue ecash for an unspent deposit with a given fee.
405    async fn receive_deposit(
406        &self,
407        deposit_index: u64,
408        amount: bitcoin::Amount,
409        address_index: u64,
410        fee: bitcoin::Amount,
411    ) -> (OperationId, TransactionId) {
412        let operation_id = OperationId::new_random();
413
414        let client_input = ClientInput::<WalletInput> {
415            input: WalletInput::V0(WalletInputV0 {
416                deposit_index,
417                fee,
418                tweak: self.derive_tweak(address_index).public_key(),
419            }),
420            keys: vec![self.derive_tweak(address_index)],
421            amounts: Amounts::new_bitcoin(Amount::from_sats((amount - fee).to_sat())),
422        };
423
424        let client_input_sm = ClientInputSM::<WalletClientStateMachines> {
425            state_machines: Arc::new(move |range: OutPointRange| {
426                vec![WalletClientStateMachines::Receive(ReceiveStateMachine {
427                    common: ReceiveSMCommon {
428                        operation_id,
429                        txid: range.txid(),
430                        amount,
431                        fee,
432                    },
433                    state: ReceiveSMState::Funding,
434                })]
435            }),
436        };
437
438        let client_input_bundle = self.client_ctx.make_client_inputs(ClientInputBundle::new(
439            vec![client_input],
440            vec![client_input_sm],
441        ));
442
443        let range = self
444            .client_ctx
445            .finalize_and_submit_transaction(
446                operation_id,
447                WalletCommonInit::KIND.as_str(),
448                move |change_outpoint_range| {
449                    WalletOperationMeta::Receive(ReceiveMeta {
450                        change_outpoint_range,
451                        amount,
452                        fee,
453                    })
454                },
455                TransactionBuilder::new().with_inputs(client_input_bundle),
456            )
457            .await
458            .expect("Input amount is sufficient to finalize transaction");
459
460        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
461
462        self.client_ctx
463            .log_event(
464                &mut dbtx,
465                ReceivePaymentEvent {
466                    operation_id,
467                    amount,
468                    fee,
469                },
470            )
471            .await;
472
473        dbtx.commit_tx().await;
474
475        (operation_id, range.txid())
476    }
477
478    fn spawn_deposit_scanner(&self, task_group: &TaskGroup) {
479        let module = self.clone();
480
481        task_group.spawn_cancellable("deposit-scanner", async move {
482            let mut dbtx = module.db.begin_transaction().await;
483
484            if dbtx
485                .find_by_prefix(&ValidAddressIndexPrefix)
486                .await
487                .next()
488                .await
489                .is_none()
490            {
491                dbtx.insert_new_entry(&ValidAddressIndexKey(module.next_valid_index(0)), &())
492                    .await;
493            }
494
495            dbtx.commit_tx().await;
496
497            loop {
498                match module.check_deposits().await {
499                    Ok(skip_wait) => {
500                        if skip_wait {
501                            continue;
502                        }
503                    }
504                    Err(e) => {
505                        warn!(target: LOG_CLIENT_MODULE_WALLETV2, "Failed to fetch deposits: {e}");
506                    }
507                }
508
509                sleep(fedimint_walletv2_common::sleep_duration()).await;
510            }
511        });
512    }
513
514    async fn check_deposits(&self) -> anyhow::Result<bool> {
515        let mut dbtx = self.db.begin_transaction_nc().await;
516
517        let next_deposit_index = dbtx.get_value(&NextDepositIndexKey).await.unwrap_or(0);
518
519        let mut valid_indices: Vec<u64> = dbtx
520            .find_by_prefix(&ValidAddressIndexPrefix)
521            .await
522            .map(|entry| entry.0.0)
523            .collect()
524            .await;
525
526        let mut address_map: BTreeMap<ScriptBuf, u64> = valid_indices
527            .iter()
528            .map(|&i| (self.derive_address(i).script_pubkey(), i))
529            .collect();
530
531        let deposit_range = self
532            .module_api
533            .deposit_range(next_deposit_index, next_deposit_index + DEPOSIT_RANGE_SIZE)
534            .await?;
535
536        info!(
537            target: LOG_CLIENT_MODULE_WALLETV2,
538            "Scanning for deposits..."
539        );
540
541        for (deposit_index, tx_out) in (next_deposit_index..).zip(deposit_range.deposits.clone()) {
542            if let Some(&address_index) = address_map.get(&tx_out.script_pubkey) {
543                let receive_fee = self
544                    .module_api
545                    .receive_fee()
546                    .await?
547                    .ok_or(anyhow!("No consensus feerate is available"))?;
548
549                if tx_out.value > receive_fee && !deposit_range.spent.contains(&deposit_index) {
550                    // In order to not overpay on fees we choose to wait, the congestion will clear
551                    // up within a few blocks.
552                    if self.module_api.pending_tx_chain().await?.len() >= 3 {
553                        return Ok(false);
554                    }
555
556                    let (operation_id, txid) = self
557                        .receive_deposit(deposit_index, tx_out.value, address_index, receive_fee)
558                        .await;
559
560                    self.client_ctx
561                        .transaction_updates(operation_id)
562                        .await
563                        .await_tx_accepted(txid)
564                        .await
565                        .map_err(|e| anyhow!("Claim transaction for deposit was rejected: {e}"))?;
566                }
567
568                let next_address_index = valid_indices
569                    .last()
570                    .copied()
571                    .expect("we have at least one address index");
572
573                // If we used the highest valid index, add the next valid one
574                if address_index == next_address_index {
575                    let index = self.next_valid_index(next_address_index + 1);
576
577                    let mut dbtx = self.db.begin_transaction().await;
578
579                    dbtx.insert_entry(&ValidAddressIndexKey(index), &()).await;
580
581                    dbtx.commit_tx_result().await?;
582
583                    valid_indices.push(index);
584
585                    address_map.insert(self.derive_address(index).script_pubkey(), index);
586                }
587            }
588
589            let mut dbtx = self.db.begin_transaction().await;
590
591            dbtx.insert_entry(&NextDepositIndexKey, &(deposit_index + 1))
592                .await;
593
594            dbtx.commit_tx_result().await?;
595        }
596
597        Ok(deposit_range.deposits.len() as u64 == DEPOSIT_RANGE_SIZE)
598    }
599}
600
/// Errors returned when sending an on-chain payment or fetching the send fee.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendError {
    /// The destination address is not valid for the federation's network.
    #[error("Address is from a different network than the federation.")]
    WrongNetwork,
    /// The amount is below the configured dust limit.
    #[error("The amount is too small to be sent on-chain")]
    DustAmount,
    /// A request to the federation failed; carries the error rendered as a
    /// string.
    #[error("Federation returned an error: {0}")]
    FederationError(String),
    /// The federation did not return a fee for sending.
    #[error("No consensus feerate is available at this time")]
    NoConsensusFeerateAvailable,
    /// The transaction funding the payment could not be finalized.
    #[error("The client does not have sufficient funds to send the payment")]
    InsufficientFunds,
    /// The address could not be converted into a `StandardScript`.
    #[error("Unsupported address type")]
    UnsupportedAddress,
}
616
/// All state machines run by the wallet client module.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStateMachines {
    /// Tracks an on-chain send operation.
    Send(send_sm::SendStateMachine),
    /// Tracks an operation claiming a deposit.
    Receive(receive_sm::ReceiveStateMachine),
}
622
623impl State for WalletClientStateMachines {
624    type ModuleContext = WalletClientContext;
625
626    fn transitions(
627        &self,
628        context: &Self::ModuleContext,
629        global_context: &DynGlobalClientContext,
630    ) -> Vec<StateTransition<Self>> {
631        match self {
632            WalletClientStateMachines::Send(sm) => sm_enum_variant_translation!(
633                sm.transitions(context, global_context),
634                WalletClientStateMachines::Send
635            ),
636            WalletClientStateMachines::Receive(sm) => sm_enum_variant_translation!(
637                sm.transitions(context, global_context),
638                WalletClientStateMachines::Receive
639            ),
640        }
641    }
642
643    fn operation_id(&self) -> OperationId {
644        match self {
645            WalletClientStateMachines::Send(sm) => sm.operation_id(),
646            WalletClientStateMachines::Receive(sm) => sm.operation_id(),
647        }
648    }
649}
650
impl IntoDynInstance for WalletClientStateMachines {
    type DynType = DynState;

    /// Wrap the typed state machine into a `DynState` tagged with the given
    /// module instance id.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
657}