fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
20mod pegin_monitor;
21mod withdraw;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::future;
25use std::sync::Arc;
26use std::time::{Duration, SystemTime};
27
28use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
29use async_stream::{stream, try_stream};
30use backup::WalletModuleBackup;
31use bitcoin::address::NetworkUnchecked;
32use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
33use bitcoin::{Address, Network, ScriptBuf};
34use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
35use fedimint_api_client::api::{DynModuleApi, FederationResult};
36use fedimint_bitcoind::{DynBitcoindRpc, create_esplora_rpc};
37use fedimint_client_module::module::init::{
38    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
39};
40use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
41use fedimint_client_module::oplog::UpdateStreamOrOutcome;
42use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
43use fedimint_client_module::transaction::{
44    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
45};
46use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
47use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
48use fedimint_core::db::{
49    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
50};
51use fedimint_core::encoding::{Decodable, Encodable};
52use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
53use fedimint_core::module::{
54    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
55    ModuleInit, MultiApiVersion,
56};
57use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
58use fedimint_core::util::backoff_util::background_backoff;
59use fedimint_core::util::{BoxStream, backoff_util, retry};
60use fedimint_core::{
61    BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
62    runtime, secp256k1,
63};
64use fedimint_derive_secret::{ChildId, DerivableSecret};
65use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
66use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
67use fedimint_wallet_common::tweakable::Tweakable;
68pub use fedimint_wallet_common::*;
69use futures::{Stream, StreamExt};
70use rand::{Rng, thread_rng};
71use secp256k1::Keypair;
72use serde::{Deserialize, Serialize};
73use strum::IntoEnumIterator;
74use tokio::sync::watch;
75use tracing::{debug, instrument};
76
77use crate::api::WalletFederationApi;
78use crate::backup::WalletRecovery;
79use crate::client_db::{
80    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
81    PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
82};
83use crate::deposit::DepositStateMachine;
84use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
85
86const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
87
88#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
89pub struct BitcoinTransactionData {
90    /// The bitcoin transaction is saved as soon as we see it so the transaction
91    /// can be re-transmitted if it's evicted from the mempool.
92    pub btc_transaction: bitcoin::Transaction,
93    /// Index of the deposit output
94    pub out_idx: u32,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
98pub enum DepositStateV1 {
99    WaitingForTransaction,
100    WaitingForConfirmation(BitcoinTransactionData),
101    Confirmed(BitcoinTransactionData),
102    Claimed(BitcoinTransactionData),
103    Failed(String),
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
107pub enum DepositStateV2 {
108    WaitingForTransaction,
109    WaitingForConfirmation {
110        #[serde(with = "bitcoin::amount::serde::as_sat")]
111        btc_deposited: bitcoin::Amount,
112        btc_out_point: bitcoin::OutPoint,
113    },
114    Confirmed {
115        #[serde(with = "bitcoin::amount::serde::as_sat")]
116        btc_deposited: bitcoin::Amount,
117        btc_out_point: bitcoin::OutPoint,
118    },
119    Claimed {
120        #[serde(with = "bitcoin::amount::serde::as_sat")]
121        btc_deposited: bitcoin::Amount,
122        btc_out_point: bitcoin::OutPoint,
123    },
124    Failed(String),
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
128pub enum WithdrawState {
129    Created,
130    Succeeded(bitcoin::Txid),
131    Failed(String),
132    // TODO: track refund
133    // Refunded,
134    // RefundFailed(String),
135}
136
137async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
138where
139    S: Stream<Item = WalletClientStates> + Unpin,
140{
141    loop {
142        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
143            return Some(ds.state);
144        }
145        tokio::task::yield_now().await;
146    }
147}
148
149#[derive(Debug, Clone, Default)]
150// TODO: should probably move to DB
151pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
152
153impl WalletClientInit {
154    pub fn new(rpc: DynBitcoindRpc) -> Self {
155        Self(Some(rpc))
156    }
157}
158
159impl ModuleInit for WalletClientInit {
160    type Common = WalletCommonInit;
161
162    async fn dump_database(
163        &self,
164        dbtx: &mut DatabaseTransaction<'_>,
165        prefix_names: Vec<String>,
166    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
167        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
168            BTreeMap::new();
169        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
170            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
171        });
172
173        for table in filtered_prefixes {
174            match table {
175                DbKeyPrefix::NextPegInTweakIndex => {
176                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
177                        wallet_client_items
178                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
179                    }
180                }
181                DbKeyPrefix::PegInTweakIndex => {
182                    push_db_pair_items!(
183                        dbtx,
184                        PegInTweakIndexPrefix,
185                        PegInTweakIndexKey,
186                        PegInTweakIndexData,
187                        wallet_client_items,
188                        "Peg-In Tweak Index"
189                    );
190                }
191                DbKeyPrefix::ClaimedPegIn => {
192                    push_db_pair_items!(
193                        dbtx,
194                        ClaimedPegInPrefix,
195                        ClaimedPegInKey,
196                        ClaimedPegInData,
197                        wallet_client_items,
198                        "Claimed Peg-In"
199                    );
200                }
201                DbKeyPrefix::RecoveryFinalized => {
202                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
203                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
204                    }
205                }
206                DbKeyPrefix::SupportsSafeDeposit => {
207                    push_db_pair_items!(
208                        dbtx,
209                        SupportsSafeDepositPrefix,
210                        SupportsSafeDepositKey,
211                        (),
212                        wallet_client_items,
213                        "Supports Safe Deposit"
214                    );
215                }
216                DbKeyPrefix::RecoveryState
217                | DbKeyPrefix::ExternalReservedStart
218                | DbKeyPrefix::CoreInternalReservedStart
219                | DbKeyPrefix::CoreInternalReservedEnd => {}
220            }
221        }
222
223        Box::new(wallet_client_items.into_iter())
224    }
225}
226
227#[apply(async_trait_maybe_send!)]
228impl ClientModuleInit for WalletClientInit {
229    type Module = WalletClientModule;
230
231    fn supported_api_versions(&self) -> MultiApiVersion {
232        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
233            .expect("no version conflicts")
234    }
235
236    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
237        let data = WalletClientModuleData {
238            cfg: args.cfg().clone(),
239            module_root_secret: args.module_root_secret().clone(),
240        };
241
242        let db = args.db().clone();
243
244        let btc_rpc = self.0.clone().unwrap_or(create_esplora_rpc(
245            &WalletClientModule::get_rpc_config(args.cfg()).url,
246        )?);
247
248        let module_api = args.module_api().clone();
249
250        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
251        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
252
253        Ok(WalletClientModule {
254            db,
255            data,
256            module_api,
257            notifier: args.notifier().clone(),
258            rpc: btc_rpc,
259            client_ctx: args.context(),
260            pegin_monitor_wakeup_sender,
261            pegin_monitor_wakeup_receiver,
262            pegin_claimed_receiver,
263            pegin_claimed_sender,
264            task_group: args.task_group().clone(),
265            admin_auth: args.admin_auth().cloned(),
266        })
267    }
268
269    /// Wallet recovery
270    ///
271    /// Query bitcoin rpc for history of addresses from last known used
272    /// addresses (or index 0) until `MAX_GAP` unused ones.
273    ///
274    /// Notably does not persist the progress of addresses being queried,
275    /// because it is not expected that it would take long enough to bother.
276    async fn recover(
277        &self,
278        args: &ClientModuleRecoverArgs<Self>,
279        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
280    ) -> anyhow::Result<()> {
281        args.recover_from_history::<WalletRecovery>(self, snapshot)
282            .await
283    }
284
285    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
286        Some(
287            DbKeyPrefix::iter()
288                .map(|p| p as u8)
289                .chain(
290                    DbKeyPrefix::ExternalReservedStart as u8
291                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
292                )
293                .collect(),
294        )
295    }
296}
297
298#[derive(Debug, Clone, Serialize, Deserialize)]
299pub struct WalletOperationMeta {
300    pub variant: WalletOperationMetaVariant,
301    pub extra_meta: serde_json::Value,
302}
303
304#[derive(Debug, Clone, Serialize, Deserialize)]
305#[serde(rename_all = "snake_case")]
306pub enum WalletOperationMetaVariant {
307    Deposit {
308        address: Address<NetworkUnchecked>,
309        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
310        /// using the pegin monitor. The value is the child index of the key
311        /// used to generate the address, so we can re-generate the secret key
312        /// from our root secret.
313        #[serde(default)]
314        tweak_idx: Option<TweakIdx>,
315        #[serde(default, skip_serializing_if = "Option::is_none")]
316        expires_at: Option<SystemTime>,
317    },
318    Withdraw {
319        address: Address<NetworkUnchecked>,
320        #[serde(with = "bitcoin::amount::serde::as_sat")]
321        amount: bitcoin::Amount,
322        fee: PegOutFees,
323        change: Vec<OutPoint>,
324    },
325
326    RbfWithdraw {
327        rbf: Rbf,
328        change: Vec<OutPoint>,
329    },
330}
331
332/// The non-resource, just plain-data parts of [`WalletClientModule`]
333#[derive(Debug, Clone)]
334pub struct WalletClientModuleData {
335    cfg: WalletClientConfig,
336    module_root_secret: DerivableSecret,
337}
338
339impl WalletClientModuleData {
340    fn derive_deposit_address(
341        &self,
342        idx: TweakIdx,
343    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
344        let idx = ChildId(idx.0);
345
346        let secret_tweak_key = self
347            .module_root_secret
348            .child_key(WALLET_TWEAK_CHILD_ID)
349            .child_key(idx)
350            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
351
352        let public_tweak_key = secret_tweak_key.public_key();
353
354        let address = self
355            .cfg
356            .peg_in_descriptor
357            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
358            .address(self.cfg.network.0)
359            .unwrap();
360
361        // TODO: make hash?
362        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
363
364        (secret_tweak_key, public_tweak_key, address, operation_id)
365    }
366
367    fn derive_peg_in_script(
368        &self,
369        idx: TweakIdx,
370    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
371        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
372
373        (
374            self.cfg
375                .peg_in_descriptor
376                .tweak(&secret_tweak_key.public_key(), SECP256K1)
377                .script_pubkey(),
378            address,
379            secret_tweak_key,
380            operation_id,
381        )
382    }
383}
384
/// The wallet client module: plain data plus the handles (database,
/// federation API, bitcoin RPC, channels, task group) it operates with.
#[derive(Debug)]
pub struct WalletClientModule {
    /// Configuration and root secret
    data: WalletClientModuleData,
    db: Database,
    module_api: DynModuleApi,
    notifier: ModuleNotifier<WalletClientStates>,
    rpc: DynBitcoindRpc,
    client_ctx: ClientContext<Self>,
    /// Updated to wake up pegin monitor
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Called every time a peg-in was claimed
    pegin_claimed_sender: watch::Sender<()>,
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the module's background tasks are spawned on
    task_group: TaskGroup,
    admin_auth: Option<ApiAuth>,
}
402
403#[apply(async_trait_maybe_send!)]
404impl ClientModule for WalletClientModule {
405    type Init = WalletClientInit;
406    type Common = WalletModuleTypes;
407    type Backup = WalletModuleBackup;
408    type ModuleStateMachineContext = WalletClientContext;
409    type States = WalletClientStates;
410
411    fn context(&self) -> Self::ModuleStateMachineContext {
412        WalletClientContext {
413            rpc: self.rpc.clone(),
414            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
415            wallet_decoder: self.decoder(),
416            secp: Secp256k1::default(),
417            client_ctx: self.client_ctx.clone(),
418        }
419    }
420
421    async fn start(&self) {
422        self.task_group.spawn_cancellable("peg-in monitor", {
423            let client_ctx = self.client_ctx.clone();
424            let db = self.db.clone();
425            let btc_rpc = self.rpc.clone();
426            let module_api = self.module_api.clone();
427            let data = self.data.clone();
428            let pegin_claimed_sender = self.pegin_claimed_sender.clone();
429            let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
430            pegin_monitor::run_peg_in_monitor(
431                client_ctx,
432                db,
433                btc_rpc,
434                module_api,
435                data,
436                pegin_claimed_sender,
437                pegin_monitor_wakeup_receiver,
438            )
439        });
440
441        self.task_group
442            .spawn_cancellable("supports-safe-deposit-version", {
443                let db = self.db.clone();
444                let module_api = self.module_api.clone();
445
446                poll_supports_safe_deposit_version(db, module_api)
447            });
448    }
449
450    fn supports_backup(&self) -> bool {
451        true
452    }
453
454    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
455        // fetch consensus height first
456        let session_count = self.client_ctx.global_api().session_count().await?;
457
458        let mut dbtx = self.db.begin_transaction_nc().await;
459        let next_pegin_tweak_idx = dbtx
460            .get_value(&NextPegInTweakIndexKey)
461            .await
462            .unwrap_or_default();
463        let claimed = dbtx
464            .find_by_prefix(&PegInTweakIndexPrefix)
465            .await
466            .filter_map(|(k, v)| async move {
467                if v.claimed.is_empty() {
468                    None
469                } else {
470                    Some(k.0)
471                }
472            })
473            .collect()
474            .await;
475        Ok(backup::WalletModuleBackup::new_v1(
476            session_count,
477            next_pegin_tweak_idx,
478            claimed,
479        ))
480    }
481
482    fn input_fee(
483        &self,
484        _amount: &Amounts,
485        _input: &<Self::Common as ModuleCommon>::Input,
486    ) -> Option<Amounts> {
487        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
488    }
489
490    fn output_fee(
491        &self,
492        _amount: &Amounts,
493        _output: &<Self::Common as ModuleCommon>::Output,
494    ) -> Option<Amounts> {
495        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
496    }
497
498    async fn handle_rpc(
499        &self,
500        method: String,
501        request: serde_json::Value,
502    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
503        Box::pin(try_stream! {
504            match method.as_str() {
505                "get_wallet_summary" => {
506                    let _req: WalletSummaryRequest = serde_json::from_value(request)?;
507                    let wallet_summary = self.get_wallet_summary()
508                        .await
509                        .expect("Failed to fetch wallet summary");
510                    let result = serde_json::to_value(&wallet_summary)
511                        .expect("Serialization error");
512                    yield result;
513                }
514                "get_block_count_local" => {
515                    let block_count = self.get_block_count_local().await
516                        .expect("Failed to fetch block count");
517                    yield serde_json::to_value(block_count)?;
518                }
519                "peg_in" => {
520                    let req: PegInRequest = serde_json::from_value(request)?;
521                    let response = self.peg_in(req)
522                        .await
523                        .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
524                    let result = serde_json::to_value(&response)?;
525                    yield result;
526                },
527                "peg_out" => {
528                    let req: PegOutRequest = serde_json::from_value(request)?;
529                    let response = self.peg_out(req)
530                        .await
531                        .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
532                    let result = serde_json::to_value(&response)?;
533                    yield result;
534                },
535                "subscribe_deposit" => {
536                    let req: SubscribeDepositRequest = serde_json::from_value(request)?;
537                    for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
538                        yield serde_json::to_value(state)?;
539                    }
540                },
541                "subscribe_withdraw" => {
542                    let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
543                    for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
544                        yield serde_json::to_value(state)?;
545                    }
546                }
547                _ => {
548                    Err(anyhow::format_err!("Unknown method: {}", method))?;
549                }
550            }
551        })
552    }
553
554    #[cfg(feature = "cli")]
555    async fn handle_cli_command(
556        &self,
557        args: &[std::ffi::OsString],
558    ) -> anyhow::Result<serde_json::Value> {
559        cli::handle_cli_command(self, args).await
560    }
561}
562
/// Request payload of the `get_wallet_summary` RPC method; it has no fields
/// and is deserialized only to validate the request shape.
#[derive(Deserialize)]
struct WalletSummaryRequest {}
565
566#[derive(Debug, Clone)]
567pub struct WalletClientContext {
568    rpc: DynBitcoindRpc,
569    wallet_descriptor: PegInDescriptor,
570    wallet_decoder: Decoder,
571    secp: Secp256k1<All>,
572    pub client_ctx: ClientContext<WalletClientModule>,
573}
574
575#[derive(Debug, Clone, Serialize, Deserialize)]
576pub struct PegInRequest {
577    pub extra_meta: serde_json::Value,
578}
579
/// Request payload of the `subscribe_deposit` RPC method.
#[derive(Deserialize)]
struct SubscribeDepositRequest {
    /// Operation whose deposit updates are streamed
    operation_id: OperationId,
}
584
/// Request payload of the `subscribe_withdraw` RPC method.
#[derive(Deserialize)]
struct SubscribeWithdrawRequest {
    /// Operation whose withdraw updates are streamed
    operation_id: OperationId,
}
589
590#[derive(Debug, Clone, Serialize, Deserialize)]
591pub struct PegInResponse {
592    pub deposit_address: Address<NetworkUnchecked>,
593    pub operation_id: OperationId,
594}
595
596#[derive(Debug, Clone, Serialize, Deserialize)]
597pub struct PegOutRequest {
598    pub amount_sat: u64,
599    pub destination_address: Address<NetworkUnchecked>,
600    pub extra_meta: serde_json::Value,
601}
602
603#[derive(Debug, Clone, Serialize, Deserialize)]
604pub struct PegOutResponse {
605    pub operation_id: OperationId,
606}
607
/// Marks [`WalletClientContext`] as a state-machine context tied to this
/// module's kind.
impl Context for WalletClientContext {
    // The `KIND` on the right-hand side is the module-level constant, not
    // this associated one (presumably brought in by the
    // `fedimint_wallet_common::*` glob import — confirm).
    const KIND: Option<ModuleKind> = Some(KIND);
}
611
612impl WalletClientModule {
613    fn cfg(&self) -> &WalletClientConfig {
614        &self.data.cfg
615    }
616
617    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
618        match BitcoinRpcConfig::get_defaults_from_env_vars() {
619            Ok(rpc_config) => {
620                // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
621                // updated to 0.30
622                if rpc_config.kind == "bitcoind" {
623                    cfg.default_bitcoin_rpc.clone()
624                } else {
625                    rpc_config
626                }
627            }
628            _ => cfg.default_bitcoin_rpc.clone(),
629        }
630    }
631
632    pub fn get_network(&self) -> Network {
633        self.cfg().network.0
634    }
635
636    pub fn get_finality_delay(&self) -> u32 {
637        self.cfg().finality_delay
638    }
639
640    pub fn get_fee_consensus(&self) -> FeeConsensus {
641        self.cfg().fee_consensus
642    }
643
644    async fn allocate_deposit_address_inner(
645        &self,
646        dbtx: &mut DatabaseTransaction<'_>,
647    ) -> (OperationId, Address, TweakIdx) {
648        dbtx.ensure_isolated().expect("Must be isolated db");
649
650        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
651        let (_secret_tweak_key, _, address, operation_id) =
652            self.data.derive_deposit_address(tweak_idx);
653
654        let now = fedimint_core::time::now();
655
656        dbtx.insert_new_entry(
657            &PegInTweakIndexKey(tweak_idx),
658            &PegInTweakIndexData {
659                creation_time: now,
660                next_check_time: Some(now),
661                last_check_time: None,
662                operation_id,
663                claimed: vec![],
664            },
665        )
666        .await;
667
668        (operation_id, address, tweak_idx)
669    }
670
671    /// Fetches the fees that would need to be paid to make the withdraw request
672    /// using [`Self::withdraw`] work *right now*.
673    ///
674    /// Note that we do not receive a guarantee that these fees will be valid in
675    /// the future, thus even the next second using these fees *may* fail.
676    /// The caller should be prepared to retry with a new fee estimate.
677    pub async fn get_withdraw_fees(
678        &self,
679        address: &bitcoin::Address,
680        amount: bitcoin::Amount,
681    ) -> anyhow::Result<PegOutFees> {
682        self.module_api
683            .fetch_peg_out_fees(address, amount)
684            .await?
685            .context("Federation didn't return peg-out fees")
686    }
687
688    /// Returns a summary of the wallet's coins
689    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
690        Ok(self.module_api.fetch_wallet_summary().await?)
691    }
692
693    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
694        Ok(self.module_api.fetch_block_count_local().await?)
695    }
696
697    pub fn create_withdraw_output(
698        &self,
699        operation_id: OperationId,
700        address: bitcoin::Address,
701        amount: bitcoin::Amount,
702        fees: PegOutFees,
703    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
704        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
705
706        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
707
708        let sm_gen = move |out_point_range: OutPointRange| {
709            assert_eq!(out_point_range.count(), 1);
710            let out_idx = out_point_range.start_idx();
711            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
712                operation_id,
713                state: WithdrawStates::Created(CreatedWithdrawState {
714                    fm_outpoint: OutPoint {
715                        txid: out_point_range.txid(),
716                        out_idx,
717                    },
718                }),
719            })]
720        };
721
722        Ok(ClientOutputBundle::new(
723            vec![ClientOutput::<WalletOutput> {
724                output,
725                amounts: Amounts::new_bitcoin(amount),
726            }],
727            vec![ClientOutputSM::<WalletClientStates> {
728                state_machines: Arc::new(sm_gen),
729            }],
730        ))
731    }
732
733    pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
734        let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
735
736        Ok(PegInResponse {
737            deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
738                .as_unchecked()
739                .clone(),
740            operation_id,
741        })
742    }
743
744    pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
745        let amount = bitcoin::Amount::from_sat(req.amount_sat);
746        let destination = req
747            .destination_address
748            .require_network(self.get_network())?;
749
750        let fees = self.get_withdraw_fees(&destination, amount).await?;
751        let operation_id = self
752            .withdraw(&destination, amount, fees, req.extra_meta)
753            .await
754            .context("Failed to initiate withdraw")?;
755
756        Ok(PegOutResponse { operation_id })
757    }
758
759    pub fn create_rbf_withdraw_output(
760        &self,
761        operation_id: OperationId,
762        rbf: &Rbf,
763    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
764        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
765
766        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
767
768        let sm_gen = move |out_point_range: OutPointRange| {
769            assert_eq!(out_point_range.count(), 1);
770            let out_idx = out_point_range.start_idx();
771            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
772                operation_id,
773                state: WithdrawStates::Created(CreatedWithdrawState {
774                    fm_outpoint: OutPoint {
775                        txid: out_point_range.txid(),
776                        out_idx,
777                    },
778                }),
779            })]
780        };
781
782        Ok(ClientOutputBundle::new(
783            vec![ClientOutput::<WalletOutput> {
784                output,
785                amounts: Amounts::new_bitcoin(amount),
786            }],
787            vec![ClientOutputSM::<WalletClientStates> {
788                state_machines: Arc::new(sm_gen),
789            }],
790        ))
791    }
792
793    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
794        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
795    }
796
797    /// Returns true if the federation's wallet module consensus version
798    /// supports processing all deposits.
799    ///
800    /// This method is safe to call offline, since it first attempts to read a
801    /// key from the db that represents the client has previously been able to
802    /// verify the wallet module consensus version. If the client has not
803    /// verified the version, it must be online to fetch the latest wallet
804    /// module consensus version.
805    pub async fn supports_safe_deposit(&self) -> bool {
806        let mut dbtx = self.db.begin_transaction().await;
807
808        let already_verified_supports_safe_deposit =
809            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
810
811        already_verified_supports_safe_deposit || {
812            match self.module_api.module_consensus_version().await {
813                Ok(module_consensus_version) => {
814                    let supported_version =
815                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
816
817                    if supported_version {
818                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
819                        dbtx.commit_tx().await;
820                    }
821
822                    supported_version
823                }
824                Err(_) => false,
825            }
826        }
827    }
828
829    /// Allocates a deposit address controlled by the federation, guaranteeing
830    /// safe handling of all deposits, including on-chain transactions exceeding
831    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
832    ///
833    /// Returns an error if the client has never been online to verify the
834    /// federation's wallet module consensus version supports processing all
835    /// deposits.
836    pub async fn safe_allocate_deposit_address<M>(
837        &self,
838        extra_meta: M,
839    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
840    where
841        M: Serialize + MaybeSend + MaybeSync,
842    {
843        ensure!(
844            self.supports_safe_deposit().await,
845            "Wallet module consensus version doesn't support safe deposits",
846        );
847
848        self.allocate_deposit_address_expert_only(extra_meta).await
849    }
850
851    /// Allocates a deposit address that is controlled by the federation.
852    ///
853    /// This is an EXPERT ONLY method intended for power users such as Lightning
854    /// gateways allocating liquidity, and we discourage exposing peg-in
855    /// functionality to everyday users of a Fedimint wallet due to the
856    /// following two limitations:
857    ///
858    /// The transaction sending to this address needs to be smaller than 40KB in
859    /// order for the peg-in to be claimable. If the transaction is too large,
860    /// funds will be lost.
861    ///
862    /// In the future, federations will also enforce a minimum peg-in amount to
863    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
864    /// claimed and funds will be lost.
865    ///
866    /// Everyday users should rely on Lightning to move funds into the
867    /// federation.
868    pub async fn allocate_deposit_address_expert_only<M>(
869        &self,
870        extra_meta: M,
871    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
872    where
873        M: Serialize + MaybeSend + MaybeSync,
874    {
875        let extra_meta_value =
876            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
877        let (operation_id, address, tweak_idx) = self
878            .db
879            .autocommit(
880                move |dbtx, _| {
881                    let extra_meta_value_inner = extra_meta_value.clone();
882                    Box::pin(async move {
883                        let (operation_id, address, tweak_idx) = self
884                            .allocate_deposit_address_inner(dbtx)
885                            .await;
886
887                        self.client_ctx.manual_operation_start_dbtx(
888                            dbtx,
889                            operation_id,
890                            WalletCommonInit::KIND.as_str(),
891                            WalletOperationMeta {
892                                variant: WalletOperationMetaVariant::Deposit {
893                                    address: address.clone().into_unchecked(),
894                                    tweak_idx: Some(tweak_idx),
895                                    expires_at: None,
896                                },
897                                extra_meta: extra_meta_value_inner,
898                            },
899                            vec![]
900                        ).await?;
901
902                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
903
904                        // Begin watching the script address
905                        self.rpc.watch_script_history(&address.script_pubkey()).await?;
906
907                        let sender = self.pegin_monitor_wakeup_sender.clone();
908                        dbtx.on_commit(move || {
909                            sender.send_replace(());
910                        });
911
912                        Ok((operation_id, address, tweak_idx))
913                    })
914                },
915                Some(100),
916            )
917            .await
918            .map_err(|e| match e {
919                AutocommitError::CommitFailed {
920                    last_error,
921                    attempts,
922                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
923                AutocommitError::ClosureError { error, .. } => error,
924            })?;
925
926        Ok((operation_id, address, tweak_idx))
927    }
928
    /// Returns a stream of updates about an ongoing deposit operation created
    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
    /// Returns an error for old deposit operations created prior to the 0.4
    /// release and not driven to completion yet. This should be rare enough
    /// that an indeterminate state is ok here.
    ///
    /// Also returns an error if the operation does not exist, is not a wallet
    /// deposit operation, or if its stored address does not match the
    /// configured network.
    ///
    /// For operations carrying a `tweak_idx` the update stream yields, in
    /// order: `WaitingForTransaction`, `WaitingForConfirmation`, `Confirmed`
    /// and finally either `Claimed` or `Failed`.
    pub async fn subscribe_deposit(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
        let operation = self
            .client_ctx
            .get_operation(operation_id)
            .await
            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;

        // Reject operations that belong to a different module kind.
        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
            bail!("Operation is not a wallet operation");
        }

        let operation_meta = operation.meta::<WalletOperationMeta>();

        let WalletOperationMetaVariant::Deposit {
            address, tweak_idx, ..
        } = operation_meta.variant
        else {
            bail!("Operation is not a deposit operation");
        };

        // The address is stored network-unchecked; validate it against the
        // network from the module config before using it.
        let address = address.require_network(self.cfg().network.0)?;

        // The old deposit operations don't have tweak_idx set
        let Some(tweak_idx) = tweak_idx else {
            // In case we are dealing with an old deposit that still uses state machines we
            // don't have the logic here anymore to subscribe to updates. We can still read
            // the final state though if it reached any.
            let outcome_v1 = operation
                .outcome::<DepositStateV1>()
                .context("Old pending deposit, can't subscribe to updates")?;

            // Translate the legacy final outcome into the V2 representation.
            let outcome_v2 = match outcome_v1 {
                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
                    // NOTE(review): indexing panics if `out_idx` is out of bounds for
                    // the stored transaction's outputs.
                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
                    btc_out_point: bitcoin::OutPoint {
                        txid: tx_info.btc_transaction.compute_txid(),
                        vout: tx_info.out_idx,
                    },
                },
                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
                _ => bail!("Non-final outcome in operation log"),
            };

            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
        };

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
            // Owned handles and values moved into the stream-producing closure.
            let stream_rpc = self.rpc.clone();
            let stream_client_ctx = self.client_ctx.clone();
            let stream_script_pub_key = address.script_pubkey();
            move || {

            stream! {
                yield DepositStateV2::WaitingForTransaction;

                // Register the script with the bitcoin RPC, retrying with
                // `background_backoff()` until it succeeds.
                retry(
                    "subscribe script history",
                    background_backoff(),
                    || stream_rpc.watch_script_history(&stream_script_pub_key)
                ).await.expect("Will never give up");
                // Poll the script history until a transaction paying to the
                // script shows up; an empty history is turned into an error so
                // that `retry` tries again.
                let (btc_out_point, btc_deposited) = retry(
                    "fetch history",
                    background_backoff(),
                    || async {
                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
                        // NOTE(review): only the first history entry is inspected.
                        history.first().and_then(|tx| {
                            // First output of that transaction paying to our script.
                            let (out_idx, amount) = tx.output
                                .iter()
                                .enumerate()
                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
                            let txid = tx.compute_txid();

                            Some((
                                bitcoin::OutPoint {
                                    txid,
                                    vout: out_idx as u32,
                                },
                                amount
                            ))
                        }).context("No deposit transaction found")
                    }
                ).await.expect("Will never give up");

                yield DepositStateV2::WaitingForConfirmation {
                    btc_deposited,
                    btc_out_point
                };

                // Wait until a claim record for this tweak index and out point
                // exists in the module database.
                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
                    peg_in_index: tweak_idx,
                    btc_out_point,
                }).await;

                yield DepositStateV2::Confirmed {
                    btc_deposited,
                    btc_out_point
                };

                // The final state depends on whether the change outputs
                // recorded in the claim are accepted by the primary module.
                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
                    Ok(()) => yield DepositStateV2::Claimed {
                        btc_deposited,
                        btc_out_point
                    },
                    Err(e) => yield DepositStateV2::Failed(e.to_string())
                }
            }
        }}))
    }
1045
1046    pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1047        self.client_ctx
1048            .module_db()
1049            .clone()
1050            .begin_transaction_nc()
1051            .await
1052            .find_by_prefix(&PegInTweakIndexPrefix)
1053            .await
1054            .map(|(key, data)| (key.0, data))
1055            .collect()
1056            .await
1057    }
1058
1059    pub async fn find_tweak_idx_by_address(
1060        &self,
1061        address: bitcoin::Address<NetworkUnchecked>,
1062    ) -> anyhow::Result<TweakIdx> {
1063        let data = self.data.clone();
1064        let Some((tweak_idx, _)) = self
1065            .db
1066            .begin_transaction_nc()
1067            .await
1068            .find_by_prefix(&PegInTweakIndexPrefix)
1069            .await
1070            .filter(|(k, _)| {
1071                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1072                future::ready(derived_address.into_unchecked() == address)
1073            })
1074            .next()
1075            .await
1076        else {
1077            bail!("Address not found in the list of derived keys");
1078        };
1079
1080        Ok(tweak_idx.0)
1081    }
1082    pub async fn find_tweak_idx_by_operation_id(
1083        &self,
1084        operation_id: OperationId,
1085    ) -> anyhow::Result<TweakIdx> {
1086        Ok(self
1087            .client_ctx
1088            .module_db()
1089            .clone()
1090            .begin_transaction_nc()
1091            .await
1092            .find_by_prefix(&PegInTweakIndexPrefix)
1093            .await
1094            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1095            .next()
1096            .await
1097            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1098            .0
1099            .0)
1100    }
1101
1102    pub async fn get_pegin_tweak_idx(
1103        &self,
1104        tweak_idx: TweakIdx,
1105    ) -> anyhow::Result<PegInTweakIndexData> {
1106        self.client_ctx
1107            .module_db()
1108            .clone()
1109            .begin_transaction_nc()
1110            .await
1111            .get_value(&PegInTweakIndexKey(tweak_idx))
1112            .await
1113            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1114    }
1115
1116    pub async fn get_claimed_pegins(
1117        &self,
1118        dbtx: &mut DatabaseTransaction<'_>,
1119        tweak_idx: TweakIdx,
1120    ) -> Vec<(
1121        bitcoin::OutPoint,
1122        TransactionId,
1123        Vec<fedimint_core::OutPoint>,
1124    )> {
1125        let outpoints = dbtx
1126            .get_value(&PegInTweakIndexKey(tweak_idx))
1127            .await
1128            .map(|v| v.claimed)
1129            .unwrap_or_default();
1130
1131        let mut res = vec![];
1132
1133        for outpoint in outpoints {
1134            let claimed_peg_in_data = dbtx
1135                .get_value(&ClaimedPegInKey {
1136                    peg_in_index: tweak_idx,
1137                    btc_out_point: outpoint,
1138                })
1139                .await
1140                .expect("Must have a corresponding claim record");
1141            res.push((
1142                outpoint,
1143                claimed_peg_in_data.claim_txid,
1144                claimed_peg_in_data.change,
1145            ));
1146        }
1147
1148        res
1149    }
1150
1151    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
1152    pub async fn recheck_pegin_address_by_op_id(
1153        &self,
1154        operation_id: OperationId,
1155    ) -> anyhow::Result<()> {
1156        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1157
1158        self.recheck_pegin_address(tweak_idx).await
1159    }
1160
1161    /// Schedule given address for immediate re-check for deposits
1162    pub async fn recheck_pegin_address_by_address(
1163        &self,
1164        address: bitcoin::Address<NetworkUnchecked>,
1165    ) -> anyhow::Result<()> {
1166        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1167            .await
1168    }
1169
1170    /// Schedule given address for immediate re-check for deposits
1171    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1172        self.db
1173            .autocommit(
1174                |dbtx, _| {
1175                    Box::pin(async {
1176                        let db_key = PegInTweakIndexKey(tweak_idx);
1177                        let db_val = dbtx
1178                            .get_value(&db_key)
1179                            .await
1180                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1181
1182                        dbtx.insert_entry(
1183                            &db_key,
1184                            &PegInTweakIndexData {
1185                                next_check_time: Some(fedimint_core::time::now()),
1186                                ..db_val
1187                            },
1188                        )
1189                        .await;
1190
1191                        let sender = self.pegin_monitor_wakeup_sender.clone();
1192                        dbtx.on_commit(move || {
1193                            sender.send_replace(());
1194                        });
1195
1196                        Ok::<_, anyhow::Error>(())
1197                    })
1198                },
1199                Some(100),
1200            )
1201            .await?;
1202
1203        Ok(())
1204    }
1205
1206    /// Await for num deposit by [`OperationId`]
1207    pub async fn await_num_deposits_by_operation_id(
1208        &self,
1209        operation_id: OperationId,
1210        num_deposits: usize,
1211    ) -> anyhow::Result<()> {
1212        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1213        self.await_num_deposits(tweak_idx, num_deposits).await
1214    }
1215
1216    pub async fn await_num_deposits_by_address(
1217        &self,
1218        address: bitcoin::Address<NetworkUnchecked>,
1219        num_deposits: usize,
1220    ) -> anyhow::Result<()> {
1221        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1222            .await
1223    }
1224
1225    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1226    pub async fn await_num_deposits(
1227        &self,
1228        tweak_idx: TweakIdx,
1229        num_deposits: usize,
1230    ) -> anyhow::Result<()> {
1231        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1232
1233        let mut receiver = self.pegin_claimed_receiver.clone();
1234        let mut backoff = backoff_util::aggressive_backoff();
1235
1236        loop {
1237            let pegins = self
1238                .get_claimed_pegins(
1239                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1240                    tweak_idx,
1241                )
1242                .await;
1243
1244            if pegins.len() < num_deposits {
1245                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1246                self.recheck_pegin_address(tweak_idx).await?;
1247                runtime::sleep(backoff.next().unwrap_or_default()).await;
1248                receiver.changed().await?;
1249                continue;
1250            }
1251
1252            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1253
1254            for (_outpoint, transaction_id, change) in pegins {
1255                if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1256                    debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1257                    continue;
1258                }
1259
1260                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1261                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1262
1263                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1264                    bail!("{}", e);
1265                }
1266
1267                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1268                self.client_ctx
1269                    .await_primary_module_outputs(operation_id, change)
1270                    .await
1271                    .expect("Cannot fail if tx was accepted and federation is honest");
1272            }
1273
1274            return Ok(());
1275        }
1276    }
1277
1278    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1279    /// `address`. The caller has to supply the fee rate to be used which can be
1280    /// fetched using [`Self::get_withdraw_fees`] and should be
1281    /// acknowledged by the user since it can be unexpectedly high.
1282    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1283        &self,
1284        address: &bitcoin::Address,
1285        amount: bitcoin::Amount,
1286        fee: PegOutFees,
1287        extra_meta: M,
1288    ) -> anyhow::Result<OperationId> {
1289        {
1290            let operation_id = OperationId(thread_rng().r#gen());
1291
1292            let withdraw_output =
1293                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1294            let tx_builder = TransactionBuilder::new()
1295                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1296
1297            let extra_meta =
1298                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1299            self.client_ctx
1300                .finalize_and_submit_transaction(
1301                    operation_id,
1302                    WalletCommonInit::KIND.as_str(),
1303                    {
1304                        let address = address.clone();
1305                        move |change_range: OutPointRange| WalletOperationMeta {
1306                            variant: WalletOperationMetaVariant::Withdraw {
1307                                address: address.clone().into_unchecked(),
1308                                amount,
1309                                fee,
1310                                change: change_range.into_iter().collect(),
1311                            },
1312                            extra_meta: extra_meta.clone(),
1313                        }
1314                    },
1315                    tx_builder,
1316                )
1317                .await?;
1318
1319            Ok(operation_id)
1320        }
1321    }
1322
1323    /// Attempt to increase the fee of a onchain withdraw transaction using
1324    /// replace by fee (RBF).
1325    /// This can prevent transactions from getting stuck
1326    /// in the mempool
1327    #[deprecated(
1328        since = "0.4.0",
1329        note = "RBF withdrawals are rejected by the federation"
1330    )]
1331    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1332        &self,
1333        rbf: Rbf,
1334        extra_meta: M,
1335    ) -> anyhow::Result<OperationId> {
1336        let operation_id = OperationId(thread_rng().r#gen());
1337
1338        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1339        let tx_builder = TransactionBuilder::new()
1340            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1341
1342        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1343        self.client_ctx
1344            .finalize_and_submit_transaction(
1345                operation_id,
1346                WalletCommonInit::KIND.as_str(),
1347                move |change_range: OutPointRange| WalletOperationMeta {
1348                    variant: WalletOperationMetaVariant::RbfWithdraw {
1349                        rbf: rbf.clone(),
1350                        change: change_range.into_iter().collect(),
1351                    },
1352                    extra_meta: extra_meta.clone(),
1353                },
1354                tx_builder,
1355            )
1356            .await?;
1357
1358        Ok(operation_id)
1359    }
1360
1361    pub async fn subscribe_withdraw_updates(
1362        &self,
1363        operation_id: OperationId,
1364    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1365        let operation = self
1366            .client_ctx
1367            .get_operation(operation_id)
1368            .await
1369            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1370
1371        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1372            bail!("Operation is not a wallet operation");
1373        }
1374
1375        let operation_meta = operation.meta::<WalletOperationMeta>();
1376
1377        let (WalletOperationMetaVariant::Withdraw { change, .. }
1378        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1379        else {
1380            bail!("Operation is not a withdraw operation");
1381        };
1382
1383        let mut operation_stream = self.notifier.subscribe(operation_id).await;
1384        let client_ctx = self.client_ctx.clone();
1385
1386        Ok(self
1387            .client_ctx
1388            .outcome_or_updates(operation, operation_id, move || {
1389                stream! {
1390                    match next_withdraw_state(&mut operation_stream).await {
1391                        Some(WithdrawStates::Created(_)) => {
1392                            yield WithdrawState::Created;
1393                        },
1394                        Some(s) => {
1395                            panic!("Unexpected state {s:?}")
1396                        },
1397                        None => return,
1398                    }
1399
1400                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic
1401
1402                        // Swallowing potential errors since the transaction failing  is handled by
1403                        // output outcome fetching already
1404                        let _ = client_ctx
1405                            .await_primary_module_outputs(operation_id, change)
1406                            .await;
1407
1408
1409                    match next_withdraw_state(&mut operation_stream).await {
1410                        Some(WithdrawStates::Aborted(inner)) => {
1411                            yield WithdrawState::Failed(inner.error);
1412                        },
1413                        Some(WithdrawStates::Success(inner)) => {
1414                            yield WithdrawState::Succeeded(inner.txid);
1415                        },
1416                        Some(s) => {
1417                            panic!("Unexpected state {s:?}")
1418                        },
1419                        None => {},
1420                    }
1421                }
1422            }))
1423    }
1424
1425    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1426        self.admin_auth
1427            .clone()
1428            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1429    }
1430
1431    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1432        self.module_api
1433            .activate_consensus_version_voting(self.admin_auth()?)
1434            .await?;
1435
1436        Ok(())
1437    }
1438}
1439
1440/// Polls the federation checking if the activated module consensus version
1441/// supports safe deposits, saving the result in the db once it does.
1442async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1443    loop {
1444        let mut dbtx = db.begin_transaction().await;
1445
1446        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1447            break;
1448        }
1449
1450        if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1451            && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1452        {
1453            dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1454            dbtx.commit_tx().await;
1455            break;
1456        }
1457
1458        drop(dbtx);
1459
1460        if is_running_in_test_env() {
1461            // Even in tests we don't want to spam the federation with requests about it
1462            sleep(Duration::from_secs(10)).await;
1463        } else {
1464            sleep(Duration::from_secs(3600)).await;
1465        }
1466    }
1467}
1468
1469/// Returns the child index to derive the next peg-in tweak key from.
1470async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1471    let index = dbtx
1472        .get_value(&NextPegInTweakIndexKey)
1473        .await
1474        .unwrap_or_default();
1475    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1476        .await;
1477    index
1478}
1479
/// State machines of the wallet client module, one variant per kind of
/// state machine. Dispatching to the wrapped state machine happens in the
/// [`State`] implementation.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// Wraps a [`DepositStateMachine`].
    Deposit(DepositStateMachine),
    /// Wraps a [`WithdrawStateMachine`].
    Withdraw(WithdrawStateMachine),
}
1485
1486impl IntoDynInstance for WalletClientStates {
1487    type DynType = DynState;
1488
1489    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1490        DynState::from_typed(instance_id, self)
1491    }
1492}
1493
1494impl State for WalletClientStates {
1495    type ModuleContext = WalletClientContext;
1496
1497    fn transitions(
1498        &self,
1499        context: &Self::ModuleContext,
1500        global_context: &DynGlobalClientContext,
1501    ) -> Vec<StateTransition<Self>> {
1502        match self {
1503            WalletClientStates::Deposit(sm) => {
1504                sm_enum_variant_translation!(
1505                    sm.transitions(context, global_context),
1506                    WalletClientStates::Deposit
1507                )
1508            }
1509            WalletClientStates::Withdraw(sm) => {
1510                sm_enum_variant_translation!(
1511                    sm.transitions(context, global_context),
1512                    WalletClientStates::Withdraw
1513                )
1514            }
1515        }
1516    }
1517
1518    fn operation_id(&self) -> OperationId {
1519        match self {
1520            WalletClientStates::Deposit(sm) => sm.operation_id(),
1521            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1522        }
1523    }
1524}
1525
1526#[cfg(all(test, not(target_family = "wasm")))]
1527mod tests {
1528    use std::collections::BTreeSet;
1529    use std::sync::atomic::{AtomicBool, Ordering};
1530
1531    use super::*;
1532    use crate::backup::{
1533        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
1534    };
1535
1536    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
1537    #[tokio::test(flavor = "multi_thread")]
1538    async fn sanity_test_recover_inner() {
1539        {
1540            let last_checked = AtomicBool::new(false);
1541            let last_checked = &last_checked;
1542            assert_eq!(
1543                recover_scan_idxes_for_activity(
1544                    TweakIdx(0),
1545                    &BTreeSet::new(),
1546                    |cur_idx| async move {
1547                        Ok(match cur_idx {
1548                            TweakIdx(9) => {
1549                                last_checked.store(true, Ordering::SeqCst);
1550                                vec![]
1551                            }
1552                            TweakIdx(10) => panic!("Shouldn't happen"),
1553                            TweakIdx(11) => {
1554                                vec![0usize] /* just for type inference */
1555                            }
1556                            _ => vec![],
1557                        })
1558                    }
1559                )
1560                .await
1561                .unwrap(),
1562                RecoverScanOutcome {
1563                    last_used_idx: None,
1564                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1565                    tweak_idxes_with_pegins: BTreeSet::from([])
1566                }
1567            );
1568            assert!(last_checked.load(Ordering::SeqCst));
1569        }
1570
1571        {
1572            let last_checked = AtomicBool::new(false);
1573            let last_checked = &last_checked;
1574            assert_eq!(
1575                recover_scan_idxes_for_activity(
1576                    TweakIdx(0),
1577                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1578                    |cur_idx| async move {
1579                        Ok(match cur_idx {
1580                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1581                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1582                            TweakIdx(11) => {
1583                                last_checked.store(true, Ordering::SeqCst);
1584                                vec![]
1585                            }
1586                            TweakIdx(12) => panic!("Shouldn't happen"),
1587                            TweakIdx(13) => {
1588                                vec![0usize] /* just for type inference */
1589                            }
1590                            _ => vec![],
1591                        })
1592                    }
1593                )
1594                .await
1595                .unwrap(),
1596                RecoverScanOutcome {
1597                    last_used_idx: Some(TweakIdx(2)),
1598                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1599                    tweak_idxes_with_pegins: BTreeSet::from([])
1600                }
1601            );
1602            assert!(last_checked.load(Ordering::SeqCst));
1603        }
1604
1605        {
1606            let last_checked = AtomicBool::new(false);
1607            let last_checked = &last_checked;
1608            assert_eq!(
1609                recover_scan_idxes_for_activity(
1610                    TweakIdx(10),
1611                    &BTreeSet::new(),
1612                    |cur_idx| async move {
1613                        Ok(match cur_idx {
1614                            TweakIdx(10) => vec![()],
1615                            TweakIdx(19) => {
1616                                last_checked.store(true, Ordering::SeqCst);
1617                                vec![]
1618                            }
1619                            TweakIdx(20) => panic!("Shouldn't happen"),
1620                            _ => vec![],
1621                        })
1622                    }
1623                )
1624                .await
1625                .unwrap(),
1626                RecoverScanOutcome {
1627                    last_used_idx: Some(TweakIdx(10)),
1628                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1629                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1630                }
1631            );
1632            assert!(last_checked.load(Ordering::SeqCst));
1633        }
1634
1635        assert_eq!(
1636            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1637                Ok(match cur_idx {
1638                    TweakIdx(6 | 15) => vec![()],
1639                    _ => vec![],
1640                })
1641            })
1642            .await
1643            .unwrap(),
1644            RecoverScanOutcome {
1645                last_used_idx: Some(TweakIdx(15)),
1646                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1647                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1648            }
1649        );
1650        assert_eq!(
1651            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1652                Ok(match cur_idx {
1653                    TweakIdx(8) => {
1654                        vec![()] /* for type inference only */
1655                    }
1656                    TweakIdx(9) => {
1657                        panic!("Shouldn't happen")
1658                    }
1659                    _ => vec![],
1660                })
1661            })
1662            .await
1663            .unwrap(),
1664            RecoverScanOutcome {
1665                last_used_idx: None,
1666                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1667                tweak_idxes_with_pegins: BTreeSet::from([])
1668            }
1669        );
1670        assert_eq!(
1671            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1672                Ok(match cur_idx {
1673                    TweakIdx(9) => panic!("Shouldn't happen"),
1674                    TweakIdx(15) => vec![()],
1675                    _ => vec![],
1676                })
1677            })
1678            .await
1679            .unwrap(),
1680            RecoverScanOutcome {
1681                last_used_idx: Some(TweakIdx(15)),
1682                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1683                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1684            }
1685        );
1686    }
1687}