fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
21mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{DynBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client_module::oplog::UpdateStreamOrOutcome;
43use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client_module::transaction::{
45    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
54use fedimint_core::module::{
55    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
56    ModuleInit, MultiApiVersion,
57};
58use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
59use fedimint_core::util::backoff_util::background_backoff;
60use fedimint_core::util::{BoxStream, backoff_util, retry};
61use fedimint_core::{
62    BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
63    runtime, secp256k1,
64};
65use fedimint_derive_secret::{ChildId, DerivableSecret};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67pub use fedimint_wallet_common as common;
68use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
69use fedimint_wallet_common::tweakable::Tweakable;
70pub use fedimint_wallet_common::*;
71use futures::{Stream, StreamExt};
72use rand::{Rng, thread_rng};
73use secp256k1::Keypair;
74use serde::{Deserialize, Serialize};
75use strum::IntoEnumIterator;
76use tokio::sync::watch;
77use tracing::{debug, instrument};
78
79use crate::api::WalletFederationApi;
80use crate::backup::WalletRecovery;
81use crate::client_db::{
82    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
83    PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
84};
85use crate::deposit::DepositStateMachine;
86use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
87
88const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
89
90#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
91pub struct BitcoinTransactionData {
92    /// The bitcoin transaction is saved as soon as we see it so the transaction
93    /// can be re-transmitted if it's evicted from the mempool.
94    pub btc_transaction: bitcoin::Transaction,
95    /// Index of the deposit output
96    pub out_idx: u32,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
100pub enum DepositStateV1 {
101    WaitingForTransaction,
102    WaitingForConfirmation(BitcoinTransactionData),
103    Confirmed(BitcoinTransactionData),
104    Claimed(BitcoinTransactionData),
105    Failed(String),
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
109pub enum DepositStateV2 {
110    WaitingForTransaction,
111    WaitingForConfirmation {
112        #[serde(with = "bitcoin::amount::serde::as_sat")]
113        btc_deposited: bitcoin::Amount,
114        btc_out_point: bitcoin::OutPoint,
115    },
116    Confirmed {
117        #[serde(with = "bitcoin::amount::serde::as_sat")]
118        btc_deposited: bitcoin::Amount,
119        btc_out_point: bitcoin::OutPoint,
120    },
121    Claimed {
122        #[serde(with = "bitcoin::amount::serde::as_sat")]
123        btc_deposited: bitcoin::Amount,
124        btc_out_point: bitcoin::OutPoint,
125    },
126    Failed(String),
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
130pub enum WithdrawState {
131    Created,
132    Succeeded(bitcoin::Txid),
133    Failed(String),
134    // TODO: track refund
135    // Refunded,
136    // RefundFailed(String),
137}
138
139async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
140where
141    S: Stream<Item = WalletClientStates> + Unpin,
142{
143    loop {
144        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
145            return Some(ds.state);
146        }
147        tokio::task::yield_now().await;
148    }
149}
150
151#[derive(Debug, Clone, Default)]
152// TODO: should probably move to DB
153pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
154
155impl WalletClientInit {
156    pub fn new(rpc: DynBitcoindRpc) -> Self {
157        Self(Some(rpc))
158    }
159}
160
161impl ModuleInit for WalletClientInit {
162    type Common = WalletCommonInit;
163
164    async fn dump_database(
165        &self,
166        dbtx: &mut DatabaseTransaction<'_>,
167        prefix_names: Vec<String>,
168    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
169        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
170            BTreeMap::new();
171        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
172            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
173        });
174
175        for table in filtered_prefixes {
176            match table {
177                DbKeyPrefix::NextPegInTweakIndex => {
178                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
179                        wallet_client_items
180                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
181                    }
182                }
183                DbKeyPrefix::PegInTweakIndex => {
184                    push_db_pair_items!(
185                        dbtx,
186                        PegInTweakIndexPrefix,
187                        PegInTweakIndexKey,
188                        PegInTweakIndexData,
189                        wallet_client_items,
190                        "Peg-In Tweak Index"
191                    );
192                }
193                DbKeyPrefix::ClaimedPegIn => {
194                    push_db_pair_items!(
195                        dbtx,
196                        ClaimedPegInPrefix,
197                        ClaimedPegInKey,
198                        ClaimedPegInData,
199                        wallet_client_items,
200                        "Claimed Peg-In"
201                    );
202                }
203                DbKeyPrefix::RecoveryFinalized => {
204                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
205                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
206                    }
207                }
208                DbKeyPrefix::SupportsSafeDeposit => {
209                    push_db_pair_items!(
210                        dbtx,
211                        SupportsSafeDepositPrefix,
212                        SupportsSafeDepositKey,
213                        (),
214                        wallet_client_items,
215                        "Supports Safe Deposit"
216                    );
217                }
218                DbKeyPrefix::RecoveryState
219                | DbKeyPrefix::ExternalReservedStart
220                | DbKeyPrefix::CoreInternalReservedStart
221                | DbKeyPrefix::CoreInternalReservedEnd => {}
222            }
223        }
224
225        Box::new(wallet_client_items.into_iter())
226    }
227}
228
229#[apply(async_trait_maybe_send!)]
230impl ClientModuleInit for WalletClientInit {
231    type Module = WalletClientModule;
232
233    fn supported_api_versions(&self) -> MultiApiVersion {
234        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
235            .expect("no version conflicts")
236    }
237
238    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
239        let data = WalletClientModuleData {
240            cfg: args.cfg().clone(),
241            module_root_secret: args.module_root_secret().clone(),
242        };
243
244        let db = args.db().clone();
245
246        let btc_rpc = self.0.clone().unwrap_or(create_esplora_rpc(
247            &WalletClientModule::get_rpc_config(args.cfg()).url,
248        )?);
249
250        let module_api = args.module_api().clone();
251
252        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
253        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
254
255        Ok(WalletClientModule {
256            db,
257            data,
258            module_api,
259            notifier: args.notifier().clone(),
260            rpc: btc_rpc,
261            client_ctx: args.context(),
262            pegin_monitor_wakeup_sender,
263            pegin_monitor_wakeup_receiver,
264            pegin_claimed_receiver,
265            pegin_claimed_sender,
266            task_group: args.task_group().clone(),
267            admin_auth: args.admin_auth().cloned(),
268        })
269    }
270
271    /// Wallet recovery
272    ///
273    /// Query bitcoin rpc for history of addresses from last known used
274    /// addresses (or index 0) until `MAX_GAP` unused ones.
275    ///
276    /// Notably does not persist the progress of addresses being queried,
277    /// because it is not expected that it would take long enough to bother.
278    async fn recover(
279        &self,
280        args: &ClientModuleRecoverArgs<Self>,
281        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
282    ) -> anyhow::Result<()> {
283        args.recover_from_history::<WalletRecovery>(self, snapshot)
284            .await
285    }
286
287    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
288        Some(
289            DbKeyPrefix::iter()
290                .map(|p| p as u8)
291                .chain(
292                    DbKeyPrefix::ExternalReservedStart as u8
293                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
294                )
295                .collect(),
296        )
297    }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct WalletOperationMeta {
302    pub variant: WalletOperationMetaVariant,
303    pub extra_meta: serde_json::Value,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307#[serde(rename_all = "snake_case")]
308pub enum WalletOperationMetaVariant {
309    Deposit {
310        address: Address<NetworkUnchecked>,
311        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
312        /// using the pegin monitor. The value is the child index of the key
313        /// used to generate the address, so we can re-generate the secret key
314        /// from our root secret.
315        #[serde(default)]
316        tweak_idx: Option<TweakIdx>,
317        #[serde(default, skip_serializing_if = "Option::is_none")]
318        expires_at: Option<SystemTime>,
319    },
320    Withdraw {
321        address: Address<NetworkUnchecked>,
322        #[serde(with = "bitcoin::amount::serde::as_sat")]
323        amount: bitcoin::Amount,
324        fee: PegOutFees,
325        change: Vec<OutPoint>,
326    },
327
328    RbfWithdraw {
329        rbf: Rbf,
330        change: Vec<OutPoint>,
331    },
332}
333
334/// The non-resource, just plain-data parts of [`WalletClientModule`]
335#[derive(Debug, Clone)]
336pub struct WalletClientModuleData {
337    cfg: WalletClientConfig,
338    module_root_secret: DerivableSecret,
339}
340
341impl WalletClientModuleData {
342    fn derive_deposit_address(
343        &self,
344        idx: TweakIdx,
345    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
346        let idx = ChildId(idx.0);
347
348        let secret_tweak_key = self
349            .module_root_secret
350            .child_key(WALLET_TWEAK_CHILD_ID)
351            .child_key(idx)
352            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
353
354        let public_tweak_key = secret_tweak_key.public_key();
355
356        let address = self
357            .cfg
358            .peg_in_descriptor
359            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
360            .address(self.cfg.network.0)
361            .unwrap();
362
363        // TODO: make hash?
364        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
365
366        (secret_tweak_key, public_tweak_key, address, operation_id)
367    }
368
369    fn derive_peg_in_script(
370        &self,
371        idx: TweakIdx,
372    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
373        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
374
375        (
376            self.cfg
377                .peg_in_descriptor
378                .tweak(&secret_tweak_key.public_key(), SECP256K1)
379                .script_pubkey(),
380            address,
381            secret_tweak_key,
382            operation_id,
383        )
384    }
385}
386
387#[derive(Debug)]
388pub struct WalletClientModule {
389    data: WalletClientModuleData,
390    db: Database,
391    module_api: DynModuleApi,
392    notifier: ModuleNotifier<WalletClientStates>,
393    rpc: DynBitcoindRpc,
394    client_ctx: ClientContext<Self>,
395    /// Updated to wake up pegin monitor
396    pegin_monitor_wakeup_sender: watch::Sender<()>,
397    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
398    /// Called every time a peg-in was claimed
399    pegin_claimed_sender: watch::Sender<()>,
400    pegin_claimed_receiver: watch::Receiver<()>,
401    task_group: TaskGroup,
402    admin_auth: Option<ApiAuth>,
403}
404
405#[apply(async_trait_maybe_send!)]
406impl ClientModule for WalletClientModule {
407    type Init = WalletClientInit;
408    type Common = WalletModuleTypes;
409    type Backup = WalletModuleBackup;
410    type ModuleStateMachineContext = WalletClientContext;
411    type States = WalletClientStates;
412
413    fn context(&self) -> Self::ModuleStateMachineContext {
414        WalletClientContext {
415            rpc: self.rpc.clone(),
416            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
417            wallet_decoder: self.decoder(),
418            secp: Secp256k1::default(),
419            client_ctx: self.client_ctx.clone(),
420        }
421    }
422
423    async fn start(&self) {
424        self.task_group.spawn_cancellable("peg-in monitor", {
425            let client_ctx = self.client_ctx.clone();
426            let db = self.db.clone();
427            let btc_rpc = self.rpc.clone();
428            let module_api = self.module_api.clone();
429            let data = self.data.clone();
430            let pegin_claimed_sender = self.pegin_claimed_sender.clone();
431            let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
432            pegin_monitor::run_peg_in_monitor(
433                client_ctx,
434                db,
435                btc_rpc,
436                module_api,
437                data,
438                pegin_claimed_sender,
439                pegin_monitor_wakeup_receiver,
440            )
441        });
442
443        self.task_group
444            .spawn_cancellable("supports-safe-deposit-version", {
445                let db = self.db.clone();
446                let module_api = self.module_api.clone();
447
448                poll_supports_safe_deposit_version(db, module_api)
449            });
450    }
451
452    fn supports_backup(&self) -> bool {
453        true
454    }
455
456    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
457        // fetch consensus height first
458        let session_count = self.client_ctx.global_api().session_count().await?;
459
460        let mut dbtx = self.db.begin_transaction_nc().await;
461        let next_pegin_tweak_idx = dbtx
462            .get_value(&NextPegInTweakIndexKey)
463            .await
464            .unwrap_or_default();
465        let claimed = dbtx
466            .find_by_prefix(&PegInTweakIndexPrefix)
467            .await
468            .filter_map(|(k, v)| async move {
469                if v.claimed.is_empty() {
470                    None
471                } else {
472                    Some(k.0)
473                }
474            })
475            .collect()
476            .await;
477        Ok(backup::WalletModuleBackup::new_v1(
478            session_count,
479            next_pegin_tweak_idx,
480            claimed,
481        ))
482    }
483
484    fn input_fee(
485        &self,
486        _amount: &Amounts,
487        _input: &<Self::Common as ModuleCommon>::Input,
488    ) -> Option<Amounts> {
489        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
490    }
491
492    fn output_fee(
493        &self,
494        _amount: &Amounts,
495        _output: &<Self::Common as ModuleCommon>::Output,
496    ) -> Option<Amounts> {
497        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
498    }
499
500    async fn handle_rpc(
501        &self,
502        method: String,
503        request: serde_json::Value,
504    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
505        Box::pin(try_stream! {
506            match method.as_str() {
507                "get_wallet_summary" => {
508                    let _req: WalletSummaryRequest = serde_json::from_value(request)?;
509                    let wallet_summary = self.get_wallet_summary()
510                        .await
511                        .expect("Failed to fetch wallet summary");
512                    let result = serde_json::to_value(&wallet_summary)
513                        .expect("Serialization error");
514                    yield result;
515                }
516                "get_block_count_local" => {
517                    let block_count = self.get_block_count_local().await
518                        .expect("Failed to fetch block count");
519                    yield serde_json::to_value(block_count)?;
520                }
521                "peg_in" => {
522                    let req: PegInRequest = serde_json::from_value(request)?;
523                    let response = self.peg_in(req)
524                        .await
525                        .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
526                    let result = serde_json::to_value(&response)?;
527                    yield result;
528                },
529                "peg_out" => {
530                    let req: PegOutRequest = serde_json::from_value(request)?;
531                    let response = self.peg_out(req)
532                        .await
533                        .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
534                    let result = serde_json::to_value(&response)?;
535                    yield result;
536                },
537                "subscribe_deposit" => {
538                    let req: SubscribeDepositRequest = serde_json::from_value(request)?;
539                    for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
540                        yield serde_json::to_value(state)?;
541                    }
542                },
543                "subscribe_withdraw" => {
544                    let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
545                    for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
546                        yield serde_json::to_value(state)?;
547                    }
548                }
549                _ => {
550                    Err(anyhow::format_err!("Unknown method: {}", method))?;
551                }
552            }
553        })
554    }
555
556    #[cfg(feature = "cli")]
557    async fn handle_cli_command(
558        &self,
559        args: &[std::ffi::OsString],
560    ) -> anyhow::Result<serde_json::Value> {
561        cli::handle_cli_command(self, args).await
562    }
563}
564
565#[derive(Deserialize)]
566struct WalletSummaryRequest {}
567
568#[derive(Debug, Clone)]
569pub struct WalletClientContext {
570    rpc: DynBitcoindRpc,
571    wallet_descriptor: PegInDescriptor,
572    wallet_decoder: Decoder,
573    secp: Secp256k1<All>,
574    pub client_ctx: ClientContext<WalletClientModule>,
575}
576
577#[derive(Debug, Clone, Serialize, Deserialize)]
578pub struct PegInRequest {
579    pub extra_meta: serde_json::Value,
580}
581
582#[derive(Deserialize)]
583struct SubscribeDepositRequest {
584    operation_id: OperationId,
585}
586
587#[derive(Deserialize)]
588struct SubscribeWithdrawRequest {
589    operation_id: OperationId,
590}
591
592#[derive(Debug, Clone, Serialize, Deserialize)]
593pub struct PegInResponse {
594    pub deposit_address: Address<NetworkUnchecked>,
595    pub operation_id: OperationId,
596}
597
598#[derive(Debug, Clone, Serialize, Deserialize)]
599pub struct PegOutRequest {
600    pub amount_sat: u64,
601    pub destination_address: Address<NetworkUnchecked>,
602    pub extra_meta: serde_json::Value,
603}
604
605#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct PegOutResponse {
607    pub operation_id: OperationId,
608}
609
610impl Context for WalletClientContext {
611    const KIND: Option<ModuleKind> = Some(KIND);
612}
613
614impl WalletClientModule {
615    fn cfg(&self) -> &WalletClientConfig {
616        &self.data.cfg
617    }
618
619    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
620        match BitcoinRpcConfig::get_defaults_from_env_vars() {
621            Ok(rpc_config) => {
622                // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
623                // updated to 0.30
624                if rpc_config.kind == "bitcoind" {
625                    cfg.default_bitcoin_rpc.clone()
626                } else {
627                    rpc_config
628                }
629            }
630            _ => cfg.default_bitcoin_rpc.clone(),
631        }
632    }
633
634    pub fn get_network(&self) -> Network {
635        self.cfg().network.0
636    }
637
638    pub fn get_finality_delay(&self) -> u32 {
639        self.cfg().finality_delay
640    }
641
642    pub fn get_fee_consensus(&self) -> FeeConsensus {
643        self.cfg().fee_consensus
644    }
645
646    async fn allocate_deposit_address_inner(
647        &self,
648        dbtx: &mut DatabaseTransaction<'_>,
649    ) -> (OperationId, Address, TweakIdx) {
650        dbtx.ensure_isolated().expect("Must be isolated db");
651
652        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
653        let (_secret_tweak_key, _, address, operation_id) =
654            self.data.derive_deposit_address(tweak_idx);
655
656        let now = fedimint_core::time::now();
657
658        dbtx.insert_new_entry(
659            &PegInTweakIndexKey(tweak_idx),
660            &PegInTweakIndexData {
661                creation_time: now,
662                next_check_time: Some(now),
663                last_check_time: None,
664                operation_id,
665                claimed: vec![],
666            },
667        )
668        .await;
669
670        (operation_id, address, tweak_idx)
671    }
672
673    /// Fetches the fees that would need to be paid to make the withdraw request
674    /// using [`Self::withdraw`] work *right now*.
675    ///
676    /// Note that we do not receive a guarantee that these fees will be valid in
677    /// the future, thus even the next second using these fees *may* fail.
678    /// The caller should be prepared to retry with a new fee estimate.
679    pub async fn get_withdraw_fees(
680        &self,
681        address: &bitcoin::Address,
682        amount: bitcoin::Amount,
683    ) -> anyhow::Result<PegOutFees> {
684        self.module_api
685            .fetch_peg_out_fees(address, amount)
686            .await?
687            .context("Federation didn't return peg-out fees")
688    }
689
690    /// Returns a summary of the wallet's coins
691    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
692        Ok(self.module_api.fetch_wallet_summary().await?)
693    }
694
695    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
696        Ok(self.module_api.fetch_block_count_local().await?)
697    }
698
699    pub fn create_withdraw_output(
700        &self,
701        operation_id: OperationId,
702        address: bitcoin::Address,
703        amount: bitcoin::Amount,
704        fees: PegOutFees,
705    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
706        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
707
708        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
709
710        let sm_gen = move |out_point_range: OutPointRange| {
711            assert_eq!(out_point_range.count(), 1);
712            let out_idx = out_point_range.start_idx();
713            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
714                operation_id,
715                state: WithdrawStates::Created(CreatedWithdrawState {
716                    fm_outpoint: OutPoint {
717                        txid: out_point_range.txid(),
718                        out_idx,
719                    },
720                }),
721            })]
722        };
723
724        Ok(ClientOutputBundle::new(
725            vec![ClientOutput::<WalletOutput> {
726                output,
727                amounts: Amounts::new_bitcoin(amount),
728            }],
729            vec![ClientOutputSM::<WalletClientStates> {
730                state_machines: Arc::new(sm_gen),
731            }],
732        ))
733    }
734
735    pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
736        let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
737
738        Ok(PegInResponse {
739            deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
740                .as_unchecked()
741                .clone(),
742            operation_id,
743        })
744    }
745
746    pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
747        let amount = bitcoin::Amount::from_sat(req.amount_sat);
748        let destination = req
749            .destination_address
750            .require_network(self.get_network())?;
751
752        let fees = self.get_withdraw_fees(&destination, amount).await?;
753        let operation_id = self
754            .withdraw(&destination, amount, fees, req.extra_meta)
755            .await
756            .context("Failed to initiate withdraw")?;
757
758        Ok(PegOutResponse { operation_id })
759    }
760
761    pub fn create_rbf_withdraw_output(
762        &self,
763        operation_id: OperationId,
764        rbf: &Rbf,
765    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
766        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
767
768        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
769
770        let sm_gen = move |out_point_range: OutPointRange| {
771            assert_eq!(out_point_range.count(), 1);
772            let out_idx = out_point_range.start_idx();
773            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
774                operation_id,
775                state: WithdrawStates::Created(CreatedWithdrawState {
776                    fm_outpoint: OutPoint {
777                        txid: out_point_range.txid(),
778                        out_idx,
779                    },
780                }),
781            })]
782        };
783
784        Ok(ClientOutputBundle::new(
785            vec![ClientOutput::<WalletOutput> {
786                output,
787                amounts: Amounts::new_bitcoin(amount),
788            }],
789            vec![ClientOutputSM::<WalletClientStates> {
790                state_machines: Arc::new(sm_gen),
791            }],
792        ))
793    }
794
795    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
796        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
797    }
798
799    /// Returns true if the federation's wallet module consensus version
800    /// supports processing all deposits.
801    ///
802    /// This method is safe to call offline, since it first attempts to read a
803    /// key from the db that represents the client has previously been able to
804    /// verify the wallet module consensus version. If the client has not
805    /// verified the version, it must be online to fetch the latest wallet
806    /// module consensus version.
807    pub async fn supports_safe_deposit(&self) -> bool {
808        let mut dbtx = self.db.begin_transaction().await;
809
810        let already_verified_supports_safe_deposit =
811            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
812
813        already_verified_supports_safe_deposit || {
814            match self.module_api.module_consensus_version().await {
815                Ok(module_consensus_version) => {
816                    let supported_version =
817                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
818
819                    if supported_version {
820                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
821                        dbtx.commit_tx().await;
822                    }
823
824                    supported_version
825                }
826                Err(_) => false,
827            }
828        }
829    }
830
831    /// Allocates a deposit address controlled by the federation, guaranteeing
832    /// safe handling of all deposits, including on-chain transactions exceeding
833    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
834    ///
835    /// Returns an error if the client has never been online to verify the
836    /// federation's wallet module consensus version supports processing all
837    /// deposits.
838    pub async fn safe_allocate_deposit_address<M>(
839        &self,
840        extra_meta: M,
841    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
842    where
843        M: Serialize + MaybeSend + MaybeSync,
844    {
845        ensure!(
846            self.supports_safe_deposit().await,
847            "Wallet module consensus version doesn't support safe deposits",
848        );
849
850        self.allocate_deposit_address_expert_only(extra_meta).await
851    }
852
853    /// Allocates a deposit address that is controlled by the federation.
854    ///
855    /// This is an EXPERT ONLY method intended for power users such as Lightning
856    /// gateways allocating liquidity, and we discourage exposing peg-in
857    /// functionality to everyday users of a Fedimint wallet due to the
858    /// following two limitations:
859    ///
860    /// The transaction sending to this address needs to be smaller than 40KB in
861    /// order for the peg-in to be claimable. If the transaction is too large,
862    /// funds will be lost.
863    ///
864    /// In the future, federations will also enforce a minimum peg-in amount to
865    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
866    /// claimed and funds will be lost.
867    ///
868    /// Everyday users should rely on Lightning to move funds into the
869    /// federation.
870    pub async fn allocate_deposit_address_expert_only<M>(
871        &self,
872        extra_meta: M,
873    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
874    where
875        M: Serialize + MaybeSend + MaybeSync,
876    {
877        let extra_meta_value =
878            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
879        let (operation_id, address, tweak_idx) = self
880            .db
881            .autocommit(
882                move |dbtx, _| {
883                    let extra_meta_value_inner = extra_meta_value.clone();
884                    Box::pin(async move {
885                        let (operation_id, address, tweak_idx) = self
886                            .allocate_deposit_address_inner(dbtx)
887                            .await;
888
889                        self.client_ctx.manual_operation_start_dbtx(
890                            dbtx,
891                            operation_id,
892                            WalletCommonInit::KIND.as_str(),
893                            WalletOperationMeta {
894                                variant: WalletOperationMetaVariant::Deposit {
895                                    address: address.clone().into_unchecked(),
896                                    tweak_idx: Some(tweak_idx),
897                                    expires_at: None,
898                                },
899                                extra_meta: extra_meta_value_inner,
900                            },
901                            vec![]
902                        ).await?;
903
904                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
905
906                        // Begin watching the script address
907                        self.rpc.watch_script_history(&address.script_pubkey()).await?;
908
909                        let sender = self.pegin_monitor_wakeup_sender.clone();
910                        dbtx.on_commit(move || {
911                            sender.send_replace(());
912                        });
913
914                        Ok((operation_id, address, tweak_idx))
915                    })
916                },
917                Some(100),
918            )
919            .await
920            .map_err(|e| match e {
921                AutocommitError::CommitFailed {
922                    last_error,
923                    attempts,
924                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
925                AutocommitError::ClosureError { error, .. } => error,
926            })?;
927
928        Ok((operation_id, address, tweak_idx))
929    }
930
931    /// Returns a stream of updates about an ongoing deposit operation created
932    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
933    /// Returns an error for old deposit operations created prior to the 0.4
934    /// release and not driven to completion yet. This should be rare enough
935    /// that an indeterminate state is ok here.
936    pub async fn subscribe_deposit(
937        &self,
938        operation_id: OperationId,
939    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
940        let operation = self
941            .client_ctx
942            .get_operation(operation_id)
943            .await
944            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
945
946        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
947            bail!("Operation is not a wallet operation");
948        }
949
950        let operation_meta = operation.meta::<WalletOperationMeta>();
951
952        let WalletOperationMetaVariant::Deposit {
953            address, tweak_idx, ..
954        } = operation_meta.variant
955        else {
956            bail!("Operation is not a deposit operation");
957        };
958
959        let address = address.require_network(self.cfg().network.0)?;
960
961        // The old deposit operations don't have tweak_idx set
962        let Some(tweak_idx) = tweak_idx else {
963            // In case we are dealing with an old deposit that still uses state machines we
964            // don't have the logic here anymore to subscribe to updates. We can still read
965            // the final state though if it reached any.
966            let outcome_v1 = operation
967                .outcome::<DepositStateV1>()
968                .context("Old pending deposit, can't subscribe to updates")?;
969
970            let outcome_v2 = match outcome_v1 {
971                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
972                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
973                    btc_out_point: bitcoin::OutPoint {
974                        txid: tx_info.btc_transaction.compute_txid(),
975                        vout: tx_info.out_idx,
976                    },
977                },
978                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
979                _ => bail!("Non-final outcome in operation log"),
980            };
981
982            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
983        };
984
985        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
986            let stream_rpc = self.rpc.clone();
987            let stream_client_ctx = self.client_ctx.clone();
988            let stream_script_pub_key = address.script_pubkey();
989            move || {
990
991            stream! {
992                yield DepositStateV2::WaitingForTransaction;
993
994                retry(
995                    "subscribe script history",
996                    background_backoff(),
997                    || stream_rpc.watch_script_history(&stream_script_pub_key)
998                ).await.expect("Will never give up");
999                let (btc_out_point, btc_deposited) = retry(
1000                    "fetch history",
1001                    background_backoff(),
1002                    || async {
1003                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1004                        history.first().and_then(|tx| {
1005                            let (out_idx, amount) = tx.output
1006                                .iter()
1007                                .enumerate()
1008                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1009                            let txid = tx.compute_txid();
1010
1011                            Some((
1012                                bitcoin::OutPoint {
1013                                    txid,
1014                                    vout: out_idx as u32,
1015                                },
1016                                amount
1017                            ))
1018                        }).context("No deposit transaction found")
1019                    }
1020                ).await.expect("Will never give up");
1021
1022                yield DepositStateV2::WaitingForConfirmation {
1023                    btc_deposited,
1024                    btc_out_point
1025                };
1026
1027                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1028                    peg_in_index: tweak_idx,
1029                    btc_out_point,
1030                }).await;
1031
1032                yield DepositStateV2::Confirmed {
1033                    btc_deposited,
1034                    btc_out_point
1035                };
1036
1037                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1038                    Ok(()) => yield DepositStateV2::Claimed {
1039                        btc_deposited,
1040                        btc_out_point
1041                    },
1042                    Err(e) => yield DepositStateV2::Failed(e.to_string())
1043                }
1044            }
1045        }}))
1046    }
1047
1048    pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1049        self.client_ctx
1050            .module_db()
1051            .clone()
1052            .begin_transaction_nc()
1053            .await
1054            .find_by_prefix(&PegInTweakIndexPrefix)
1055            .await
1056            .map(|(key, data)| (key.0, data))
1057            .collect()
1058            .await
1059    }
1060
1061    pub async fn find_tweak_idx_by_address(
1062        &self,
1063        address: bitcoin::Address<NetworkUnchecked>,
1064    ) -> anyhow::Result<TweakIdx> {
1065        let data = self.data.clone();
1066        let Some((tweak_idx, _)) = self
1067            .db
1068            .begin_transaction_nc()
1069            .await
1070            .find_by_prefix(&PegInTweakIndexPrefix)
1071            .await
1072            .filter(|(k, _)| {
1073                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1074                future::ready(derived_address.into_unchecked() == address)
1075            })
1076            .next()
1077            .await
1078        else {
1079            bail!("Address not found in the list of derived keys");
1080        };
1081
1082        Ok(tweak_idx.0)
1083    }
1084    pub async fn find_tweak_idx_by_operation_id(
1085        &self,
1086        operation_id: OperationId,
1087    ) -> anyhow::Result<TweakIdx> {
1088        Ok(self
1089            .client_ctx
1090            .module_db()
1091            .clone()
1092            .begin_transaction_nc()
1093            .await
1094            .find_by_prefix(&PegInTweakIndexPrefix)
1095            .await
1096            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1097            .next()
1098            .await
1099            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1100            .0
1101            .0)
1102    }
1103
1104    pub async fn get_pegin_tweak_idx(
1105        &self,
1106        tweak_idx: TweakIdx,
1107    ) -> anyhow::Result<PegInTweakIndexData> {
1108        self.client_ctx
1109            .module_db()
1110            .clone()
1111            .begin_transaction_nc()
1112            .await
1113            .get_value(&PegInTweakIndexKey(tweak_idx))
1114            .await
1115            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1116    }
1117
1118    pub async fn get_claimed_pegins(
1119        &self,
1120        dbtx: &mut DatabaseTransaction<'_>,
1121        tweak_idx: TweakIdx,
1122    ) -> Vec<(
1123        bitcoin::OutPoint,
1124        TransactionId,
1125        Vec<fedimint_core::OutPoint>,
1126    )> {
1127        let outpoints = dbtx
1128            .get_value(&PegInTweakIndexKey(tweak_idx))
1129            .await
1130            .map(|v| v.claimed)
1131            .unwrap_or_default();
1132
1133        let mut res = vec![];
1134
1135        for outpoint in outpoints {
1136            let claimed_peg_in_data = dbtx
1137                .get_value(&ClaimedPegInKey {
1138                    peg_in_index: tweak_idx,
1139                    btc_out_point: outpoint,
1140                })
1141                .await
1142                .expect("Must have a corresponding claim record");
1143            res.push((
1144                outpoint,
1145                claimed_peg_in_data.claim_txid,
1146                claimed_peg_in_data.change,
1147            ));
1148        }
1149
1150        res
1151    }
1152
1153    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
1154    pub async fn recheck_pegin_address_by_op_id(
1155        &self,
1156        operation_id: OperationId,
1157    ) -> anyhow::Result<()> {
1158        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1159
1160        self.recheck_pegin_address(tweak_idx).await
1161    }
1162
1163    /// Schedule given address for immediate re-check for deposits
1164    pub async fn recheck_pegin_address_by_address(
1165        &self,
1166        address: bitcoin::Address<NetworkUnchecked>,
1167    ) -> anyhow::Result<()> {
1168        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1169            .await
1170    }
1171
1172    /// Schedule given address for immediate re-check for deposits
1173    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1174        self.db
1175            .autocommit(
1176                |dbtx, _| {
1177                    Box::pin(async {
1178                        let db_key = PegInTweakIndexKey(tweak_idx);
1179                        let db_val = dbtx
1180                            .get_value(&db_key)
1181                            .await
1182                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1183
1184                        dbtx.insert_entry(
1185                            &db_key,
1186                            &PegInTweakIndexData {
1187                                next_check_time: Some(fedimint_core::time::now()),
1188                                ..db_val
1189                            },
1190                        )
1191                        .await;
1192
1193                        let sender = self.pegin_monitor_wakeup_sender.clone();
1194                        dbtx.on_commit(move || {
1195                            sender.send_replace(());
1196                        });
1197
1198                        Ok::<_, anyhow::Error>(())
1199                    })
1200                },
1201                Some(100),
1202            )
1203            .await?;
1204
1205        Ok(())
1206    }
1207
1208    /// Await for num deposit by [`OperationId`]
1209    pub async fn await_num_deposits_by_operation_id(
1210        &self,
1211        operation_id: OperationId,
1212        num_deposits: usize,
1213    ) -> anyhow::Result<()> {
1214        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1215        self.await_num_deposits(tweak_idx, num_deposits).await
1216    }
1217
1218    pub async fn await_num_deposits_by_address(
1219        &self,
1220        address: bitcoin::Address<NetworkUnchecked>,
1221        num_deposits: usize,
1222    ) -> anyhow::Result<()> {
1223        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1224            .await
1225    }
1226
1227    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1228    pub async fn await_num_deposits(
1229        &self,
1230        tweak_idx: TweakIdx,
1231        num_deposits: usize,
1232    ) -> anyhow::Result<()> {
1233        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1234
1235        let mut receiver = self.pegin_claimed_receiver.clone();
1236        let mut backoff = backoff_util::aggressive_backoff();
1237
1238        loop {
1239            let pegins = self
1240                .get_claimed_pegins(
1241                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1242                    tweak_idx,
1243                )
1244                .await;
1245
1246            if pegins.len() < num_deposits {
1247                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1248                self.recheck_pegin_address(tweak_idx).await?;
1249                runtime::sleep(backoff.next().unwrap_or_default()).await;
1250                receiver.changed().await?;
1251                continue;
1252            }
1253
1254            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1255
1256            for (_outpoint, transaction_id, change) in pegins {
1257                if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1258                    debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1259                    continue;
1260                }
1261
1262                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1263                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1264
1265                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1266                    bail!("{}", e);
1267                }
1268
1269                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1270                self.client_ctx
1271                    .await_primary_module_outputs(operation_id, change)
1272                    .await
1273                    .expect("Cannot fail if tx was accepted and federation is honest");
1274            }
1275
1276            return Ok(());
1277        }
1278    }
1279
1280    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1281    /// `address`. The caller has to supply the fee rate to be used which can be
1282    /// fetched using [`Self::get_withdraw_fees`] and should be
1283    /// acknowledged by the user since it can be unexpectedly high.
1284    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1285        &self,
1286        address: &bitcoin::Address,
1287        amount: bitcoin::Amount,
1288        fee: PegOutFees,
1289        extra_meta: M,
1290    ) -> anyhow::Result<OperationId> {
1291        {
1292            let operation_id = OperationId(thread_rng().r#gen());
1293
1294            let withdraw_output =
1295                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1296            let tx_builder = TransactionBuilder::new()
1297                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1298
1299            let extra_meta =
1300                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1301            self.client_ctx
1302                .finalize_and_submit_transaction(
1303                    operation_id,
1304                    WalletCommonInit::KIND.as_str(),
1305                    {
1306                        let address = address.clone();
1307                        move |change_range: OutPointRange| WalletOperationMeta {
1308                            variant: WalletOperationMetaVariant::Withdraw {
1309                                address: address.clone().into_unchecked(),
1310                                amount,
1311                                fee,
1312                                change: change_range.into_iter().collect(),
1313                            },
1314                            extra_meta: extra_meta.clone(),
1315                        }
1316                    },
1317                    tx_builder,
1318                )
1319                .await?;
1320
1321            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1322
1323            self.client_ctx
1324                .log_event(
1325                    &mut dbtx,
1326                    SendPaymentEvent {
1327                        operation_id,
1328                        amount: amount + fee.amount(),
1329                        fee: fee.amount(),
1330                    },
1331                )
1332                .await;
1333
1334            dbtx.commit_tx().await;
1335
1336            Ok(operation_id)
1337        }
1338    }
1339
1340    /// Attempt to increase the fee of a onchain withdraw transaction using
1341    /// replace by fee (RBF).
1342    /// This can prevent transactions from getting stuck
1343    /// in the mempool
1344    #[deprecated(
1345        since = "0.4.0",
1346        note = "RBF withdrawals are rejected by the federation"
1347    )]
1348    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1349        &self,
1350        rbf: Rbf,
1351        extra_meta: M,
1352    ) -> anyhow::Result<OperationId> {
1353        let operation_id = OperationId(thread_rng().r#gen());
1354
1355        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1356        let tx_builder = TransactionBuilder::new()
1357            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1358
1359        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1360        self.client_ctx
1361            .finalize_and_submit_transaction(
1362                operation_id,
1363                WalletCommonInit::KIND.as_str(),
1364                move |change_range: OutPointRange| WalletOperationMeta {
1365                    variant: WalletOperationMetaVariant::RbfWithdraw {
1366                        rbf: rbf.clone(),
1367                        change: change_range.into_iter().collect(),
1368                    },
1369                    extra_meta: extra_meta.clone(),
1370                },
1371                tx_builder,
1372            )
1373            .await?;
1374
1375        Ok(operation_id)
1376    }
1377
1378    pub async fn subscribe_withdraw_updates(
1379        &self,
1380        operation_id: OperationId,
1381    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1382        let operation = self
1383            .client_ctx
1384            .get_operation(operation_id)
1385            .await
1386            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1387
1388        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1389            bail!("Operation is not a wallet operation");
1390        }
1391
1392        let operation_meta = operation.meta::<WalletOperationMeta>();
1393
1394        let (WalletOperationMetaVariant::Withdraw { change, .. }
1395        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1396        else {
1397            bail!("Operation is not a withdraw operation");
1398        };
1399
1400        let mut operation_stream = self.notifier.subscribe(operation_id).await;
1401        let client_ctx = self.client_ctx.clone();
1402
1403        Ok(self
1404            .client_ctx
1405            .outcome_or_updates(operation, operation_id, move || {
1406                stream! {
1407                    match next_withdraw_state(&mut operation_stream).await {
1408                        Some(WithdrawStates::Created(_)) => {
1409                            yield WithdrawState::Created;
1410                        },
1411                        Some(s) => {
1412                            panic!("Unexpected state {s:?}")
1413                        },
1414                        None => return,
1415                    }
1416
1417                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic
1418
1419                        // Swallowing potential errors since the transaction failing  is handled by
1420                        // output outcome fetching already
1421                        let _ = client_ctx
1422                            .await_primary_module_outputs(operation_id, change)
1423                            .await;
1424
1425
1426                    match next_withdraw_state(&mut operation_stream).await {
1427                        Some(WithdrawStates::Aborted(inner)) => {
1428                            yield WithdrawState::Failed(inner.error);
1429                        },
1430                        Some(WithdrawStates::Success(inner)) => {
1431                            yield WithdrawState::Succeeded(inner.txid);
1432                        },
1433                        Some(s) => {
1434                            panic!("Unexpected state {s:?}")
1435                        },
1436                        None => {},
1437                    }
1438                }
1439            }))
1440    }
1441
1442    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1443        self.admin_auth
1444            .clone()
1445            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1446    }
1447
1448    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1449        self.module_api
1450            .activate_consensus_version_voting(self.admin_auth()?)
1451            .await?;
1452
1453        Ok(())
1454    }
1455}
1456
1457/// Polls the federation checking if the activated module consensus version
1458/// supports safe deposits, saving the result in the db once it does.
1459async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1460    loop {
1461        let mut dbtx = db.begin_transaction().await;
1462
1463        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1464            break;
1465        }
1466
1467        module_api.wait_for_initialized_connections().await;
1468
1469        if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1470            && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1471        {
1472            dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1473            dbtx.commit_tx().await;
1474            break;
1475        }
1476
1477        drop(dbtx);
1478
1479        if is_running_in_test_env() {
1480            // Even in tests we don't want to spam the federation with requests about it
1481            sleep(Duration::from_secs(10)).await;
1482        } else {
1483            sleep(Duration::from_secs(3600)).await;
1484        }
1485    }
1486}
1487
1488/// Returns the child index to derive the next peg-in tweak key from.
1489async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1490    let index = dbtx
1491        .get_value(&NextPegInTweakIndexKey)
1492        .await
1493        .unwrap_or_default();
1494    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1495        .await;
1496    index
1497}
1498
1499#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1500pub enum WalletClientStates {
1501    Deposit(DepositStateMachine),
1502    Withdraw(WithdrawStateMachine),
1503}
1504
1505impl IntoDynInstance for WalletClientStates {
1506    type DynType = DynState;
1507
1508    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1509        DynState::from_typed(instance_id, self)
1510    }
1511}
1512
1513impl State for WalletClientStates {
1514    type ModuleContext = WalletClientContext;
1515
1516    fn transitions(
1517        &self,
1518        context: &Self::ModuleContext,
1519        global_context: &DynGlobalClientContext,
1520    ) -> Vec<StateTransition<Self>> {
1521        match self {
1522            WalletClientStates::Deposit(sm) => {
1523                sm_enum_variant_translation!(
1524                    sm.transitions(context, global_context),
1525                    WalletClientStates::Deposit
1526                )
1527            }
1528            WalletClientStates::Withdraw(sm) => {
1529                sm_enum_variant_translation!(
1530                    sm.transitions(context, global_context),
1531                    WalletClientStates::Withdraw
1532                )
1533            }
1534        }
1535    }
1536
1537    fn operation_id(&self) -> OperationId {
1538        match self {
1539            WalletClientStates::Deposit(sm) => sm.operation_id(),
1540            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1541        }
1542    }
1543}
1544
1545#[cfg(all(test, not(target_family = "wasm")))]
1546mod tests {
1547    use std::collections::BTreeSet;
1548    use std::sync::atomic::{AtomicBool, Ordering};
1549
1550    use super::*;
1551    use crate::backup::{
1552        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
1553    };
1554
1555    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
1556    #[tokio::test(flavor = "multi_thread")]
1557    async fn sanity_test_recover_inner() {
1558        {
1559            let last_checked = AtomicBool::new(false);
1560            let last_checked = &last_checked;
1561            assert_eq!(
1562                recover_scan_idxes_for_activity(
1563                    TweakIdx(0),
1564                    &BTreeSet::new(),
1565                    |cur_idx| async move {
1566                        Ok(match cur_idx {
1567                            TweakIdx(9) => {
1568                                last_checked.store(true, Ordering::SeqCst);
1569                                vec![]
1570                            }
1571                            TweakIdx(10) => panic!("Shouldn't happen"),
1572                            TweakIdx(11) => {
1573                                vec![0usize] /* just for type inference */
1574                            }
1575                            _ => vec![],
1576                        })
1577                    }
1578                )
1579                .await
1580                .unwrap(),
1581                RecoverScanOutcome {
1582                    last_used_idx: None,
1583                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1584                    tweak_idxes_with_pegins: BTreeSet::from([])
1585                }
1586            );
1587            assert!(last_checked.load(Ordering::SeqCst));
1588        }
1589
1590        {
1591            let last_checked = AtomicBool::new(false);
1592            let last_checked = &last_checked;
1593            assert_eq!(
1594                recover_scan_idxes_for_activity(
1595                    TweakIdx(0),
1596                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1597                    |cur_idx| async move {
1598                        Ok(match cur_idx {
1599                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1600                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1601                            TweakIdx(11) => {
1602                                last_checked.store(true, Ordering::SeqCst);
1603                                vec![]
1604                            }
1605                            TweakIdx(12) => panic!("Shouldn't happen"),
1606                            TweakIdx(13) => {
1607                                vec![0usize] /* just for type inference */
1608                            }
1609                            _ => vec![],
1610                        })
1611                    }
1612                )
1613                .await
1614                .unwrap(),
1615                RecoverScanOutcome {
1616                    last_used_idx: Some(TweakIdx(2)),
1617                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1618                    tweak_idxes_with_pegins: BTreeSet::from([])
1619                }
1620            );
1621            assert!(last_checked.load(Ordering::SeqCst));
1622        }
1623
1624        {
1625            let last_checked = AtomicBool::new(false);
1626            let last_checked = &last_checked;
1627            assert_eq!(
1628                recover_scan_idxes_for_activity(
1629                    TweakIdx(10),
1630                    &BTreeSet::new(),
1631                    |cur_idx| async move {
1632                        Ok(match cur_idx {
1633                            TweakIdx(10) => vec![()],
1634                            TweakIdx(19) => {
1635                                last_checked.store(true, Ordering::SeqCst);
1636                                vec![]
1637                            }
1638                            TweakIdx(20) => panic!("Shouldn't happen"),
1639                            _ => vec![],
1640                        })
1641                    }
1642                )
1643                .await
1644                .unwrap(),
1645                RecoverScanOutcome {
1646                    last_used_idx: Some(TweakIdx(10)),
1647                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1648                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1649                }
1650            );
1651            assert!(last_checked.load(Ordering::SeqCst));
1652        }
1653
1654        assert_eq!(
1655            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1656                Ok(match cur_idx {
1657                    TweakIdx(6 | 15) => vec![()],
1658                    _ => vec![],
1659                })
1660            })
1661            .await
1662            .unwrap(),
1663            RecoverScanOutcome {
1664                last_used_idx: Some(TweakIdx(15)),
1665                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1666                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1667            }
1668        );
1669        assert_eq!(
1670            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1671                Ok(match cur_idx {
1672                    TweakIdx(8) => {
1673                        vec![()] /* for type inference only */
1674                    }
1675                    TweakIdx(9) => {
1676                        panic!("Shouldn't happen")
1677                    }
1678                    _ => vec![],
1679                })
1680            })
1681            .await
1682            .unwrap(),
1683            RecoverScanOutcome {
1684                last_used_idx: None,
1685                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1686                tweak_idxes_with_pegins: BTreeSet::from([])
1687            }
1688        );
1689        assert_eq!(
1690            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1691                Ok(match cur_idx {
1692                    TweakIdx(9) => panic!("Shouldn't happen"),
1693                    TweakIdx(15) => vec![()],
1694                    _ => vec![],
1695                })
1696            })
1697            .await
1698            .unwrap(),
1699            RecoverScanOutcome {
1700                last_used_idx: Some(TweakIdx(15)),
1701                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1702                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1703            }
1704        );
1705    }
1706}