Skip to main content

fedimint_walletv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::module_name_repetitions)]
6
7pub use fedimint_walletv2_common as common;
8
9mod api;
10#[cfg(feature = "cli")]
11mod cli;
12mod db;
13pub mod events;
14mod receive_sm;
15mod send_sm;
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::anyhow;
22use api::WalletFederationApi;
23use bitcoin::address::NetworkUnchecked;
24use bitcoin::{Address, ScriptBuf};
25use db::{NextOutputIndexKey, ValidAddressIndexKey, ValidAddressIndexPrefix};
26use events::{ReceivePaymentEvent, SendPaymentEvent};
27use fedimint_api_client::api::{DynModuleApi, FederationResult};
28use fedimint_client::DynGlobalClientContext;
29use fedimint_client::transaction::{
30    ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
31    ClientOutputSM, TransactionBuilder,
32};
33use fedimint_client_module::db::ClientModuleMigrationFn;
34use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
35use fedimint_client_module::module::recovery::NoModuleBackup;
36use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
37use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
38use fedimint_client_module::sm_enum_variant_translation;
39use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
42};
43use fedimint_core::encoding::{Decodable, Encodable};
44use fedimint_core::module::{
45    AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
46};
47use fedimint_core::task::{TaskGroup, block_in_place, sleep};
48use fedimint_core::{Amount, OutPoint, TransactionId, apply, async_trait_maybe_send};
49use fedimint_derive_secret::{ChildId, DerivableSecret};
50use fedimint_logging::LOG_CLIENT_MODULE_WALLETV2;
51use fedimint_walletv2_common::config::WalletClientConfig;
52use fedimint_walletv2_common::{
53    KIND, StandardScript, TxInfo, WalletCommonInit, WalletInput, WalletInputV0, WalletModuleTypes,
54    WalletOutput, WalletOutputV0, descriptor, is_potential_receive,
55};
56use futures::StreamExt;
57use receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
58use secp256k1::Keypair;
59use send_sm::{SendSMCommon, SendSMState, SendStateMachine};
60use serde::{Deserialize, Serialize};
61use strum::IntoEnumIterator as _;
62use thiserror::Error;
63use tracing::{info, warn};
64
65/// Number of output info entries to scan per batch.
66const SLICE_SIZE: u64 = 1000;
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub enum WalletOperationMeta {
70    Send(SendMeta),
71    Receive(ReceiveMeta),
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub struct SendMeta {
76    pub change_outpoint_range: OutPointRange,
77    pub address: Address<NetworkUnchecked>,
78    pub value: bitcoin::Amount,
79    pub fee: bitcoin::Amount,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct ReceiveMeta {
84    pub change_outpoint_range: OutPointRange,
85    pub value: bitcoin::Amount,
86    pub fee: bitcoin::Amount,
87}
88
89/// The final state of an operation sending bitcoin onchain.
90#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
91pub enum FinalSendOperationState {
92    /// The transaction was successful.
93    Success(bitcoin::Txid),
94    /// The funding transaction was aborted.
95    Aborted,
96    /// A programming error has occurred or the federation is malicious.
97    Failure,
98}
99
100#[derive(Debug, Clone)]
101pub struct WalletClientModule {
102    root_secret: DerivableSecret,
103    cfg: WalletClientConfig,
104    notifier: ModuleNotifier<WalletClientStateMachines>,
105    client_ctx: ClientContext<Self>,
106    db: Database,
107    module_api: DynModuleApi,
108}
109
110#[derive(Debug, Clone)]
111pub struct WalletClientContext {
112    pub client_ctx: ClientContext<WalletClientModule>,
113}
114
115impl Context for WalletClientContext {
116    const KIND: Option<ModuleKind> = Some(KIND);
117}
118
119#[apply(async_trait_maybe_send!)]
120impl ClientModule for WalletClientModule {
121    type Init = WalletClientInit;
122    type Common = WalletModuleTypes;
123    type Backup = NoModuleBackup;
124    type ModuleStateMachineContext = WalletClientContext;
125    type States = WalletClientStateMachines;
126
127    fn context(&self) -> Self::ModuleStateMachineContext {
128        WalletClientContext {
129            client_ctx: self.client_ctx.clone(),
130        }
131    }
132
133    fn input_fee(
134        &self,
135        amount: &Amounts,
136        _input: &<Self::Common as ModuleCommon>::Input,
137    ) -> Option<Amounts> {
138        amount
139            .get(&AmountUnit::BITCOIN)
140            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
141    }
142
143    fn output_fee(
144        &self,
145        amount: &Amounts,
146        _output: &<Self::Common as ModuleCommon>::Output,
147    ) -> Option<Amounts> {
148        amount
149            .get(&AmountUnit::BITCOIN)
150            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
151    }
152
153    #[cfg(feature = "cli")]
154    async fn handle_cli_command(
155        &self,
156        args: &[std::ffi::OsString],
157    ) -> anyhow::Result<serde_json::Value> {
158        cli::handle_cli_command(self, args).await
159    }
160}
161
162#[derive(Debug, Clone, Default)]
163pub struct WalletClientInit;
164
165impl ModuleInit for WalletClientInit {
166    type Common = WalletCommonInit;
167
168    async fn dump_database(
169        &self,
170        _dbtx: &mut DatabaseTransaction<'_>,
171        _prefix_names: Vec<String>,
172    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
173        Box::new(BTreeMap::new().into_iter())
174    }
175}
176
177#[apply(async_trait_maybe_send!)]
178impl ClientModuleInit for WalletClientInit {
179    type Module = WalletClientModule;
180
181    fn supported_api_versions(&self) -> MultiApiVersion {
182        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
183            .expect("no version conflicts")
184    }
185
186    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
187        let module = WalletClientModule {
188            root_secret: args.module_root_secret().clone(),
189            cfg: args.cfg().clone(),
190            notifier: args.notifier().clone(),
191            client_ctx: args.context(),
192            db: args.db().clone(),
193            module_api: args.module_api().clone(),
194        };
195
196        module.spawn_output_scanner(args.task_group());
197
198        Ok(module)
199    }
200
201    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
202        BTreeMap::new()
203    }
204
205    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
206        Some(db::DbKeyPrefix::iter().map(|p| p as u8).collect())
207    }
208}
209
210impl WalletClientModule {
211    /// Returns the Bitcoin network for this federation.
212    pub fn get_network(&self) -> bitcoin::Network {
213        self.cfg.network
214    }
215
216    /// Fetch the total value of bitcoin controlled by the federation.
217    pub async fn total_value(&self) -> FederationResult<bitcoin::Amount> {
218        self.module_api
219            .federation_wallet()
220            .await
221            .map(|tx_out| tx_out.map_or(bitcoin::Amount::ZERO, |tx_out| tx_out.value))
222    }
223
224    /// Fetch the consensus block count of the federation.
225    pub async fn block_count(&self) -> FederationResult<u64> {
226        self.module_api.consensus_block_count().await
227    }
228
229    /// Fetch the current consensus feerate.
230    pub async fn feerate(&self) -> FederationResult<Option<u64>> {
231        self.module_api.consensus_feerate().await
232    }
233
234    /// Fetch information on the chain of pending bitcoin transactions.
235    async fn pending_tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
236        self.module_api.pending_tx_chain().await
237    }
238
239    /// Display log of bitcoin transactions.
240    async fn tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
241        self.module_api.tx_chain().await
242    }
243
244    /// Fetch the current fee required to send an onchain payment.
245    pub async fn send_fee(&self) -> Result<bitcoin::Amount, SendError> {
246        self.module_api
247            .send_fee()
248            .await
249            .map_err(|e| SendError::FederationError(e.to_string()))?
250            .ok_or(SendError::NoConsensusFeerateAvailable)
251    }
252
253    /// Send an onchain payment with the given fee.
254    pub async fn send(
255        &self,
256        address: Address<NetworkUnchecked>,
257        value: bitcoin::Amount,
258        fee: Option<bitcoin::Amount>,
259    ) -> Result<OperationId, SendError> {
260        if !address.is_valid_for_network(self.cfg.network) {
261            return Err(SendError::WrongNetwork);
262        }
263
264        if value < self.cfg.dust_limit {
265            return Err(SendError::DustValue);
266        }
267
268        let fee = match fee {
269            Some(value) => value,
270            None => self
271                .module_api
272                .send_fee()
273                .await
274                .map_err(|e| SendError::FederationError(e.to_string()))?
275                .ok_or(SendError::NoConsensusFeerateAvailable)?,
276        };
277
278        let operation_id = OperationId::new_random();
279
280        let destination = StandardScript::from_address(&address.clone().assume_checked())
281            .ok_or(SendError::UnsupportedAddress)?;
282
283        let client_output = ClientOutput::<WalletOutput> {
284            output: WalletOutput::V0(WalletOutputV0 {
285                destination,
286                value,
287                fee,
288            }),
289            amounts: Amounts::new_bitcoin(Amount::from_sats((value + fee).to_sat())),
290        };
291
292        let client_output_sm = ClientOutputSM::<WalletClientStateMachines> {
293            state_machines: Arc::new(move |range: OutPointRange| {
294                vec![WalletClientStateMachines::Send(SendStateMachine {
295                    common: SendSMCommon {
296                        operation_id,
297                        outpoint: OutPoint {
298                            txid: range.txid(),
299                            out_idx: 0,
300                        },
301                        value,
302                        fee,
303                    },
304                    state: SendSMState::Funding,
305                })]
306            }),
307        };
308
309        let client_output_bundle = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
310            vec![client_output],
311            vec![client_output_sm],
312        ));
313
314        let address_clone = address.clone();
315
316        self.client_ctx
317            .finalize_and_submit_transaction(
318                operation_id,
319                WalletCommonInit::KIND.as_str(),
320                move |change_outpoint_range| {
321                    WalletOperationMeta::Send(SendMeta {
322                        change_outpoint_range,
323                        address: address_clone.clone(),
324                        value,
325                        fee,
326                    })
327                },
328                TransactionBuilder::new().with_outputs(client_output_bundle),
329            )
330            .await
331            .map_err(|_| SendError::InsufficientFunds)?;
332
333        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
334
335        self.client_ctx
336            .log_event(
337                &mut dbtx,
338                SendPaymentEvent {
339                    operation_id,
340                    address,
341                    value,
342                    fee,
343                },
344            )
345            .await;
346
347        dbtx.commit_tx().await;
348
349        Ok(operation_id)
350    }
351
352    /// Await the final state of the send operation.
353    pub async fn await_final_send_operation_state(
354        &self,
355        operation_id: OperationId,
356    ) -> FinalSendOperationState {
357        let mut stream = self.notifier.subscribe(operation_id).await;
358
359        loop {
360            let Some(WalletClientStateMachines::Send(state)) = stream.next().await else {
361                panic!("stream must produce a terminal send state");
362            };
363
364            match state.state {
365                SendSMState::Funding => {}
366                SendSMState::Success(txid) => return FinalSendOperationState::Success(txid),
367                SendSMState::Aborted(..) => return FinalSendOperationState::Aborted,
368                SendSMState::Failure => return FinalSendOperationState::Failure,
369            }
370        }
371    }
372
373    /// Returns the next unused receive address, polling until the initial
374    /// address derivation has completed.
375    pub async fn receive(&self) -> Address {
376        loop {
377            if let Some(entry) = self
378                .db
379                .begin_transaction_nc()
380                .await
381                .find_by_prefix_sorted_descending(&ValidAddressIndexPrefix)
382                .await
383                .next()
384                .await
385            {
386                return self.derive_address(entry.0.0);
387            }
388
389            sleep(Duration::from_secs(1)).await;
390        }
391    }
392
393    fn derive_address(&self, index: u64) -> Address {
394        descriptor(
395            &self.cfg.bitcoin_pks,
396            &self.derive_tweak(index).public_key().consensus_hash(),
397        )
398        .address(self.cfg.network)
399    }
400
401    fn derive_tweak(&self, index: u64) -> Keypair {
402        self.root_secret
403            .child_key(ChildId(index))
404            .to_secp_key(secp256k1::SECP256K1)
405    }
406
407    /// Find the next valid index starting from (and including) `start_index`.
408    #[allow(clippy::maybe_infinite_iter)]
409    fn next_valid_index(&self, start_index: u64) -> u64 {
410        let pks_hash = self.cfg.bitcoin_pks.consensus_hash();
411
412        block_in_place(|| {
413            (start_index..)
414                .find(|i| is_potential_receive(&self.derive_address(*i).script_pubkey(), &pks_hash))
415                .expect("Will always find a valid index")
416        })
417    }
418
419    /// Issue ecash for an unspent output with a given fee.
420    async fn receive_output(
421        &self,
422        output_index: u64,
423        value: bitcoin::Amount,
424        address_index: u64,
425        fee: bitcoin::Amount,
426    ) -> (OperationId, TransactionId) {
427        let operation_id = OperationId::new_random();
428
429        let client_input = ClientInput::<WalletInput> {
430            input: WalletInput::V0(WalletInputV0 {
431                output_index,
432                fee,
433                tweak: self.derive_tweak(address_index).public_key(),
434            }),
435            keys: vec![self.derive_tweak(address_index)],
436            amounts: Amounts::new_bitcoin(Amount::from_sats((value - fee).to_sat())),
437        };
438
439        let client_input_sm = ClientInputSM::<WalletClientStateMachines> {
440            state_machines: Arc::new(move |range: OutPointRange| {
441                vec![WalletClientStateMachines::Receive(ReceiveStateMachine {
442                    common: ReceiveSMCommon {
443                        operation_id,
444                        txid: range.txid(),
445                        value,
446                        fee,
447                    },
448                    state: ReceiveSMState::Funding,
449                })]
450            }),
451        };
452
453        let client_input_bundle = self.client_ctx.make_client_inputs(ClientInputBundle::new(
454            vec![client_input],
455            vec![client_input_sm],
456        ));
457
458        let range = self
459            .client_ctx
460            .finalize_and_submit_transaction(
461                operation_id,
462                WalletCommonInit::KIND.as_str(),
463                move |change_outpoint_range| {
464                    WalletOperationMeta::Receive(ReceiveMeta {
465                        change_outpoint_range,
466                        value,
467                        fee,
468                    })
469                },
470                TransactionBuilder::new().with_inputs(client_input_bundle),
471            )
472            .await
473            .expect("Input amount is sufficient to finalize transaction");
474
475        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
476
477        self.client_ctx
478            .log_event(
479                &mut dbtx,
480                ReceivePaymentEvent {
481                    operation_id,
482                    address: self.derive_address(address_index).as_unchecked().clone(),
483                    value,
484                    fee,
485                },
486            )
487            .await;
488
489        dbtx.commit_tx().await;
490
491        (operation_id, range.txid())
492    }
493
494    fn spawn_output_scanner(&self, task_group: &TaskGroup) {
495        let module = self.clone();
496
497        task_group.spawn_cancellable("output-scanner", async move {
498            let mut dbtx = module.db.begin_transaction().await;
499
500            if dbtx
501                .find_by_prefix(&ValidAddressIndexPrefix)
502                .await
503                .next()
504                .await
505                .is_none()
506            {
507                dbtx.insert_new_entry(&ValidAddressIndexKey(module.next_valid_index(0)), &())
508                    .await;
509            }
510
511            dbtx.commit_tx().await;
512
513            loop {
514                match module.check_outputs().await {
515                    Ok(skip_wait) => {
516                        if skip_wait {
517                            continue;
518                        }
519                    }
520                    Err(e) => {
521                        warn!(target: LOG_CLIENT_MODULE_WALLETV2, "Failed to fetch outputs: {e}");
522                    }
523                }
524
525                sleep(fedimint_walletv2_common::sleep_duration()).await;
526            }
527        });
528    }
529
530    async fn check_outputs(&self) -> anyhow::Result<bool> {
531        let mut dbtx = self.db.begin_transaction_nc().await;
532
533        let next_output_index = dbtx.get_value(&NextOutputIndexKey).await.unwrap_or(0);
534
535        let mut valid_indices: Vec<u64> = dbtx
536            .find_by_prefix(&ValidAddressIndexPrefix)
537            .await
538            .map(|entry| entry.0.0)
539            .collect()
540            .await;
541
542        let mut address_map: BTreeMap<ScriptBuf, u64> = valid_indices
543            .iter()
544            .map(|&i| (self.derive_address(i).script_pubkey(), i))
545            .collect();
546
547        let outputs = self
548            .module_api
549            .output_info_slice(next_output_index, next_output_index + SLICE_SIZE)
550            .await?;
551
552        info!(
553            target: LOG_CLIENT_MODULE_WALLETV2,
554            "Scanning for outputs..."
555        );
556
557        for output in &outputs {
558            if let Some(&address_index) = address_map.get(&output.script) {
559                let next_address_index = valid_indices
560                    .last()
561                    .copied()
562                    .expect("we have at least one address index");
563
564                // If we used the highest valid index, add the next valid one
565                if address_index == next_address_index {
566                    let index = self.next_valid_index(next_address_index + 1);
567
568                    let mut dbtx = self.db.begin_transaction().await;
569
570                    dbtx.insert_entry(&ValidAddressIndexKey(index), &()).await;
571
572                    dbtx.commit_tx_result().await?;
573
574                    valid_indices.push(index);
575
576                    address_map.insert(self.derive_address(index).script_pubkey(), index);
577                }
578
579                if !output.spent {
580                    // In order to not overpay on fees we choose to wait,
581                    // the congestion will clear up within a few blocks.
582                    if self.module_api.pending_tx_chain().await?.len() >= 3 {
583                        return Ok(false);
584                    }
585
586                    let receive_fee = self
587                        .module_api
588                        .receive_fee()
589                        .await?
590                        .ok_or(anyhow!("No consensus feerate is available"))?;
591
592                    if output.value > receive_fee {
593                        let (operation_id, txid) = self
594                            .receive_output(output.index, output.value, address_index, receive_fee)
595                            .await;
596
597                        self.client_ctx
598                            .transaction_updates(operation_id)
599                            .await
600                            .await_tx_accepted(txid)
601                            .await
602                            .map_err(|e| anyhow!("Claim transaction was rejected: {e}"))?;
603                    }
604                }
605            }
606
607            let mut dbtx = self.db.begin_transaction().await;
608
609            dbtx.insert_entry(&NextOutputIndexKey, &(output.index + 1))
610                .await;
611
612            dbtx.commit_tx_result().await?;
613        }
614
615        Ok(!outputs.is_empty())
616    }
617}
618
619#[derive(Error, Debug, Clone, Eq, PartialEq)]
620pub enum SendError {
621    #[error("Address is from a different network than the federation.")]
622    WrongNetwork,
623    #[error("The value is too small")]
624    DustValue,
625    #[error("Federation returned an error: {0}")]
626    FederationError(String),
627    #[error("No consensus feerate is available at this time")]
628    NoConsensusFeerateAvailable,
629    #[error("The client does not have sufficient funds to send the payment")]
630    InsufficientFunds,
631    #[error("Unsupported address type")]
632    UnsupportedAddress,
633}
634
635#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
636pub enum WalletClientStateMachines {
637    Send(send_sm::SendStateMachine),
638    Receive(receive_sm::ReceiveStateMachine),
639}
640
641impl State for WalletClientStateMachines {
642    type ModuleContext = WalletClientContext;
643
644    fn transitions(
645        &self,
646        context: &Self::ModuleContext,
647        global_context: &DynGlobalClientContext,
648    ) -> Vec<StateTransition<Self>> {
649        match self {
650            WalletClientStateMachines::Send(sm) => sm_enum_variant_translation!(
651                sm.transitions(context, global_context),
652                WalletClientStateMachines::Send
653            ),
654            WalletClientStateMachines::Receive(sm) => sm_enum_variant_translation!(
655                sm.transitions(context, global_context),
656                WalletClientStateMachines::Receive
657            ),
658        }
659    }
660
661    fn operation_id(&self) -> OperationId {
662        match self {
663            WalletClientStateMachines::Send(sm) => sm.operation_id(),
664            WalletClientStateMachines::Receive(sm) => sm.operation_id(),
665        }
666    }
667}
668
669impl IntoDynInstance for WalletClientStateMachines {
670    type DynType = DynState;
671
672    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
673        DynState::from_typed(instance_id, self)
674    }
675}