fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
21mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{BitcoindTracked, DynBitcoindRpc, IBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::recovery::RecoveryProgress;
42use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
43use fedimint_client_module::oplog::UpdateStreamOrOutcome;
44use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client_module::transaction::{
46    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
55use fedimint_core::module::{
56    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
57    ModuleInit, MultiApiVersion,
58};
59use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{BoxStream, backoff_util, retry};
62use fedimint_core::{
63    BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
64    runtime, secp256k1,
65};
66use fedimint_derive_secret::{ChildId, DerivableSecret};
67use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
68pub use fedimint_wallet_common as common;
69use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
70use fedimint_wallet_common::tweakable::Tweakable;
71pub use fedimint_wallet_common::*;
72use futures::{Stream, StreamExt};
73use rand::{Rng, thread_rng};
74use secp256k1::Keypair;
75use serde::{Deserialize, Serialize};
76use strum::IntoEnumIterator;
77use tokio::sync::watch;
78use tracing::{debug, instrument};
79
80use crate::api::WalletFederationApi;
81use crate::backup::{FEDERATION_RECOVER_MAX_GAP, RecoveryStateV2, WalletRecovery};
82use crate::client_db::{
83    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
84    PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, RecoveryStateKey,
85    SupportsSafeDepositPrefix,
86};
87use crate::deposit::DepositStateMachine;
88use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
89
/// Child index under the module root secret from which the per-deposit tweak
/// keys are derived (see `WalletClientModuleData::derive_deposit_address`).
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
91
/// A bitcoin transaction together with the index of its deposit output.
///
/// Carried by the transaction-bearing variants of [`DepositStateV1`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The bitcoin transaction is saved as soon as we see it so the transaction
    /// can be re-transmitted if it's evicted from the mempool.
    pub btc_transaction: bitcoin::Transaction,
    /// Index of the deposit output
    pub out_idx: u32,
}
100
/// Deposit progress states that carry the full bitcoin transaction.
///
/// NOTE(review): presumably the older of the two deposit state encodings,
/// with [`DepositStateV2`] as its successor — confirm against the
/// subscription APIs that emit them.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    /// No deposit transaction is attached to this state yet.
    WaitingForTransaction,
    /// A deposit transaction is known; see the variant name for its status.
    WaitingForConfirmation(BitcoinTransactionData),
    /// A deposit transaction is known; see the variant name for its status.
    Confirmed(BitcoinTransactionData),
    /// A deposit transaction is known; see the variant name for its status.
    Claimed(BitcoinTransactionData),
    /// Carries a string payload, presumably a human-readable failure reason.
    Failed(String),
}
109
/// Deposit progress states that carry the deposited amount and the bitcoin
/// outpoint instead of the full transaction.
///
/// `btc_deposited` is (de)serialized as an integer number of satoshis in every
/// variant that has it.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// No deposit outpoint is attached to this state yet.
    WaitingForTransaction,
    /// A deposit outpoint is known; see the variant name for its status.
    WaitingForConfirmation {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// A deposit outpoint is known; see the variant name for its status.
    Confirmed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// A deposit outpoint is known; see the variant name for its status.
    Claimed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// Carries a string payload, presumably a human-readable failure reason.
    Failed(String),
}
130
/// Withdrawal progress states.
///
/// Refund tracking is not modelled yet (see the TODO below).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    /// Initial state, carries no data.
    Created,
    /// Carries a bitcoin transaction id, presumably that of the withdrawal
    /// transaction — confirm against the withdraw state machine.
    Succeeded(bitcoin::Txid),
    /// Carries a string payload, presumably a human-readable failure reason.
    Failed(String),
    // TODO: track refund
    // Refunded,
    // RefundFailed(String),
}
140
141async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
142where
143    S: Stream<Item = WalletClientStates> + Unpin,
144{
145    loop {
146        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
147            return Some(ds.state);
148        }
149        tokio::task::yield_now().await;
150    }
151}
152
/// Client-side initializer of the wallet module.
///
/// Optionally carries a bitcoin RPC client, which `init` uses when the init
/// args do not supply one.
#[derive(Debug, Clone, Default)]
// TODO: should probably move to DB
pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
156
/// Maximum number of recovery items requested per `fetch_recovery_slice` call
/// during slice-based recovery.
const SLICE_SIZE: u64 = 1000;
158
159impl WalletClientInit {
160    pub fn new(rpc: DynBitcoindRpc) -> Self {
161        Self(Some(rpc))
162    }
163
164    async fn recover_from_slices(
165        &self,
166        args: &ClientModuleRecoverArgs<Self>,
167    ) -> anyhow::Result<()> {
168        let data = WalletClientModuleData {
169            cfg: args.cfg().clone(),
170            module_root_secret: args.module_root_secret().clone(),
171        };
172
173        let total_items = args.module_api().fetch_recovery_count().await?;
174
175        let mut state = RecoveryStateV2::new();
176
177        state.refill_pending_pool_up_to(&data, TweakIdx(FEDERATION_RECOVER_MAX_GAP));
178
179        for start in (0..total_items).step_by(SLICE_SIZE as usize) {
180            let end = std::cmp::min(start + SLICE_SIZE, total_items);
181
182            let items = args.module_api().fetch_recovery_slice(start, end).await?;
183
184            for item in &items {
185                match item {
186                    RecoveryItem::Input { outpoint, script } => {
187                        state.handle_item(*outpoint, script, &data);
188                    }
189                }
190            }
191
192            args.update_recovery_progress(RecoveryProgress {
193                complete: end.try_into().unwrap_or(u32::MAX),
194                total: total_items.try_into().unwrap_or(u32::MAX),
195            });
196        }
197
198        let mut dbtx = args.db().begin_transaction().await;
199
200        for tweak_idx in 0..state.new_start_idx().0 {
201            let operation_id = data.derive_peg_in_script(TweakIdx(tweak_idx)).3;
202
203            let claimed = state
204                .claimed_outpoints
205                .get(&TweakIdx(tweak_idx))
206                .cloned()
207                .unwrap_or_default();
208
209            dbtx.insert_new_entry(
210                &PegInTweakIndexKey(TweakIdx(tweak_idx)),
211                &PegInTweakIndexData {
212                    operation_id,
213                    creation_time: fedimint_core::time::now(),
214                    last_check_time: None,
215                    next_check_time: Some(fedimint_core::time::now()),
216                    claimed,
217                },
218            )
219            .await;
220        }
221
222        dbtx.insert_new_entry(&NextPegInTweakIndexKey, &state.new_start_idx())
223            .await;
224
225        dbtx.commit_tx().await;
226
227        Ok(())
228    }
229}
230
231impl ModuleInit for WalletClientInit {
232    type Common = WalletCommonInit;
233
234    async fn dump_database(
235        &self,
236        dbtx: &mut DatabaseTransaction<'_>,
237        prefix_names: Vec<String>,
238    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
239        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
240            BTreeMap::new();
241        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
242            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
243        });
244
245        for table in filtered_prefixes {
246            match table {
247                DbKeyPrefix::NextPegInTweakIndex => {
248                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
249                        wallet_client_items
250                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
251                    }
252                }
253                DbKeyPrefix::PegInTweakIndex => {
254                    push_db_pair_items!(
255                        dbtx,
256                        PegInTweakIndexPrefix,
257                        PegInTweakIndexKey,
258                        PegInTweakIndexData,
259                        wallet_client_items,
260                        "Peg-In Tweak Index"
261                    );
262                }
263                DbKeyPrefix::ClaimedPegIn => {
264                    push_db_pair_items!(
265                        dbtx,
266                        ClaimedPegInPrefix,
267                        ClaimedPegInKey,
268                        ClaimedPegInData,
269                        wallet_client_items,
270                        "Claimed Peg-In"
271                    );
272                }
273                DbKeyPrefix::RecoveryFinalized => {
274                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
275                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
276                    }
277                }
278                DbKeyPrefix::SupportsSafeDeposit => {
279                    push_db_pair_items!(
280                        dbtx,
281                        SupportsSafeDepositPrefix,
282                        SupportsSafeDepositKey,
283                        (),
284                        wallet_client_items,
285                        "Supports Safe Deposit"
286                    );
287                }
288                DbKeyPrefix::RecoveryState
289                | DbKeyPrefix::ExternalReservedStart
290                | DbKeyPrefix::CoreInternalReservedStart
291                | DbKeyPrefix::CoreInternalReservedEnd => {}
292            }
293        }
294
295        Box::new(wallet_client_items.into_iter())
296    }
297}
298
299#[apply(async_trait_maybe_send!)]
300impl ClientModuleInit for WalletClientInit {
301    type Module = WalletClientModule;
302
303    fn supported_api_versions(&self) -> MultiApiVersion {
304        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
305            .expect("no version conflicts")
306    }
307
308    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
309        let data = WalletClientModuleData {
310            cfg: args.cfg().clone(),
311            module_root_secret: args.module_root_secret().clone(),
312        };
313
314        let db = args.db().clone();
315
316        let rpc_config = WalletClientModule::get_rpc_config(args.cfg());
317
318        // Priority:
319        // 1. user-provided bitcoind RPC from ClientBuilder::with_bitcoind_rpc
320        // 2. user-provided no-chain-id factory from
321        //    ClientBuilder::with_bitcoind_rpc_no_chain_id
322        // 3. WalletClientInit constructor
323        // 4. create from config (esplora)
324        let btc_rpc = if let Some(user_rpc) = args.user_bitcoind_rpc() {
325            user_rpc.clone()
326        } else if let Some(factory) = args.user_bitcoind_rpc_no_chain_id() {
327            if let Some(rpc) = factory(rpc_config.url.clone()).await {
328                rpc
329            } else {
330                self.0
331                    .clone()
332                    .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
333            }
334        } else {
335            self.0
336                .clone()
337                .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
338        };
339        let btc_rpc = BitcoindTracked::new(btc_rpc, "wallet-client").into_dyn();
340
341        let module_api = args.module_api().clone();
342
343        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
344        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
345
346        Ok(WalletClientModule {
347            db,
348            data,
349            module_api,
350            notifier: args.notifier().clone(),
351            rpc: btc_rpc,
352            client_ctx: args.context(),
353            pegin_monitor_wakeup_sender,
354            pegin_monitor_wakeup_receiver,
355            pegin_claimed_receiver,
356            pegin_claimed_sender,
357            task_group: args.task_group().clone(),
358            admin_auth: args.admin_auth().cloned(),
359        })
360    }
361
362    /// Wallet recovery
363    ///
364    /// Uses slice-based recovery if supported by the federation, otherwise
365    /// falls back to session-based history recovery.
366    async fn recover(
367        &self,
368        args: &ClientModuleRecoverArgs<Self>,
369        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
370    ) -> anyhow::Result<()> {
371        // Check if V1 (session-based) recovery state exists (resuming interrupted
372        // recovery)
373        if args
374            .db()
375            .begin_transaction_nc()
376            .await
377            .get_value(&RecoveryStateKey)
378            .await
379            .is_some()
380        {
381            return args
382                .recover_from_history::<WalletRecovery>(self, snapshot)
383                .await;
384        }
385
386        // Determine which method to use based on endpoint availability
387        if args.module_api().fetch_recovery_count().await.is_ok() {
388            self.recover_from_slices(args).await
389        } else {
390            args.recover_from_history::<WalletRecovery>(self, snapshot)
391                .await
392        }
393    }
394
395    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
396        Some(
397            DbKeyPrefix::iter()
398                .map(|p| p as u8)
399                .chain(
400                    DbKeyPrefix::ExternalReservedStart as u8
401                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
402                )
403                .collect(),
404        )
405    }
406}
407
/// Metadata describing a wallet operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// Which kind of wallet operation this is, with its specific data.
    pub variant: WalletOperationMetaVariant,
    /// Arbitrary JSON value stored alongside the operation; presumably the
    /// caller-supplied `extra_meta` — confirm at the sites that build this.
    pub extra_meta: serde_json::Value,
}
413
/// The operation-specific part of [`WalletOperationMeta`].
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    /// A deposit (peg-in) to `address`.
    Deposit {
        address: Address<NetworkUnchecked>,
        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
        /// using the pegin monitor. The value is the child index of the key
        /// used to generate the address, so we can re-generate the secret key
        /// from our root secret.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Optional expiry time; defaults to `None` when absent on
        /// deserialization and is left out of the serialized form when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    /// A withdrawal (peg-out) of `amount` to `address`; `amount` is
    /// (de)serialized as an integer number of satoshis.
    Withdraw {
        address: Address<NetworkUnchecked>,
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        fee: PegOutFees,
        change: Vec<OutPoint>,
    },

    /// A replace-by-fee request for an earlier withdrawal.
    RbfWithdraw {
        rbf: Rbf,
        change: Vec<OutPoint>,
    },
}
441
/// The non-resource, just plain-data parts of [`WalletClientModule`]
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    /// Wallet client configuration (peg-in descriptor, network, fees, ...).
    cfg: WalletClientConfig,
    /// Root secret of this module; deposit tweak keys are derived from it.
    module_root_secret: DerivableSecret,
}
448
449impl WalletClientModuleData {
450    fn derive_deposit_address(
451        &self,
452        idx: TweakIdx,
453    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
454        let idx = ChildId(idx.0);
455
456        let secret_tweak_key = self
457            .module_root_secret
458            .child_key(WALLET_TWEAK_CHILD_ID)
459            .child_key(idx)
460            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
461
462        let public_tweak_key = secret_tweak_key.public_key();
463
464        let address = self
465            .cfg
466            .peg_in_descriptor
467            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
468            .address(self.cfg.network.0)
469            .unwrap();
470
471        // TODO: make hash?
472        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
473
474        (secret_tweak_key, public_tweak_key, address, operation_id)
475    }
476
477    fn derive_peg_in_script(
478        &self,
479        idx: TweakIdx,
480    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
481        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
482
483        (
484            self.cfg
485                .peg_in_descriptor
486                .tweak(&secret_tweak_key.public_key(), SECP256K1)
487                .script_pubkey(),
488            address,
489            secret_tweak_key,
490            operation_id,
491        )
492    }
493}
494
/// The wallet client module.
///
/// Bundles the plain module data with the resources it works with: database,
/// federation API handle, bitcoin RPC client, task group and the `watch`
/// channels shared with the peg-in monitor task.
#[derive(Debug)]
pub struct WalletClientModule {
    /// Configuration and root secret of this module.
    data: WalletClientModuleData,
    db: Database,
    module_api: DynModuleApi,
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC client, also handed to the state machine context.
    rpc: DynBitcoindRpc,
    client_ctx: ClientContext<Self>,
    /// Updated to wake up pegin monitor
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Called every time a peg-in was claimed
    pegin_claimed_sender: watch::Sender<()>,
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the module's background tasks are spawned on.
    task_group: TaskGroup,
    admin_auth: Option<ApiAuth>,
}
512
513#[apply(async_trait_maybe_send!)]
514impl ClientModule for WalletClientModule {
515    type Init = WalletClientInit;
516    type Common = WalletModuleTypes;
517    type Backup = WalletModuleBackup;
518    type ModuleStateMachineContext = WalletClientContext;
519    type States = WalletClientStates;
520
521    fn context(&self) -> Self::ModuleStateMachineContext {
522        WalletClientContext {
523            rpc: self.rpc.clone(),
524            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
525            wallet_decoder: self.decoder(),
526            secp: Secp256k1::default(),
527            client_ctx: self.client_ctx.clone(),
528        }
529    }
530
531    async fn start(&self) {
532        self.task_group.spawn_cancellable("peg-in monitor", {
533            let client_ctx = self.client_ctx.clone();
534            let db = self.db.clone();
535            let btc_rpc = self.rpc.clone();
536            let module_api = self.module_api.clone();
537            let data = self.data.clone();
538            let pegin_claimed_sender = self.pegin_claimed_sender.clone();
539            let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
540            pegin_monitor::run_peg_in_monitor(
541                client_ctx,
542                db,
543                btc_rpc,
544                module_api,
545                data,
546                pegin_claimed_sender,
547                pegin_monitor_wakeup_receiver,
548            )
549        });
550
551        self.task_group
552            .spawn_cancellable("supports-safe-deposit-version", {
553                let db = self.db.clone();
554                let module_api = self.module_api.clone();
555
556                poll_supports_safe_deposit_version(db, module_api)
557            });
558    }
559
560    fn supports_backup(&self) -> bool {
561        true
562    }
563
564    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
565        // fetch consensus height first
566        let session_count = self.client_ctx.global_api().session_count().await?;
567
568        let mut dbtx = self.db.begin_transaction_nc().await;
569        let next_pegin_tweak_idx = dbtx
570            .get_value(&NextPegInTweakIndexKey)
571            .await
572            .unwrap_or_default();
573        let claimed = dbtx
574            .find_by_prefix(&PegInTweakIndexPrefix)
575            .await
576            .filter_map(|(k, v)| async move {
577                if v.claimed.is_empty() {
578                    None
579                } else {
580                    Some(k.0)
581                }
582            })
583            .collect()
584            .await;
585        Ok(backup::WalletModuleBackup::new_v1(
586            session_count,
587            next_pegin_tweak_idx,
588            claimed,
589        ))
590    }
591
592    fn input_fee(
593        &self,
594        _amount: &Amounts,
595        _input: &<Self::Common as ModuleCommon>::Input,
596    ) -> Option<Amounts> {
597        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
598    }
599
600    fn output_fee(
601        &self,
602        _amount: &Amounts,
603        _output: &<Self::Common as ModuleCommon>::Output,
604    ) -> Option<Amounts> {
605        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
606    }
607
608    async fn handle_rpc(
609        &self,
610        method: String,
611        request: serde_json::Value,
612    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
613        Box::pin(try_stream! {
614            match method.as_str() {
615                "get_wallet_summary" => {
616                    let _req: WalletSummaryRequest = serde_json::from_value(request)?;
617                    let wallet_summary = self.get_wallet_summary()
618                        .await
619                        .expect("Failed to fetch wallet summary");
620                    let result = serde_json::to_value(&wallet_summary)
621                        .expect("Serialization error");
622                    yield result;
623                }
624                "get_block_count_local" => {
625                    let block_count = self.get_block_count_local().await
626                        .expect("Failed to fetch block count");
627                    yield serde_json::to_value(block_count)?;
628                }
629                "peg_in" => {
630                    let req: PegInRequest = serde_json::from_value(request)?;
631                    let response = self.peg_in(req)
632                        .await
633                        .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
634                    let result = serde_json::to_value(&response)?;
635                    yield result;
636                },
637                "peg_out" => {
638                    let req: PegOutRequest = serde_json::from_value(request)?;
639                    let response = self.peg_out(req)
640                        .await
641                        .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
642                    let result = serde_json::to_value(&response)?;
643                    yield result;
644                },
645                "subscribe_deposit" => {
646                    let req: SubscribeDepositRequest = serde_json::from_value(request)?;
647                    for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
648                        yield serde_json::to_value(state)?;
649                    }
650                },
651                "subscribe_withdraw" => {
652                    let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
653                    for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
654                        yield serde_json::to_value(state)?;
655                    }
656                }
657                _ => {
658                    Err(anyhow::format_err!("Unknown method: {}", method))?;
659                }
660            }
661        })
662    }
663
664    #[cfg(feature = "cli")]
665    async fn handle_cli_command(
666        &self,
667        args: &[std::ffi::OsString],
668    ) -> anyhow::Result<serde_json::Value> {
669        cli::handle_cli_command(self, args).await
670    }
671}
672
/// Request body of the `get_wallet_summary` RPC method; it has no fields.
#[derive(Deserialize)]
struct WalletSummaryRequest {}
675
/// State machine context of the wallet client module, as built by
/// `WalletClientModule::context`.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC client (a clone of the module's).
    rpc: DynBitcoindRpc,
    /// The federation's peg-in descriptor, taken from the module config.
    wallet_descriptor: PegInDescriptor,
    /// Decoder of the wallet module.
    wallet_decoder: Decoder,
    secp: Secp256k1<All>,
    pub client_ctx: ClientContext<WalletClientModule>,
}
684
/// Request of the `peg_in` RPC method / [`WalletClientModule::peg_in`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInRequest {
    /// Arbitrary JSON value forwarded when the deposit address is allocated.
    pub extra_meta: serde_json::Value,
}
689
/// Request body of the `subscribe_deposit` RPC method.
#[derive(Deserialize)]
struct SubscribeDepositRequest {
    /// The deposit operation whose updates are streamed.
    operation_id: OperationId,
}
694
/// Request body of the `subscribe_withdraw` RPC method.
#[derive(Deserialize)]
struct SubscribeWithdrawRequest {
    /// The withdraw operation whose updates are streamed.
    operation_id: OperationId,
}
699
/// Response of the `peg_in` RPC method / [`WalletClientModule::peg_in`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInResponse {
    /// The newly allocated deposit address.
    pub deposit_address: Address<NetworkUnchecked>,
    /// Operation id associated with the allocated address.
    pub operation_id: OperationId,
}
705
/// Request of the `peg_out` RPC method / [`WalletClientModule::peg_out`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutRequest {
    /// Amount to withdraw, in satoshis.
    pub amount_sat: u64,
    /// Destination address; checked against the module's network before use.
    pub destination_address: Address<NetworkUnchecked>,
    /// Arbitrary JSON value forwarded to the withdraw call.
    pub extra_meta: serde_json::Value,
}
712
/// Response of the `peg_out` RPC method / [`WalletClientModule::peg_out`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutResponse {
    /// Operation id of the initiated withdrawal.
    pub operation_id: OperationId,
}
717
/// Marks [`WalletClientContext`] as a state machine context belonging to the
/// wallet module kind.
impl Context for WalletClientContext {
    // The right-hand `KIND` is the wallet module kind constant in scope in
    // this file, not this associated constant.
    const KIND: Option<ModuleKind> = Some(KIND);
}
721
722impl WalletClientModule {
723    fn cfg(&self) -> &WalletClientConfig {
724        &self.data.cfg
725    }
726
727    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
728        match BitcoinRpcConfig::get_defaults_from_env_vars() {
729            Ok(rpc_config) => {
730                // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
731                // updated to 0.30
732                if rpc_config.kind == "bitcoind" {
733                    cfg.default_bitcoin_rpc.clone()
734                } else {
735                    rpc_config
736                }
737            }
738            _ => cfg.default_bitcoin_rpc.clone(),
739        }
740    }
741
742    pub fn get_network(&self) -> Network {
743        self.cfg().network.0
744    }
745
746    pub fn get_finality_delay(&self) -> u32 {
747        self.cfg().finality_delay
748    }
749
750    pub fn get_fee_consensus(&self) -> FeeConsensus {
751        self.cfg().fee_consensus
752    }
753
754    async fn allocate_deposit_address_inner(
755        &self,
756        dbtx: &mut DatabaseTransaction<'_>,
757    ) -> (OperationId, Address, TweakIdx) {
758        dbtx.ensure_isolated().expect("Must be isolated db");
759
760        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
761        let (_secret_tweak_key, _, address, operation_id) =
762            self.data.derive_deposit_address(tweak_idx);
763
764        let now = fedimint_core::time::now();
765
766        dbtx.insert_new_entry(
767            &PegInTweakIndexKey(tweak_idx),
768            &PegInTweakIndexData {
769                creation_time: now,
770                next_check_time: Some(now),
771                last_check_time: None,
772                operation_id,
773                claimed: vec![],
774            },
775        )
776        .await;
777
778        (operation_id, address, tweak_idx)
779    }
780
781    /// Fetches the fees that would need to be paid to make the withdraw request
782    /// using [`Self::withdraw`] work *right now*.
783    ///
784    /// Note that we do not receive a guarantee that these fees will be valid in
785    /// the future, thus even the next second using these fees *may* fail.
786    /// The caller should be prepared to retry with a new fee estimate.
787    pub async fn get_withdraw_fees(
788        &self,
789        address: &bitcoin::Address,
790        amount: bitcoin::Amount,
791    ) -> anyhow::Result<PegOutFees> {
792        self.module_api
793            .fetch_peg_out_fees(address, amount)
794            .await?
795            .context("Federation didn't return peg-out fees")
796    }
797
798    /// Returns a summary of the wallet's coins
799    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
800        Ok(self.module_api.fetch_wallet_summary().await?)
801    }
802
803    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
804        Ok(self.module_api.fetch_block_count_local().await?)
805    }
806
807    pub fn create_withdraw_output(
808        &self,
809        operation_id: OperationId,
810        address: bitcoin::Address,
811        amount: bitcoin::Amount,
812        fees: PegOutFees,
813    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
814        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
815
816        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
817
818        let sm_gen = move |out_point_range: OutPointRange| {
819            assert_eq!(out_point_range.count(), 1);
820            let out_idx = out_point_range.start_idx();
821            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
822                operation_id,
823                state: WithdrawStates::Created(CreatedWithdrawState {
824                    fm_outpoint: OutPoint {
825                        txid: out_point_range.txid(),
826                        out_idx,
827                    },
828                }),
829            })]
830        };
831
832        Ok(ClientOutputBundle::new(
833            vec![ClientOutput::<WalletOutput> {
834                output,
835                amounts: Amounts::new_bitcoin(amount),
836            }],
837            vec![ClientOutputSM::<WalletClientStates> {
838                state_machines: Arc::new(sm_gen),
839            }],
840        ))
841    }
842
843    pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
844        let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
845
846        Ok(PegInResponse {
847            deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
848                .as_unchecked()
849                .clone(),
850            operation_id,
851        })
852    }
853
854    pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
855        let amount = bitcoin::Amount::from_sat(req.amount_sat);
856        let destination = req
857            .destination_address
858            .require_network(self.get_network())?;
859
860        let fees = self.get_withdraw_fees(&destination, amount).await?;
861        let operation_id = self
862            .withdraw(&destination, amount, fees, req.extra_meta)
863            .await
864            .context("Failed to initiate withdraw")?;
865
866        Ok(PegOutResponse { operation_id })
867    }
868
869    pub fn create_rbf_withdraw_output(
870        &self,
871        operation_id: OperationId,
872        rbf: &Rbf,
873    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
874        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
875
876        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
877
878        let sm_gen = move |out_point_range: OutPointRange| {
879            assert_eq!(out_point_range.count(), 1);
880            let out_idx = out_point_range.start_idx();
881            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
882                operation_id,
883                state: WithdrawStates::Created(CreatedWithdrawState {
884                    fm_outpoint: OutPoint {
885                        txid: out_point_range.txid(),
886                        out_idx,
887                    },
888                }),
889            })]
890        };
891
892        Ok(ClientOutputBundle::new(
893            vec![ClientOutput::<WalletOutput> {
894                output,
895                amounts: Amounts::new_bitcoin(amount),
896            }],
897            vec![ClientOutputSM::<WalletClientStates> {
898                state_machines: Arc::new(sm_gen),
899            }],
900        ))
901    }
902
903    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
904        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
905    }
906
907    /// Returns true if the federation's wallet module consensus version
908    /// supports processing all deposits.
909    ///
910    /// This method is safe to call offline, since it first attempts to read a
911    /// key from the db that represents the client has previously been able to
912    /// verify the wallet module consensus version. If the client has not
913    /// verified the version, it must be online to fetch the latest wallet
914    /// module consensus version.
915    pub async fn supports_safe_deposit(&self) -> bool {
916        let mut dbtx = self.db.begin_transaction().await;
917
918        let already_verified_supports_safe_deposit =
919            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
920
921        already_verified_supports_safe_deposit || {
922            match self.module_api.module_consensus_version().await {
923                Ok(module_consensus_version) => {
924                    let supported_version =
925                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
926
927                    if supported_version {
928                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
929                        dbtx.commit_tx().await;
930                    }
931
932                    supported_version
933                }
934                Err(_) => false,
935            }
936        }
937    }
938
939    /// Allocates a deposit address controlled by the federation, guaranteeing
940    /// safe handling of all deposits, including on-chain transactions exceeding
941    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
942    ///
943    /// Returns an error if the client has never been online to verify the
944    /// federation's wallet module consensus version supports processing all
945    /// deposits.
946    pub async fn safe_allocate_deposit_address<M>(
947        &self,
948        extra_meta: M,
949    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
950    where
951        M: Serialize + MaybeSend + MaybeSync,
952    {
953        ensure!(
954            self.supports_safe_deposit().await,
955            "Wallet module consensus version doesn't support safe deposits",
956        );
957
958        self.allocate_deposit_address_expert_only(extra_meta).await
959    }
960
961    /// Allocates a deposit address that is controlled by the federation.
962    ///
963    /// This is an EXPERT ONLY method intended for power users such as Lightning
964    /// gateways allocating liquidity, and we discourage exposing peg-in
965    /// functionality to everyday users of a Fedimint wallet due to the
966    /// following two limitations:
967    ///
968    /// The transaction sending to this address needs to be smaller than 40KB in
969    /// order for the peg-in to be claimable. If the transaction is too large,
970    /// funds will be lost.
971    ///
972    /// In the future, federations will also enforce a minimum peg-in amount to
973    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
974    /// claimed and funds will be lost.
975    ///
976    /// Everyday users should rely on Lightning to move funds into the
977    /// federation.
978    pub async fn allocate_deposit_address_expert_only<M>(
979        &self,
980        extra_meta: M,
981    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
982    where
983        M: Serialize + MaybeSend + MaybeSync,
984    {
985        let extra_meta_value =
986            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
987        let (operation_id, address, tweak_idx) = self
988            .db
989            .autocommit(
990                move |dbtx, _| {
991                    let extra_meta_value_inner = extra_meta_value.clone();
992                    Box::pin(async move {
993                        let (operation_id, address, tweak_idx) = self
994                            .allocate_deposit_address_inner(dbtx)
995                            .await;
996
997                        self.client_ctx.manual_operation_start_dbtx(
998                            dbtx,
999                            operation_id,
1000                            WalletCommonInit::KIND.as_str(),
1001                            WalletOperationMeta {
1002                                variant: WalletOperationMetaVariant::Deposit {
1003                                    address: address.clone().into_unchecked(),
1004                                    tweak_idx: Some(tweak_idx),
1005                                    expires_at: None,
1006                                },
1007                                extra_meta: extra_meta_value_inner,
1008                            },
1009                            vec![]
1010                        ).await?;
1011
1012                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
1013
1014                        // Begin watching the script address
1015                        self.rpc.watch_script_history(&address.script_pubkey()).await?;
1016
1017                        let sender = self.pegin_monitor_wakeup_sender.clone();
1018                        dbtx.on_commit(move || {
1019                            sender.send_replace(());
1020                        });
1021
1022                        Ok((operation_id, address, tweak_idx))
1023                    })
1024                },
1025                Some(100),
1026            )
1027            .await
1028            .map_err(|e| match e {
1029                AutocommitError::CommitFailed {
1030                    last_error,
1031                    attempts,
1032                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1033                AutocommitError::ClosureError { error, .. } => error,
1034            })?;
1035
1036        Ok((operation_id, address, tweak_idx))
1037    }
1038
    /// Returns a stream of updates about an ongoing deposit operation created
    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
    /// Returns an error for old deposit operations created prior to the 0.4
    /// release and not driven to completion yet. This should be rare enough
    /// that an indeterminate state is ok here.
    ///
    /// For deposits that carry a tweak index the update stream yields, in
    /// order: `WaitingForTransaction`, `WaitingForConfirmation`, `Confirmed`
    /// and finally either `Claimed` or `Failed`.
    ///
    /// # Errors
    ///
    /// Fails if the operation does not exist, is not a wallet deposit
    /// operation, its stored address does not match the configured network,
    /// or it is an old deposit without a final outcome in the operation log.
    pub async fn subscribe_deposit(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
        let operation = self
            .client_ctx
            .get_operation(operation_id)
            .await
            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;

        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
            bail!("Operation is not a wallet operation");
        }

        let operation_meta = operation.meta::<WalletOperationMeta>();

        let WalletOperationMetaVariant::Deposit {
            address, tweak_idx, ..
        } = operation_meta.variant
        else {
            bail!("Operation is not a deposit operation");
        };

        // The stored address is network-unchecked; validate it against the
        // network from the module config before using it
        let address = address.require_network(self.cfg().network.0)?;

        // The old deposit operations don't have tweak_idx set
        let Some(tweak_idx) = tweak_idx else {
            // In case we are dealing with an old deposit that still uses state machines we
            // don't have the logic here anymore to subscribe to updates. We can still read
            // the final state though if it reached any.
            let outcome_v1 = operation
                .outcome::<DepositStateV1>()
                .context("Old pending deposit, can't subscribe to updates")?;

            // Translate the legacy final outcome into its V2 representation
            let outcome_v2 = match outcome_v1 {
                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
                    btc_out_point: bitcoin::OutPoint {
                        txid: tx_info.btc_transaction.compute_txid(),
                        vout: tx_info.out_idx,
                    },
                },
                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
                _ => bail!("Non-final outcome in operation log"),
            };

            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
        };

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
            // Owned handles moved into the stream-producing closure below
            let stream_rpc = self.rpc.clone();
            let stream_client_ctx = self.client_ctx.clone();
            let stream_script_pub_key = address.script_pubkey();
            move || {

            stream! {
                yield DepositStateV2::WaitingForTransaction;

                // Register the script with the bitcoin rpc, retrying with the
                // background backoff until the call succeeds
                retry(
                    "subscribe script history",
                    background_backoff(),
                    || stream_rpc.watch_script_history(&stream_script_pub_key)
                ).await.expect("Will never give up");
                // Keep fetching the script history until its first transaction
                // has an output paying to the deposit script; only that first
                // transaction and its first matching output are considered
                let (btc_out_point, btc_deposited) = retry(
                    "fetch history",
                    background_backoff(),
                    || async {
                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
                        history.first().and_then(|tx| {
                            let (out_idx, amount) = tx.output
                                .iter()
                                .enumerate()
                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
                            let txid = tx.compute_txid();

                            Some((
                                bitcoin::OutPoint {
                                    txid,
                                    vout: out_idx as u32,
                                },
                                amount
                            ))
                        }).context("No deposit transaction found")
                    }
                ).await.expect("Will never give up");

                yield DepositStateV2::WaitingForConfirmation {
                    btc_deposited,
                    btc_out_point
                };

                // NOTE(review): presumably this resolves once the claim record
                // for this out point has been written to the module db -
                // confirm against `wait_key_exists`
                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
                    peg_in_index: tweak_idx,
                    btc_out_point,
                }).await;

                yield DepositStateV2::Confirmed {
                    btc_deposited,
                    btc_out_point
                };

                // The deposit counts as claimed once the change outputs of the
                // claim have been awaited successfully
                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
                    Ok(()) => yield DepositStateV2::Claimed {
                        btc_deposited,
                        btc_out_point
                    },
                    Err(e) => yield DepositStateV2::Failed(e.to_string())
                }
            }
        }}))
    }
1155
1156    pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1157        self.client_ctx
1158            .module_db()
1159            .clone()
1160            .begin_transaction_nc()
1161            .await
1162            .find_by_prefix(&PegInTweakIndexPrefix)
1163            .await
1164            .map(|(key, data)| (key.0, data))
1165            .collect()
1166            .await
1167    }
1168
1169    pub async fn find_tweak_idx_by_address(
1170        &self,
1171        address: bitcoin::Address<NetworkUnchecked>,
1172    ) -> anyhow::Result<TweakIdx> {
1173        let data = self.data.clone();
1174        let Some((tweak_idx, _)) = self
1175            .db
1176            .begin_transaction_nc()
1177            .await
1178            .find_by_prefix(&PegInTweakIndexPrefix)
1179            .await
1180            .filter(|(k, _)| {
1181                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1182                future::ready(derived_address.into_unchecked() == address)
1183            })
1184            .next()
1185            .await
1186        else {
1187            bail!("Address not found in the list of derived keys");
1188        };
1189
1190        Ok(tweak_idx.0)
1191    }
1192    pub async fn find_tweak_idx_by_operation_id(
1193        &self,
1194        operation_id: OperationId,
1195    ) -> anyhow::Result<TweakIdx> {
1196        Ok(self
1197            .client_ctx
1198            .module_db()
1199            .clone()
1200            .begin_transaction_nc()
1201            .await
1202            .find_by_prefix(&PegInTweakIndexPrefix)
1203            .await
1204            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1205            .next()
1206            .await
1207            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1208            .0
1209            .0)
1210    }
1211
1212    pub async fn get_pegin_tweak_idx(
1213        &self,
1214        tweak_idx: TweakIdx,
1215    ) -> anyhow::Result<PegInTweakIndexData> {
1216        self.client_ctx
1217            .module_db()
1218            .clone()
1219            .begin_transaction_nc()
1220            .await
1221            .get_value(&PegInTweakIndexKey(tweak_idx))
1222            .await
1223            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1224    }
1225
1226    pub async fn get_claimed_pegins(
1227        &self,
1228        dbtx: &mut DatabaseTransaction<'_>,
1229        tweak_idx: TweakIdx,
1230    ) -> Vec<(
1231        bitcoin::OutPoint,
1232        TransactionId,
1233        Vec<fedimint_core::OutPoint>,
1234    )> {
1235        let outpoints = dbtx
1236            .get_value(&PegInTweakIndexKey(tweak_idx))
1237            .await
1238            .map(|v| v.claimed)
1239            .unwrap_or_default();
1240
1241        let mut res = vec![];
1242
1243        for outpoint in outpoints {
1244            let claimed_peg_in_data = dbtx
1245                .get_value(&ClaimedPegInKey {
1246                    peg_in_index: tweak_idx,
1247                    btc_out_point: outpoint,
1248                })
1249                .await
1250                .expect("Must have a corresponding claim record");
1251            res.push((
1252                outpoint,
1253                claimed_peg_in_data.claim_txid,
1254                claimed_peg_in_data.change,
1255            ));
1256        }
1257
1258        res
1259    }
1260
1261    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
1262    pub async fn recheck_pegin_address_by_op_id(
1263        &self,
1264        operation_id: OperationId,
1265    ) -> anyhow::Result<()> {
1266        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1267
1268        self.recheck_pegin_address(tweak_idx).await
1269    }
1270
1271    /// Schedule given address for immediate re-check for deposits
1272    pub async fn recheck_pegin_address_by_address(
1273        &self,
1274        address: bitcoin::Address<NetworkUnchecked>,
1275    ) -> anyhow::Result<()> {
1276        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1277            .await
1278    }
1279
1280    /// Schedule given address for immediate re-check for deposits
1281    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1282        self.db
1283            .autocommit(
1284                |dbtx, _| {
1285                    Box::pin(async {
1286                        let db_key = PegInTweakIndexKey(tweak_idx);
1287                        let db_val = dbtx
1288                            .get_value(&db_key)
1289                            .await
1290                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1291
1292                        dbtx.insert_entry(
1293                            &db_key,
1294                            &PegInTweakIndexData {
1295                                next_check_time: Some(fedimint_core::time::now()),
1296                                ..db_val
1297                            },
1298                        )
1299                        .await;
1300
1301                        let sender = self.pegin_monitor_wakeup_sender.clone();
1302                        dbtx.on_commit(move || {
1303                            sender.send_replace(());
1304                        });
1305
1306                        Ok::<_, anyhow::Error>(())
1307                    })
1308                },
1309                Some(100),
1310            )
1311            .await?;
1312
1313        Ok(())
1314    }
1315
1316    /// Await for num deposit by [`OperationId`]
1317    pub async fn await_num_deposits_by_operation_id(
1318        &self,
1319        operation_id: OperationId,
1320        num_deposits: usize,
1321    ) -> anyhow::Result<()> {
1322        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1323        self.await_num_deposits(tweak_idx, num_deposits).await
1324    }
1325
1326    pub async fn await_num_deposits_by_address(
1327        &self,
1328        address: bitcoin::Address<NetworkUnchecked>,
1329        num_deposits: usize,
1330    ) -> anyhow::Result<()> {
1331        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1332            .await
1333    }
1334
1335    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1336    pub async fn await_num_deposits(
1337        &self,
1338        tweak_idx: TweakIdx,
1339        num_deposits: usize,
1340    ) -> anyhow::Result<()> {
1341        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1342
1343        let mut receiver = self.pegin_claimed_receiver.clone();
1344        let mut backoff = backoff_util::aggressive_backoff();
1345
1346        loop {
1347            let pegins = self
1348                .get_claimed_pegins(
1349                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1350                    tweak_idx,
1351                )
1352                .await;
1353
1354            if pegins.len() < num_deposits {
1355                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1356                self.recheck_pegin_address(tweak_idx).await?;
1357                runtime::sleep(backoff.next().unwrap_or_default()).await;
1358                receiver.changed().await?;
1359                continue;
1360            }
1361
1362            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1363
1364            for (_outpoint, transaction_id, change) in pegins {
1365                if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1366                    debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1367                    continue;
1368                }
1369
1370                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1371                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1372
1373                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1374                    bail!("{}", e);
1375                }
1376
1377                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1378                self.client_ctx
1379                    .await_primary_module_outputs(operation_id, change)
1380                    .await
1381                    .expect("Cannot fail if tx was accepted and federation is honest");
1382            }
1383
1384            return Ok(());
1385        }
1386    }
1387
1388    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1389    /// `address`. The caller has to supply the fee rate to be used which can be
1390    /// fetched using [`Self::get_withdraw_fees`] and should be
1391    /// acknowledged by the user since it can be unexpectedly high.
1392    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1393        &self,
1394        address: &bitcoin::Address,
1395        amount: bitcoin::Amount,
1396        fee: PegOutFees,
1397        extra_meta: M,
1398    ) -> anyhow::Result<OperationId> {
1399        {
1400            let operation_id = OperationId(thread_rng().r#gen());
1401
1402            let withdraw_output =
1403                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1404            let tx_builder = TransactionBuilder::new()
1405                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1406
1407            let extra_meta =
1408                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1409            self.client_ctx
1410                .finalize_and_submit_transaction(
1411                    operation_id,
1412                    WalletCommonInit::KIND.as_str(),
1413                    {
1414                        let address = address.clone();
1415                        move |change_range: OutPointRange| WalletOperationMeta {
1416                            variant: WalletOperationMetaVariant::Withdraw {
1417                                address: address.clone().into_unchecked(),
1418                                amount,
1419                                fee,
1420                                change: change_range.into_iter().collect(),
1421                            },
1422                            extra_meta: extra_meta.clone(),
1423                        }
1424                    },
1425                    tx_builder,
1426                )
1427                .await?;
1428
1429            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1430
1431            self.client_ctx
1432                .log_event(
1433                    &mut dbtx,
1434                    SendPaymentEvent {
1435                        operation_id,
1436                        amount: amount + fee.amount(),
1437                        fee: fee.amount(),
1438                    },
1439                )
1440                .await;
1441
1442            dbtx.commit_tx().await;
1443
1444            Ok(operation_id)
1445        }
1446    }
1447
1448    /// Attempt to increase the fee of a onchain withdraw transaction using
1449    /// replace by fee (RBF).
1450    /// This can prevent transactions from getting stuck
1451    /// in the mempool
1452    #[deprecated(
1453        since = "0.4.0",
1454        note = "RBF withdrawals are rejected by the federation"
1455    )]
1456    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1457        &self,
1458        rbf: Rbf,
1459        extra_meta: M,
1460    ) -> anyhow::Result<OperationId> {
1461        let operation_id = OperationId(thread_rng().r#gen());
1462
1463        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1464        let tx_builder = TransactionBuilder::new()
1465            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1466
1467        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1468        self.client_ctx
1469            .finalize_and_submit_transaction(
1470                operation_id,
1471                WalletCommonInit::KIND.as_str(),
1472                move |change_range: OutPointRange| WalletOperationMeta {
1473                    variant: WalletOperationMetaVariant::RbfWithdraw {
1474                        rbf: rbf.clone(),
1475                        change: change_range.into_iter().collect(),
1476                    },
1477                    extra_meta: extra_meta.clone(),
1478                },
1479                tx_builder,
1480            )
1481            .await?;
1482
1483        Ok(operation_id)
1484    }
1485
    /// Returns a stream of updates (or the final outcome) of a withdraw
    /// operation, covering both regular and RBF withdrawals.
    ///
    /// The update stream yields `WithdrawState::Created` once the state
    /// machine reports its `Created` state, and afterwards either
    /// `WithdrawState::Failed` or `WithdrawState::Succeeded`. It ends without
    /// a further update if the state machine notifications run out.
    ///
    /// # Errors
    ///
    /// Fails if the operation does not exist, is not a wallet operation or
    /// is not a withdraw operation.
    ///
    /// # Panics
    ///
    /// The stream panics if the state machine reports a state that is not
    /// expected at that point.
    pub async fn subscribe_withdraw_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
        let operation = self
            .client_ctx
            .get_operation(operation_id)
            .await
            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;

        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
            bail!("Operation is not a wallet operation");
        }

        let operation_meta = operation.meta::<WalletOperationMeta>();

        // Both withdraw flavours record their change out points in the meta
        let (WalletOperationMetaVariant::Withdraw { change, .. }
        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
        else {
            bail!("Operation is not a withdraw operation");
        };

        // Subscribed before the stream is built; the closure below takes
        // ownership of the subscription and of the cloned client context
        let mut operation_stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();

        Ok(self
            .client_ctx
            .outcome_or_updates(operation, operation_id, move || {
                stream! {
                    // The first reported state has to be `Created`
                    match next_withdraw_state(&mut operation_stream).await {
                        Some(WithdrawStates::Created(_)) => {
                            yield WithdrawState::Created;
                        },
                        Some(s) => {
                            panic!("Unexpected state {s:?}")
                        },
                        None => return,
                    }

                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic

                        // Swallowing potential errors since the transaction failing is handled by
                        // output outcome fetching already
                        let _ = client_ctx
                            .await_primary_module_outputs(operation_id, change)
                            .await;


                    // The second reported state is the final one
                    match next_withdraw_state(&mut operation_stream).await {
                        Some(WithdrawStates::Aborted(inner)) => {
                            yield WithdrawState::Failed(inner.error);
                        },
                        Some(WithdrawStates::Success(inner)) => {
                            yield WithdrawState::Succeeded(inner.txid);
                        },
                        Some(s) => {
                            panic!("Unexpected state {s:?}")
                        },
                        None => {},
                    }
                }
            }))
    }
1549
1550    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1551        self.admin_auth
1552            .clone()
1553            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1554    }
1555
1556    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1557        self.module_api
1558            .activate_consensus_version_voting(self.admin_auth()?)
1559            .await?;
1560
1561        Ok(())
1562    }
1563}
1564
1565/// Polls the federation checking if the activated module consensus version
1566/// supports safe deposits, saving the result in the db once it does.
1567async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1568    loop {
1569        let mut dbtx = db.begin_transaction().await;
1570
1571        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1572            break;
1573        }
1574
1575        module_api.wait_for_initialized_connections().await;
1576
1577        if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1578            && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1579        {
1580            dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1581            dbtx.commit_tx().await;
1582            break;
1583        }
1584
1585        drop(dbtx);
1586
1587        if is_running_in_test_env() {
1588            // Even in tests we don't want to spam the federation with requests about it
1589            sleep(Duration::from_secs(10)).await;
1590        } else {
1591            sleep(Duration::from_secs(3600)).await;
1592        }
1593    }
1594}
1595
1596/// Returns the child index to derive the next peg-in tweak key from.
1597async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1598    let index = dbtx
1599        .get_value(&NextPegInTweakIndexKey)
1600        .await
1601        .unwrap_or_default();
1602    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1603        .await;
1604    index
1605}
1606
/// State machines of the wallet client module, one variant per kind of
/// operation.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// State machine of a deposit.
    // NOTE(review): presumably the legacy, state-machine based peg-in flow
    // from the `deposit` module - confirm
    Deposit(DepositStateMachine),
    /// State machine of a withdrawal, regular or RBF.
    Withdraw(WithdrawStateMachine),
}
1612
1613impl IntoDynInstance for WalletClientStates {
1614    type DynType = DynState;
1615
1616    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1617        DynState::from_typed(instance_id, self)
1618    }
1619}
1620
1621impl State for WalletClientStates {
1622    type ModuleContext = WalletClientContext;
1623
1624    fn transitions(
1625        &self,
1626        context: &Self::ModuleContext,
1627        global_context: &DynGlobalClientContext,
1628    ) -> Vec<StateTransition<Self>> {
1629        match self {
1630            WalletClientStates::Deposit(sm) => {
1631                sm_enum_variant_translation!(
1632                    sm.transitions(context, global_context),
1633                    WalletClientStates::Deposit
1634                )
1635            }
1636            WalletClientStates::Withdraw(sm) => {
1637                sm_enum_variant_translation!(
1638                    sm.transitions(context, global_context),
1639                    WalletClientStates::Withdraw
1640                )
1641            }
1642        }
1643    }
1644
1645    fn operation_id(&self) -> OperationId {
1646        match self {
1647            WalletClientStates::Deposit(sm) => sm.operation_id(),
1648            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1649        }
1650    }
1651}
1652
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
    };

    /// Drives `recover_scan_idxes_for_activity` with stubbed history lookups
    /// and checks the reported `RecoverScanOutcome` for several scenarios.
    ///
    /// A stub arm that panics marks an index the scan must never query; the
    /// atomic flag marks an index the scan is required to query.
    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Start at 0, nothing known as used, no activity reported anywhere:
        // index 9 must be queried, index 10 must not.
        {
            let boundary_probed = AtomicBool::new(false);
            let probe_flag = &boundary_probed;
            let used_idxes = BTreeSet::new();

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(0), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(9) => {
                            probe_flag.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(10) => panic!("Shouldn't happen"),
                        TweakIdx(11) => {
                            vec![0usize] /* just for type inference */
                        }
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::new(),
                }
            );
            assert!(boundary_probed.load(Ordering::SeqCst));
        }

        // Indices 1 and 2 are already known as used, so they must not be
        // queried again: index 11 must be queried, index 12 must not.
        {
            let boundary_probed = AtomicBool::new(false);
            let probe_flag = &boundary_probed;
            let used_idxes = BTreeSet::from([TweakIdx(1), TweakIdx(2)]);

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(0), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                        TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                        TweakIdx(11) => {
                            probe_flag.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(12) => panic!("Shouldn't happen"),
                        TweakIdx(13) => {
                            vec![0usize] /* just for type inference */
                        }
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::new(),
                }
            );
            assert!(boundary_probed.load(Ordering::SeqCst));
        }

        // Start at 10, which itself reports activity: index 19 must be
        // queried, index 20 must not.
        {
            let boundary_probed = AtomicBool::new(false);
            let probe_flag = &boundary_probed;
            let used_idxes = BTreeSet::new();

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(10), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(10) => vec![()],
                        TweakIdx(19) => {
                            probe_flag.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(20) => panic!("Shouldn't happen"),
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)]),
                }
            );
            assert!(boundary_probed.load(Ordering::SeqCst));
        }

        // Activity at 6 and at 15: both are reported, 15 is the last used one.
        {
            let used_idxes = BTreeSet::new();

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(0), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(6 | 15) => vec![()],
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(15)),
                    new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)]),
                }
            );
        }

        // Start at 10 with no activity from there on: index 9 must not be
        // queried (the arm for 8 only pins the element type).
        {
            let used_idxes = BTreeSet::new();

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(10), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(8) => {
                            vec![()] /* for type inference only */
                        }
                        TweakIdx(9) => {
                            panic!("Shouldn't happen")
                        }
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::new(),
                }
            );
        }

        // Start at 10 with activity at 15: index 9 must not be queried.
        {
            let used_idxes = BTreeSet::new();

            let outcome =
                recover_scan_idxes_for_activity(TweakIdx(10), &used_idxes, |idx| async move {
                    Ok(match idx {
                        TweakIdx(9) => panic!("Shouldn't happen"),
                        TweakIdx(15) => vec![()],
                        _ => vec![],
                    })
                })
                .await
                .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(15)),
                    new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)]),
                }
            );
        }
    }
}