Skip to main content

fedimint_walletv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::must_use_candidate)]
5#![allow(clippy::module_name_repetitions)]
6
7pub use fedimint_walletv2_common as common;
8
9mod api;
10#[cfg(feature = "cli")]
11mod cli;
12mod db;
13pub mod events;
14mod receive_sm;
15mod send_sm;
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::anyhow;
22use api::WalletFederationApi;
23use bitcoin::address::NetworkUnchecked;
24use bitcoin::{Address, ScriptBuf};
25use db::{NextOutputIndexKey, ValidAddressIndexKey, ValidAddressIndexPrefix};
26use events::{ReceivePaymentEvent, SendPaymentEvent};
27use fedimint_api_client::api::{DynModuleApi, FederationResult};
28use fedimint_client::DynGlobalClientContext;
29use fedimint_client::transaction::{
30    ClientInput, ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputBundle,
31    ClientOutputSM, FeeQuote, FeeQuoteRequest, TransactionBuilder,
32};
33use fedimint_client_module::db::ClientModuleMigrationFn;
34use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
35use fedimint_client_module::module::recovery::NoModuleBackup;
36use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
37use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
38use fedimint_client_module::sm_enum_variant_translation;
39use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
42};
43use fedimint_core::encoding::{Decodable, Encodable};
44use fedimint_core::module::{
45    AmountUnit, Amounts, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
46};
47use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
48use fedimint_core::{Amount, OutPoint, TransactionId, apply, async_trait_maybe_send};
49use fedimint_derive_secret::{ChildId, DerivableSecret};
50use fedimint_eventlog::{Event, EventLogId};
51use fedimint_logging::LOG_CLIENT_MODULE_WALLETV2;
52use fedimint_walletv2_common::config::WalletClientConfig;
53use fedimint_walletv2_common::{
54    KIND, OutputInfo, StandardScript, TxInfo, WalletCommonInit, WalletInput, WalletInputV0,
55    WalletModuleTypes, WalletOutput, WalletOutputV0, descriptor, is_potential_receive,
56};
57use futures::StreamExt;
58use receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
59use secp256k1::Keypair;
60use send_sm::{SendSMCommon, SendSMState, SendStateMachine};
61use serde::{Deserialize, Serialize};
62use strum::IntoEnumIterator as _;
63use thiserror::Error;
64use tracing::{debug, warn};
65
/// How many output info entries the output scanner requests from the
/// federation in a single slice.
const SLICE_SIZE: u64 = 1_000;

/// How many event log entries are fetched per page while searching the
/// event log.
const EVENT_LOG_PAGE_SIZE: u64 = 1_000;
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub enum WalletOperationMeta {
74    Send(SendMeta),
75    Receive(ReceiveMeta),
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct SendMeta {
80    pub change_outpoint_range: OutPointRange,
81    pub address: Address<NetworkUnchecked>,
82    pub value: bitcoin::Amount,
83    pub fee: bitcoin::Amount,
84    #[serde(default)]
85    pub custom_meta: serde_json::Value,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ReceiveMeta {
90    pub change_outpoint_range: OutPointRange,
91    pub value: bitcoin::Amount,
92    pub fee: bitcoin::Amount,
93    pub address: Option<Address<NetworkUnchecked>>,
94    pub outpoint: Option<bitcoin::OutPoint>,
95}
96
97/// The final state of an operation sending bitcoin onchain.
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub enum FinalSendOperationState {
100    /// The transaction was successful.
101    Success(bitcoin::Txid),
102    /// The funding transaction was aborted.
103    Aborted,
104    /// A programming error has occurred or the federation is malicious.
105    Failure,
106}
107
108/// The final state of an operation receiving bitcoin onchain.
109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
110pub enum FinalReceiveOperationState {
111    /// The federation accepted the claiming transaction.
112    Success,
113    /// The federation rejected the claiming transaction.
114    Aborted,
115}
116
117#[derive(Debug, Clone)]
118pub struct WalletClientModule {
119    root_secret: DerivableSecret,
120    cfg: WalletClientConfig,
121    notifier: ModuleNotifier<WalletClientStateMachines>,
122    client_ctx: ClientContext<Self>,
123    db: Database,
124    module_api: DynModuleApi,
125}
126
127#[derive(Debug, Clone)]
128pub struct WalletClientContext {
129    pub client_ctx: ClientContext<WalletClientModule>,
130}
131
132impl Context for WalletClientContext {
133    const KIND: Option<ModuleKind> = Some(KIND);
134}
135
136#[apply(async_trait_maybe_send!)]
137impl ClientModule for WalletClientModule {
138    type Init = WalletClientInit;
139    type Common = WalletModuleTypes;
140    type Backup = NoModuleBackup;
141    type ModuleStateMachineContext = WalletClientContext;
142    type States = WalletClientStateMachines;
143
144    fn context(&self) -> Self::ModuleStateMachineContext {
145        WalletClientContext {
146            client_ctx: self.client_ctx.clone(),
147        }
148    }
149
150    fn input_fee(
151        &self,
152        amount: &Amounts,
153        _input: &<Self::Common as ModuleCommon>::Input,
154    ) -> Option<Amounts> {
155        amount
156            .get(&AmountUnit::BITCOIN)
157            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
158    }
159
160    fn output_fee(
161        &self,
162        amount: &Amounts,
163        _output: &<Self::Common as ModuleCommon>::Output,
164    ) -> Option<Amounts> {
165        amount
166            .get(&AmountUnit::BITCOIN)
167            .map(|a| Amounts::new_bitcoin(self.cfg.fee_consensus.fee(*a)))
168    }
169
170    #[cfg(feature = "cli")]
171    async fn handle_cli_command(
172        &self,
173        args: &[std::ffi::OsString],
174    ) -> anyhow::Result<serde_json::Value> {
175        cli::handle_cli_command(self, args).await
176    }
177}
178
/// Initializer that constructs [`WalletClientModule`] instances.
#[derive(Clone, Debug, Default)]
pub struct WalletClientInit;
181
182impl ModuleInit for WalletClientInit {
183    type Common = WalletCommonInit;
184
185    async fn dump_database(
186        &self,
187        _dbtx: &mut DatabaseTransaction<'_>,
188        _prefix_names: Vec<String>,
189    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
190        Box::new(BTreeMap::new().into_iter())
191    }
192}
193
194#[apply(async_trait_maybe_send!)]
195impl ClientModuleInit for WalletClientInit {
196    type Module = WalletClientModule;
197
198    fn supported_api_versions(&self) -> MultiApiVersion {
199        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
200            .expect("no version conflicts")
201    }
202
203    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
204        let module = WalletClientModule {
205            root_secret: args.module_root_secret().clone(),
206            cfg: args.cfg().clone(),
207            notifier: args.notifier().clone(),
208            client_ctx: args.context(),
209            db: args.db().clone(),
210            module_api: args.module_api().clone(),
211        };
212
213        module.spawn_output_scanner(args.task_group(), args.client_span());
214
215        Ok(module)
216    }
217
218    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
219        BTreeMap::new()
220    }
221
222    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
223        Some(db::DbKeyPrefix::iter().map(|p| p as u8).collect())
224    }
225}
226
227impl WalletClientModule {
228    /// Returns the Bitcoin network for this federation.
229    pub fn get_network(&self) -> bitcoin::Network {
230        self.cfg.network
231    }
232
233    /// Fetch the total value of bitcoin controlled by the federation.
234    pub async fn total_value(&self) -> FederationResult<bitcoin::Amount> {
235        self.module_api
236            .federation_wallet()
237            .await
238            .map(|tx_out| tx_out.map_or(bitcoin::Amount::ZERO, |tx_out| tx_out.value))
239    }
240
241    /// Fetch the consensus block count of the federation.
242    pub async fn block_count(&self) -> FederationResult<u64> {
243        self.module_api.consensus_block_count().await
244    }
245
246    /// Fetch the current consensus feerate.
247    pub async fn feerate(&self) -> FederationResult<Option<u64>> {
248        self.module_api.consensus_feerate().await
249    }
250
251    /// Fetch information on the chain of pending bitcoin transactions.
252    pub async fn pending_tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
253        self.module_api.pending_tx_chain().await
254    }
255
256    /// Display log of bitcoin transactions.
257    pub async fn tx_chain(&self) -> FederationResult<Vec<TxInfo>> {
258        self.module_api.tx_chain().await
259    }
260
261    /// Fetch the current fee required to send an onchain payment.
262    pub async fn send_fee(&self) -> Result<bitcoin::Amount, SendError> {
263        self.module_api
264            .send_fee()
265            .await
266            .map_err(|e| SendError::FederationError(e.to_string()))?
267            .ok_or(SendError::NoConsensusFeerateAvailable)
268    }
269
270    /// Computes the federation fee an onchain send of an output worth `amount`
271    /// (the payment amount plus the on-chain miner fee) would incur, without
272    /// submitting anything.
273    ///
274    /// A send submits a single wallet output worth `amount`; the primary module
275    /// balances it by spending ecash to fund the output and minting any change.
276    /// This quotes the fee of that transaction — the wallet output fee, the
277    /// mint input fees on the funding notes, any mint change output fees,
278    /// and sub-denomination dust — via the shared, module-agnostic fee
279    /// quote.
280    ///
281    /// The on-chain Bitcoin miner fee is deliberately excluded: it is part of
282    /// the output `amount` (see [`Self::send_fee`]), not the on-federation
283    /// transaction fee.
284    pub async fn send_fee_quote(&self, amount: bitcoin::Amount) -> anyhow::Result<FeeQuote> {
285        let amount = Amount::from_sats(amount.to_sat());
286        self.client_ctx
287            .fee_quote(
288                OperationId::new_random(),
289                FeeQuoteRequest {
290                    input_amount: Amounts::ZERO,
291                    output_amount: Amounts::new_bitcoin(amount),
292                    input_fee: Amounts::ZERO,
293                    output_fee: Amounts::new_bitcoin(self.cfg.fee_consensus.fee(amount)),
294                },
295            )
296            .await
297    }
298
299    /// Fetch the current fee required to claim an onchain deposit (peg-in).
300    pub async fn receive_fee(&self) -> Result<bitcoin::Amount, ReceiveError> {
301        self.module_api
302            .receive_fee()
303            .await
304            .map_err(|e| ReceiveError::FederationError(e.to_string()))?
305            .ok_or(ReceiveError::NoConsensusFeerateAvailable)
306    }
307
308    /// Send an onchain payment with the given fee.
309    pub async fn send(
310        &self,
311        address: Address<NetworkUnchecked>,
312        value: bitcoin::Amount,
313        fee: Option<bitcoin::Amount>,
314        custom_meta: serde_json::Value,
315    ) -> Result<OperationId, SendError> {
316        if !address.is_valid_for_network(self.cfg.network) {
317            return Err(SendError::WrongNetwork);
318        }
319
320        if value < self.cfg.dust_limit {
321            return Err(SendError::DustValue);
322        }
323
324        let fee = match fee {
325            Some(value) => value,
326            None => self
327                .module_api
328                .send_fee()
329                .await
330                .map_err(|e| SendError::FederationError(e.to_string()))?
331                .ok_or(SendError::NoConsensusFeerateAvailable)?,
332        };
333
334        let operation_id = OperationId::new_random();
335
336        let destination = StandardScript::from_address(&address.clone().assume_checked())
337            .ok_or(SendError::UnsupportedAddress)?;
338
339        let client_output = ClientOutput::<WalletOutput> {
340            output: WalletOutput::V0(WalletOutputV0 {
341                destination,
342                value,
343                fee,
344            }),
345            amounts: Amounts::new_bitcoin(Amount::from_sats((value + fee).to_sat())),
346        };
347
348        let client_output_sm = ClientOutputSM::<WalletClientStateMachines> {
349            state_machines: Arc::new(move |range: OutPointRange| {
350                vec![WalletClientStateMachines::Send(SendStateMachine {
351                    common: SendSMCommon {
352                        operation_id,
353                        outpoint: OutPoint {
354                            txid: range.txid(),
355                            out_idx: 0,
356                        },
357                        value,
358                        fee,
359                    },
360                    state: SendSMState::Funding,
361                })]
362            }),
363        };
364
365        let client_output_bundle = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
366            vec![client_output],
367            vec![client_output_sm],
368        ));
369
370        let address_clone = address.clone();
371
372        self.client_ctx
373            .finalize_and_submit_transaction(
374                operation_id,
375                WalletCommonInit::KIND.as_str(),
376                move |change_outpoint_range| {
377                    WalletOperationMeta::Send(SendMeta {
378                        change_outpoint_range,
379                        address: address_clone.clone(),
380                        value,
381                        fee,
382                        custom_meta: custom_meta.clone(),
383                    })
384                },
385                TransactionBuilder::new().with_outputs(client_output_bundle),
386            )
387            .await
388            .map_err(|_| SendError::InsufficientFunds)?;
389
390        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
391
392        self.client_ctx
393            .log_event(
394                &mut dbtx,
395                SendPaymentEvent {
396                    operation_id,
397                    address,
398                    value,
399                    fee,
400                },
401            )
402            .await;
403
404        dbtx.commit_tx().await;
405
406        Ok(operation_id)
407    }
408
409    /// Await the final state of the send operation.
410    pub async fn await_final_send_operation_state(
411        &self,
412        operation_id: OperationId,
413    ) -> anyhow::Result<FinalSendOperationState> {
414        let operation = self.client_ctx.get_operation(operation_id).await?;
415        let mut stream = self.notifier.subscribe(operation_id).await;
416
417        let mut stream = self
418            .client_ctx
419            .outcome_or_updates(operation, operation_id, move || {
420                async_stream::stream! {
421                    loop {
422                        if let Some(WalletClientStateMachines::Send(state)) = stream.next().await {
423                            match state.state {
424                                SendSMState::Funding => {}
425                                SendSMState::Success(txid) => {
426                                    yield FinalSendOperationState::Success(txid);
427                                    return;
428                                }
429                                SendSMState::Aborted(..) => {
430                                    yield FinalSendOperationState::Aborted;
431                                    return;
432                                }
433                                SendSMState::Failure => {
434                                    yield FinalSendOperationState::Failure;
435                                    return;
436                                }
437                            }
438                        }
439                    }
440                }
441            })
442            .into_stream();
443
444        let mut final_state = None;
445
446        while let Some(state) = stream.next().await {
447            final_state = Some(state);
448        }
449
450        Ok(final_state.expect("Stream contains one final state"))
451    }
452
453    /// Await the final state of the receive operation.
454    pub async fn await_final_receive_operation_state(
455        &self,
456        operation_id: OperationId,
457    ) -> anyhow::Result<FinalReceiveOperationState> {
458        let operation = self.client_ctx.get_operation(operation_id).await?;
459        let mut stream = self.notifier.subscribe(operation_id).await;
460
461        let mut stream = self
462            .client_ctx
463            .outcome_or_updates(operation, operation_id, move || {
464                async_stream::stream! {
465                    loop {
466                        if let Some(WalletClientStateMachines::Receive(state)) = stream.next().await {
467                            match state.state {
468                                ReceiveSMState::Funding => {}
469                                ReceiveSMState::Success => {
470                                    yield FinalReceiveOperationState::Success;
471                                    return;
472                                }
473                                ReceiveSMState::Aborted(..) => {
474                                    yield FinalReceiveOperationState::Aborted;
475                                    return;
476                                }
477                            }
478                        }
479                    }
480                }
481            })
482            .into_stream();
483
484        let mut final_state = None;
485
486        while let Some(state) = stream.next().await {
487            final_state = Some(state);
488        }
489
490        Ok(final_state.expect("Stream contains one final state"))
491    }
492
493    /// Returns the highest valid receive address index that the background
494    /// scanner has derived so far, or `None` if it has not derived one yet.
495    async fn valid_index(&self) -> Option<u64> {
496        self.db
497            .begin_transaction_nc()
498            .await
499            .find_by_prefix_sorted_descending(&ValidAddressIndexPrefix)
500            .await
501            .next()
502            .await
503            .map(|entry| entry.0.0)
504    }
505
506    /// Returns the next unused receive address.
507    ///
508    /// To wait for a payment to this address race-free, read the client's
509    /// current event log position (via the global `get_next_event_log_id`)
510    /// *before* calling this, then pass that position to
511    /// [`Self::await_receive`]; it will only consider payments received
512    /// after that position.
513    ///
514    /// If the background scanner has already derived a valid address index this
515    /// returns immediately. Otherwise it blocks, letting the scanner grind
516    /// until it finds the next valid index, and returns once one is
517    /// available.
518    pub async fn receive(&self) -> Address {
519        loop {
520            if let Some(index) = self.valid_index().await {
521                return self.derive_address(index);
522            }
523
524            sleep(Duration::from_secs(1)).await;
525        }
526    }
527
528    /// Block until the next on-chain payment recorded at or after `position` is
529    /// received and successfully claimed by the federation.
530    ///
531    /// Returns the peg-in's final state together with the event log position
532    /// just past it, so that a subsequent call can resume from there to wait
533    /// for the following receive.
534    ///
535    /// A peg-in attempt may be aborted (rejected by the federation), in which
536    /// case the still-unspent output is reprocessed into a new receive
537    /// operation; this keeps waiting until one succeeds.
538    pub async fn await_receive(
539        &self,
540        position: EventLogId,
541    ) -> anyhow::Result<(FinalReceiveOperationState, EventLogId)> {
542        let mut position = position;
543
544        loop {
545            let (operation_id, next_position) = self.next_receive_operation(position).await;
546
547            position = next_position;
548
549            let state = self
550                .await_final_receive_operation_state(operation_id)
551                .await?;
552
553            // A successful peg-in is terminal; an aborted one is retried as a
554            // new receive operation, so keep waiting.
555            if state == FinalReceiveOperationState::Success {
556                // Reaching `Success` only means the peg-in claim transaction was
557                // accepted into consensus. The ecash it mints is issued
558                // asynchronously by the primary module, so wait for those
559                // outputs before returning; otherwise the freshly claimed funds
560                // may not yet be reflected in the client's balance.
561                let operation = self.client_ctx.get_operation(operation_id).await?;
562
563                if let WalletOperationMeta::Receive(ReceiveMeta {
564                    change_outpoint_range,
565                    ..
566                }) = operation.meta::<WalletOperationMeta>()
567                {
568                    self.client_ctx
569                        .await_primary_module_outputs(
570                            operation_id,
571                            change_outpoint_range.into_iter().collect(),
572                        )
573                        .await?;
574                }
575
576                return Ok((state, position));
577            }
578        }
579    }
580
581    /// Scan the event log from `position` for the next [`ReceivePaymentEvent`],
582    /// blocking until one is found, and return its operation id together with
583    /// the event log position just past it.
584    async fn next_receive_operation(&self, position: EventLogId) -> (OperationId, EventLogId) {
585        let mut position = position;
586
587        loop {
588            let events = self
589                .client_ctx
590                .get_event_log(Some(position), EVENT_LOG_PAGE_SIZE)
591                .await;
592
593            for entry in &events {
594                position = entry.id().saturating_add(1);
595
596                if entry.module_kind() == Some(&KIND)
597                    && entry.kind == ReceivePaymentEvent::KIND
598                    && let Some(event) = entry.to_event::<ReceivePaymentEvent>()
599                {
600                    return (event.operation_id, position);
601                }
602            }
603
604            if events.is_empty() {
605                // Caught up with the log; wait for new events to be written.
606                sleep(Duration::from_secs(1)).await;
607            }
608        }
609    }
610
611    fn derive_address(&self, index: u64) -> Address {
612        descriptor(
613            &self.cfg.bitcoin_pks,
614            &self.derive_tweak(index).public_key().consensus_hash(),
615        )
616        .address(self.cfg.network)
617    }
618
619    fn derive_tweak(&self, index: u64) -> Keypair {
620        self.root_secret
621            .child_key(ChildId(index))
622            .to_secp_key(secp256k1::SECP256K1)
623    }
624
625    /// Find the next valid index starting from (and including) `start_index`.
626    ///
627    /// Only ~1/65536 indices are valid, so the search is CPU-bound and may scan
628    /// many indices before finding one. The scan runs in bounded batches and
629    /// yields to the executor between them, so it does not stall the runtime —
630    /// important on wasm, which is single-threaded. It stops and returns `None`
631    /// once the task group begins shutting down.
632    async fn next_valid_index(&self, start_index: u64, handle: &TaskHandle) -> Option<u64> {
633        /// Indices to scan per batch before yielding to the executor.
634        const SCAN_BATCH: u64 = 256;
635
636        let pks_hash = self.cfg.bitcoin_pks.consensus_hash();
637
638        let mut index = start_index;
639
640        while !handle.is_shutting_down() {
641            for _ in 0..SCAN_BATCH {
642                if is_potential_receive(&self.derive_address(index).script_pubkey(), &pks_hash) {
643                    return Some(index);
644                }
645
646                index += 1;
647            }
648
649            // Hand control back to the executor between batches.
650            sleep(Duration::ZERO).await;
651        }
652
653        None
654    }
655
656    /// Issue ecash for an unspent output with a given fee.
657    async fn receive_output(
658        &self,
659        output_index: u64,
660        value: bitcoin::Amount,
661        address_index: u64,
662        fee: bitcoin::Amount,
663        outpoint: Option<bitcoin::OutPoint>,
664    ) -> (OperationId, TransactionId) {
665        let operation_id = OperationId::new_random();
666
667        let client_input = ClientInput::<WalletInput> {
668            input: WalletInput::V0(WalletInputV0 {
669                output_index,
670                fee,
671                tweak: self.derive_tweak(address_index).public_key(),
672            }),
673            keys: vec![self.derive_tweak(address_index)],
674            amounts: Amounts::new_bitcoin(Amount::from_sats((value - fee).to_sat())),
675        };
676
677        let client_input_sm = ClientInputSM::<WalletClientStateMachines> {
678            state_machines: Arc::new(move |range: OutPointRange| {
679                vec![WalletClientStateMachines::Receive(ReceiveStateMachine {
680                    common: ReceiveSMCommon {
681                        operation_id,
682                        txid: range.txid(),
683                        value,
684                        fee,
685                    },
686                    state: ReceiveSMState::Funding,
687                })]
688            }),
689        };
690
691        let client_input_bundle = self.client_ctx.make_client_inputs(ClientInputBundle::new(
692            vec![client_input],
693            vec![client_input_sm],
694        ));
695
696        let address = self.derive_address(address_index).as_unchecked().clone();
697
698        let meta_address = address.clone();
699        let range = self
700            .client_ctx
701            .finalize_and_submit_transaction(
702                operation_id,
703                WalletCommonInit::KIND.as_str(),
704                move |change_outpoint_range| {
705                    WalletOperationMeta::Receive(ReceiveMeta {
706                        change_outpoint_range,
707                        value,
708                        fee,
709                        address: Some(meta_address.clone()),
710                        outpoint,
711                    })
712                },
713                TransactionBuilder::new().with_inputs(client_input_bundle),
714            )
715            .await
716            .expect("Input amount is sufficient to finalize transaction");
717
718        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
719
720        self.client_ctx
721            .log_event(
722                &mut dbtx,
723                ReceivePaymentEvent {
724                    operation_id,
725                    value,
726                    fee,
727                    address,
728                    outpoint,
729                },
730            )
731            .await;
732
733        dbtx.commit_tx().await;
734
735        (operation_id, range.txid())
736    }
737
738    fn spawn_output_scanner(&self, task_group: &TaskGroup, client_span: &tracing::Span) {
739        let module = self.clone();
740        let handle = task_group.make_handle();
741
742        task_group.spawn_cancellable_with_span(client_span.clone(), "output-scanner", async move {
743            let mut dbtx = module.db.begin_transaction().await;
744
745            if dbtx
746                .find_by_prefix(&ValidAddressIndexPrefix)
747                .await
748                .next()
749                .await
750                .is_none()
751            {
752                let Some(index) = module.next_valid_index(0, &handle).await else {
753                    return;
754                };
755
756                dbtx.insert_new_entry(&ValidAddressIndexKey(index), &())
757                    .await;
758            }
759
760            dbtx.commit_tx().await;
761
762            loop {
763                match module.check_outputs(&handle).await {
764                    Ok(skip_wait) => {
765                        if skip_wait {
766                            continue;
767                        }
768                    }
769                    Err(e) => {
770                        warn!(target: LOG_CLIENT_MODULE_WALLETV2, "Failed to fetch outputs: {e}");
771                    }
772                }
773
774                sleep(fedimint_walletv2_common::sleep_duration()).await;
775            }
776        });
777    }
778
779    async fn check_outputs(&self, handle: &TaskHandle) -> anyhow::Result<bool> {
780        let mut dbtx = self.db.begin_transaction_nc().await;
781
782        let next_output_index = dbtx.get_value(&NextOutputIndexKey).await.unwrap_or(0);
783
784        let mut valid_indices: Vec<u64> = dbtx
785            .find_by_prefix(&ValidAddressIndexPrefix)
786            .await
787            .map(|entry| entry.0.0)
788            .collect()
789            .await;
790
791        let mut address_map: BTreeMap<ScriptBuf, u64> = valid_indices
792            .iter()
793            .map(|&i| (self.derive_address(i).script_pubkey(), i))
794            .collect();
795
796        let outputs = self
797            .module_api
798            .output_info_slice(next_output_index, next_output_index + SLICE_SIZE)
799            .await?;
800
801        let returned_num = outputs.len();
802        let mut matched_num: usize = 0;
803
804        for output in &outputs {
805            if let Some(&address_index) = address_map.get(&output.script) {
806                matched_num += 1;
807                let next_address_index = valid_indices
808                    .last()
809                    .copied()
810                    .expect("we have at least one address index");
811
812                // If we used the highest valid index, add the next valid one
813                if address_index == next_address_index {
814                    let Some(index) = self.next_valid_index(next_address_index + 1, handle).await
815                    else {
816                        return Ok(false);
817                    };
818
819                    let mut dbtx = self.db.begin_transaction().await;
820
821                    dbtx.insert_entry(&ValidAddressIndexKey(index), &()).await;
822
823                    dbtx.commit_tx_result().await?;
824
825                    valid_indices.push(index);
826
827                    address_map.insert(self.derive_address(index).script_pubkey(), index);
828                }
829
830                if !output.spent && !self.process_unspent_output(output, address_index).await? {
831                    return Ok(false);
832                }
833            }
834
835            let mut dbtx = self.db.begin_transaction().await;
836
837            dbtx.insert_entry(&NextOutputIndexKey, &(output.index + 1))
838                .await;
839
840            dbtx.commit_tx_result().await?;
841        }
842
843        debug!(
844            target: LOG_CLIENT_MODULE_WALLETV2,
845            next_output_index,
846            returned_num,
847            matched_num,
848            valid_indices_num = valid_indices.len(),
849            "Scanning for outputs"
850        );
851
852        Ok(!outputs.is_empty())
853    }
854
855    async fn process_unspent_output(
856        &self,
857        output: &OutputInfo,
858        address_index: u64,
859    ) -> anyhow::Result<bool> {
860        debug!(
861            target: LOG_CLIENT_MODULE_WALLETV2,
862            output_index = output.index,
863            value_sat = output.value.to_sat(),
864            address_index,
865            outpoint = ?output.outpoint,
866            "Discovered unspent walletv2 receive output"
867        );
868
869        // In order to not overpay on fees we choose to wait,
870        // the congestion will clear up within a few blocks.
871        let pending_tx_chain_len = self.module_api.pending_tx_chain().await?.len();
872        if 3 <= pending_tx_chain_len {
873            debug!(
874                target: LOG_CLIENT_MODULE_WALLETV2,
875                output_index = output.index,
876                pending_tx_chain_len,
877                "Delaying walletv2 receive claim because pending transaction chain is full"
878            );
879            return Ok(false);
880        }
881
882        let receive_fee = self
883            .module_api
884            .receive_fee()
885            .await?
886            .ok_or(anyhow!("No consensus feerate is available"))?;
887
888        if receive_fee < output.value {
889            debug!(
890                target: LOG_CLIENT_MODULE_WALLETV2,
891                output_index = output.index,
892                value_sat = output.value.to_sat(),
893                fee_sat = receive_fee.to_sat(),
894                "Submitting walletv2 receive claim"
895            );
896            let (operation_id, txid) = self
897                .receive_output(
898                    output.index,
899                    output.value,
900                    address_index,
901                    receive_fee,
902                    output.outpoint,
903                )
904                .await;
905
906            debug!(
907                target: LOG_CLIENT_MODULE_WALLETV2,
908                output_index = output.index,
909                ?operation_id,
910                %txid,
911                "Waiting for walletv2 receive claim acceptance"
912            );
913            self.client_ctx
914                .transaction_updates(operation_id)
915                .await
916                .await_tx_accepted(txid)
917                .await
918                .map_err(|e| anyhow!("Claim transaction was rejected: {e}"))?;
919            debug!(
920                target: LOG_CLIENT_MODULE_WALLETV2,
921                output_index = output.index,
922                ?operation_id,
923                %txid,
924                "Walletv2 receive claim accepted"
925            );
926        } else {
927            debug!(
928                target: LOG_CLIENT_MODULE_WALLETV2,
929                output_index = output.index,
930                value_sat = output.value.to_sat(),
931                fee_sat = receive_fee.to_sat(),
932                "Skipping walletv2 receive output because fee meets or exceeds value"
933            );
934        }
935
936        Ok(true)
937    }
938}
939
940#[derive(Error, Debug, Clone, Eq, PartialEq)]
941pub enum SendError {
942    #[error("Address is from a different network than the federation.")]
943    WrongNetwork,
944    #[error("The value is too small")]
945    DustValue,
946    #[error("Federation returned an error: {0}")]
947    FederationError(String),
948    #[error("No consensus feerate is available at this time")]
949    NoConsensusFeerateAvailable,
950    #[error("The client does not have sufficient funds to send the payment")]
951    InsufficientFunds,
952    #[error("Unsupported address type")]
953    UnsupportedAddress,
954}
955
956#[derive(Error, Debug, Clone, Eq, PartialEq)]
957pub enum ReceiveError {
958    #[error("Federation returned an error: {0}")]
959    FederationError(String),
960    #[error("No consensus feerate is available at this time")]
961    NoConsensusFeerateAvailable,
962}
963
964#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
965pub enum WalletClientStateMachines {
966    Send(send_sm::SendStateMachine),
967    Receive(receive_sm::ReceiveStateMachine),
968}
969
970impl State for WalletClientStateMachines {
971    type ModuleContext = WalletClientContext;
972
973    fn transitions(
974        &self,
975        context: &Self::ModuleContext,
976        global_context: &DynGlobalClientContext,
977    ) -> Vec<StateTransition<Self>> {
978        match self {
979            WalletClientStateMachines::Send(sm) => sm_enum_variant_translation!(
980                sm.transitions(context, global_context),
981                WalletClientStateMachines::Send
982            ),
983            WalletClientStateMachines::Receive(sm) => sm_enum_variant_translation!(
984                sm.transitions(context, global_context),
985                WalletClientStateMachines::Receive
986            ),
987        }
988    }
989
990    fn operation_id(&self) -> OperationId {
991        match self {
992            WalletClientStateMachines::Send(sm) => sm.operation_id(),
993            WalletClientStateMachines::Receive(sm) => sm.operation_id(),
994        }
995    }
996}
997
998impl IntoDynInstance for WalletClientStateMachines {
999    type DynType = DynState;
1000
1001    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1002        DynState::from_typed(instance_id, self)
1003    }
1004}