fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
21mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{DynBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client_module::oplog::UpdateStreamOrOutcome;
43use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client_module::transaction::{
45    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
54use fedimint_core::module::{
55    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
56    ModuleInit, MultiApiVersion,
57};
58use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
59use fedimint_core::util::backoff_util::background_backoff;
60use fedimint_core::util::{BoxStream, backoff_util, retry};
61use fedimint_core::{
62    BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
63    runtime, secp256k1,
64};
65use fedimint_derive_secret::{ChildId, DerivableSecret};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
68use fedimint_wallet_common::tweakable::Tweakable;
69pub use fedimint_wallet_common::*;
70use futures::{Stream, StreamExt};
71use rand::{Rng, thread_rng};
72use secp256k1::Keypair;
73use serde::{Deserialize, Serialize};
74use strum::IntoEnumIterator;
75use tokio::sync::watch;
76use tracing::{debug, instrument};
77
78use crate::api::WalletFederationApi;
79use crate::backup::WalletRecovery;
80use crate::client_db::{
81    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
82    PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
83};
84use crate::deposit::DepositStateMachine;
85use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
86
/// Child id under the module root secret below which the per-address
/// peg-in tweak keys are derived.
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
88
/// A bitcoin transaction together with the index of the output that carries
/// the deposit.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The bitcoin transaction is saved as soon as we see it so the transaction
    /// can be re-transmitted if it's evicted from the mempool.
    pub btc_transaction: bitcoin::Transaction,
    /// Index of the deposit output within `btc_transaction`
    pub out_idx: u32,
}
97
/// Deposit progress in which every state that knows about the deposit
/// transaction carries the full [`BitcoinTransactionData`].
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    /// No transaction data is attached to this state
    WaitingForTransaction,
    WaitingForConfirmation(BitcoinTransactionData),
    Confirmed(BitcoinTransactionData),
    Claimed(BitcoinTransactionData),
    /// Carries a textual description of the failure
    Failed(String),
}
106
/// Deposit progress in which states that know about the deposit carry only
/// the deposited amount and the bitcoin out point, rather than a whole
/// transaction.
///
/// `btc_deposited` is (de)serialized as a number of satoshis in every
/// variant that has it.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// No deposit data is attached to this state
    WaitingForTransaction,
    WaitingForConfirmation {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    Confirmed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    Claimed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// Carries a textual description of the failure
    Failed(String),
}
127
/// Externally visible progress of a withdraw operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    Created,
    /// NOTE(review): presumably the id of the bitcoin transaction paying out
    /// the withdrawal; confirm against the withdraw state machine.
    Succeeded(bitcoin::Txid),
    /// Carries a textual description of the failure
    Failed(String),
    // TODO: track refund
    // Refunded,
    // RefundFailed(String),
}
137
138async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
139where
140    S: Stream<Item = WalletClientStates> + Unpin,
141{
142    loop {
143        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
144            return Some(ds.state);
145        }
146        tokio::task::yield_now().await;
147    }
148}
149
/// Initializer of the wallet client module.
///
/// Optionally carries a bitcoin RPC client; when present it is used by the
/// module instead of one created from the configured RPC settings.
#[derive(Debug, Clone, Default)]
// TODO: should probably move to DB
pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
153
154impl WalletClientInit {
155    pub fn new(rpc: DynBitcoindRpc) -> Self {
156        Self(Some(rpc))
157    }
158}
159
160impl ModuleInit for WalletClientInit {
161    type Common = WalletCommonInit;
162
163    async fn dump_database(
164        &self,
165        dbtx: &mut DatabaseTransaction<'_>,
166        prefix_names: Vec<String>,
167    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
168        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
169            BTreeMap::new();
170        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
171            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
172        });
173
174        for table in filtered_prefixes {
175            match table {
176                DbKeyPrefix::NextPegInTweakIndex => {
177                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
178                        wallet_client_items
179                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
180                    }
181                }
182                DbKeyPrefix::PegInTweakIndex => {
183                    push_db_pair_items!(
184                        dbtx,
185                        PegInTweakIndexPrefix,
186                        PegInTweakIndexKey,
187                        PegInTweakIndexData,
188                        wallet_client_items,
189                        "Peg-In Tweak Index"
190                    );
191                }
192                DbKeyPrefix::ClaimedPegIn => {
193                    push_db_pair_items!(
194                        dbtx,
195                        ClaimedPegInPrefix,
196                        ClaimedPegInKey,
197                        ClaimedPegInData,
198                        wallet_client_items,
199                        "Claimed Peg-In"
200                    );
201                }
202                DbKeyPrefix::RecoveryFinalized => {
203                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
204                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
205                    }
206                }
207                DbKeyPrefix::SupportsSafeDeposit => {
208                    push_db_pair_items!(
209                        dbtx,
210                        SupportsSafeDepositPrefix,
211                        SupportsSafeDepositKey,
212                        (),
213                        wallet_client_items,
214                        "Supports Safe Deposit"
215                    );
216                }
217                DbKeyPrefix::RecoveryState
218                | DbKeyPrefix::ExternalReservedStart
219                | DbKeyPrefix::CoreInternalReservedStart
220                | DbKeyPrefix::CoreInternalReservedEnd => {}
221            }
222        }
223
224        Box::new(wallet_client_items.into_iter())
225    }
226}
227
228#[apply(async_trait_maybe_send!)]
229impl ClientModuleInit for WalletClientInit {
230    type Module = WalletClientModule;
231
232    fn supported_api_versions(&self) -> MultiApiVersion {
233        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
234            .expect("no version conflicts")
235    }
236
237    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238        let data = WalletClientModuleData {
239            cfg: args.cfg().clone(),
240            module_root_secret: args.module_root_secret().clone(),
241        };
242
243        let db = args.db().clone();
244
245        let btc_rpc = self.0.clone().unwrap_or(create_esplora_rpc(
246            &WalletClientModule::get_rpc_config(args.cfg()).url,
247        )?);
248
249        let module_api = args.module_api().clone();
250
251        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
252        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
253
254        Ok(WalletClientModule {
255            db,
256            data,
257            module_api,
258            notifier: args.notifier().clone(),
259            rpc: btc_rpc,
260            client_ctx: args.context(),
261            pegin_monitor_wakeup_sender,
262            pegin_monitor_wakeup_receiver,
263            pegin_claimed_receiver,
264            pegin_claimed_sender,
265            task_group: args.task_group().clone(),
266            admin_auth: args.admin_auth().cloned(),
267        })
268    }
269
270    /// Wallet recovery
271    ///
272    /// Query bitcoin rpc for history of addresses from last known used
273    /// addresses (or index 0) until `MAX_GAP` unused ones.
274    ///
275    /// Notably does not persist the progress of addresses being queried,
276    /// because it is not expected that it would take long enough to bother.
277    async fn recover(
278        &self,
279        args: &ClientModuleRecoverArgs<Self>,
280        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
281    ) -> anyhow::Result<()> {
282        args.recover_from_history::<WalletRecovery>(self, snapshot)
283            .await
284    }
285
286    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
287        Some(
288            DbKeyPrefix::iter()
289                .map(|p| p as u8)
290                .chain(
291                    DbKeyPrefix::ExternalReservedStart as u8
292                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
293                )
294                .collect(),
295        )
296    }
297}
298
/// Metadata stored alongside a wallet operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// Operation-kind specific data
    pub variant: WalletOperationMetaVariant,
    /// Free-form JSON supplied by the caller that started the operation
    pub extra_meta: serde_json::Value,
}
304
/// The kind of wallet operation and its kind-specific data.
///
/// Variant names are serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    Deposit {
        address: Address<NetworkUnchecked>,
        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
        /// using the pegin monitor. The value is the child index of the key
        /// used to generate the address, so we can re-generate the secret key
        /// from our root secret.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Defaults to `None` when absent from the serialized form and is
        /// left out of it when `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    Withdraw {
        address: Address<NetworkUnchecked>,
        /// (De)serialized as a number of satoshis
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        fee: PegOutFees,
        change: Vec<OutPoint>,
    },

    RbfWithdraw {
        rbf: Rbf,
        change: Vec<OutPoint>,
    },
}
332
/// The non-resource, just plain-data parts of [`WalletClientModule`]
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    /// Wallet module client configuration
    cfg: WalletClientConfig,
    /// Secret from which the deposit address tweak keys are derived
    module_root_secret: DerivableSecret,
}
339
340impl WalletClientModuleData {
341    fn derive_deposit_address(
342        &self,
343        idx: TweakIdx,
344    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
345        let idx = ChildId(idx.0);
346
347        let secret_tweak_key = self
348            .module_root_secret
349            .child_key(WALLET_TWEAK_CHILD_ID)
350            .child_key(idx)
351            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
352
353        let public_tweak_key = secret_tweak_key.public_key();
354
355        let address = self
356            .cfg
357            .peg_in_descriptor
358            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
359            .address(self.cfg.network.0)
360            .unwrap();
361
362        // TODO: make hash?
363        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
364
365        (secret_tweak_key, public_tweak_key, address, operation_id)
366    }
367
368    fn derive_peg_in_script(
369        &self,
370        idx: TweakIdx,
371    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
372        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
373
374        (
375            self.cfg
376                .peg_in_descriptor
377                .tweak(&secret_tweak_key.public_key(), SECP256K1)
378                .script_pubkey(),
379            address,
380            secret_tweak_key,
381            operation_id,
382        )
383    }
384}
385
/// The wallet client module: plain data plus the handles (database, APIs,
/// channels, task group) it needs to operate.
#[derive(Debug)]
pub struct WalletClientModule {
    /// Configuration and root secret
    data: WalletClientModuleData,
    /// The module's database
    db: Database,
    /// API of the federation's wallet module
    module_api: DynModuleApi,
    /// Notifier for wallet state machine updates
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC client
    rpc: DynBitcoindRpc,
    client_ctx: ClientContext<Self>,
    /// Updated to wake up pegin monitor
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Called every time a peg-in was claimed
    pegin_claimed_sender: watch::Sender<()>,
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the module's background tasks are spawned on
    task_group: TaskGroup,
    /// Admin credentials, if the client was given any
    admin_auth: Option<ApiAuth>,
}
403
404#[apply(async_trait_maybe_send!)]
405impl ClientModule for WalletClientModule {
406    type Init = WalletClientInit;
407    type Common = WalletModuleTypes;
408    type Backup = WalletModuleBackup;
409    type ModuleStateMachineContext = WalletClientContext;
410    type States = WalletClientStates;
411
412    fn context(&self) -> Self::ModuleStateMachineContext {
413        WalletClientContext {
414            rpc: self.rpc.clone(),
415            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
416            wallet_decoder: self.decoder(),
417            secp: Secp256k1::default(),
418            client_ctx: self.client_ctx.clone(),
419        }
420    }
421
422    async fn start(&self) {
423        self.task_group.spawn_cancellable("peg-in monitor", {
424            let client_ctx = self.client_ctx.clone();
425            let db = self.db.clone();
426            let btc_rpc = self.rpc.clone();
427            let module_api = self.module_api.clone();
428            let data = self.data.clone();
429            let pegin_claimed_sender = self.pegin_claimed_sender.clone();
430            let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
431            pegin_monitor::run_peg_in_monitor(
432                client_ctx,
433                db,
434                btc_rpc,
435                module_api,
436                data,
437                pegin_claimed_sender,
438                pegin_monitor_wakeup_receiver,
439            )
440        });
441
442        self.task_group
443            .spawn_cancellable("supports-safe-deposit-version", {
444                let db = self.db.clone();
445                let module_api = self.module_api.clone();
446
447                poll_supports_safe_deposit_version(db, module_api)
448            });
449    }
450
451    fn supports_backup(&self) -> bool {
452        true
453    }
454
455    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
456        // fetch consensus height first
457        let session_count = self.client_ctx.global_api().session_count().await?;
458
459        let mut dbtx = self.db.begin_transaction_nc().await;
460        let next_pegin_tweak_idx = dbtx
461            .get_value(&NextPegInTweakIndexKey)
462            .await
463            .unwrap_or_default();
464        let claimed = dbtx
465            .find_by_prefix(&PegInTweakIndexPrefix)
466            .await
467            .filter_map(|(k, v)| async move {
468                if v.claimed.is_empty() {
469                    None
470                } else {
471                    Some(k.0)
472                }
473            })
474            .collect()
475            .await;
476        Ok(backup::WalletModuleBackup::new_v1(
477            session_count,
478            next_pegin_tweak_idx,
479            claimed,
480        ))
481    }
482
483    fn input_fee(
484        &self,
485        _amount: &Amounts,
486        _input: &<Self::Common as ModuleCommon>::Input,
487    ) -> Option<Amounts> {
488        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
489    }
490
491    fn output_fee(
492        &self,
493        _amount: &Amounts,
494        _output: &<Self::Common as ModuleCommon>::Output,
495    ) -> Option<Amounts> {
496        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
497    }
498
499    async fn handle_rpc(
500        &self,
501        method: String,
502        request: serde_json::Value,
503    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
504        Box::pin(try_stream! {
505            match method.as_str() {
506                "get_wallet_summary" => {
507                    let _req: WalletSummaryRequest = serde_json::from_value(request)?;
508                    let wallet_summary = self.get_wallet_summary()
509                        .await
510                        .expect("Failed to fetch wallet summary");
511                    let result = serde_json::to_value(&wallet_summary)
512                        .expect("Serialization error");
513                    yield result;
514                }
515                "get_block_count_local" => {
516                    let block_count = self.get_block_count_local().await
517                        .expect("Failed to fetch block count");
518                    yield serde_json::to_value(block_count)?;
519                }
520                "peg_in" => {
521                    let req: PegInRequest = serde_json::from_value(request)?;
522                    let response = self.peg_in(req)
523                        .await
524                        .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
525                    let result = serde_json::to_value(&response)?;
526                    yield result;
527                },
528                "peg_out" => {
529                    let req: PegOutRequest = serde_json::from_value(request)?;
530                    let response = self.peg_out(req)
531                        .await
532                        .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
533                    let result = serde_json::to_value(&response)?;
534                    yield result;
535                },
536                "subscribe_deposit" => {
537                    let req: SubscribeDepositRequest = serde_json::from_value(request)?;
538                    for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
539                        yield serde_json::to_value(state)?;
540                    }
541                },
542                "subscribe_withdraw" => {
543                    let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
544                    for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
545                        yield serde_json::to_value(state)?;
546                    }
547                }
548                _ => {
549                    Err(anyhow::format_err!("Unknown method: {}", method))?;
550                }
551            }
552        })
553    }
554
555    #[cfg(feature = "cli")]
556    async fn handle_cli_command(
557        &self,
558        args: &[std::ffi::OsString],
559    ) -> anyhow::Result<serde_json::Value> {
560        cli::handle_cli_command(self, args).await
561    }
562}
563
/// Parameters of the `get_wallet_summary` RPC method; it has no fields.
#[derive(Deserialize)]
struct WalletSummaryRequest {}
566
/// Context shared with the wallet client state machines.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC client
    rpc: DynBitcoindRpc,
    /// The federation's peg-in descriptor
    wallet_descriptor: PegInDescriptor,
    /// Decoder of the wallet module
    wallet_decoder: Decoder,
    secp: Secp256k1<All>,
    pub client_ctx: ClientContext<WalletClientModule>,
}
575
/// Parameters of the `peg_in` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInRequest {
    /// Free-form JSON passed along when the deposit address is allocated
    pub extra_meta: serde_json::Value,
}
580
/// Parameters of the `subscribe_deposit` RPC method.
#[derive(Deserialize)]
struct SubscribeDepositRequest {
    /// The deposit operation to subscribe to
    operation_id: OperationId,
}
585
/// Parameters of the `subscribe_withdraw` RPC method.
#[derive(Deserialize)]
struct SubscribeWithdrawRequest {
    /// The withdraw operation to subscribe to
    operation_id: OperationId,
}
590
/// Result of the `peg_in` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInResponse {
    /// The newly allocated deposit address
    pub deposit_address: Address<NetworkUnchecked>,
    /// Id of the operation created for the deposit address
    pub operation_id: OperationId,
}
596
/// Parameters of the `peg_out` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutRequest {
    /// Amount to withdraw, in satoshis
    pub amount_sat: u64,
    /// Bitcoin address to withdraw to; it is checked against the
    /// federation's network before use
    pub destination_address: Address<NetworkUnchecked>,
    /// Free-form JSON passed along with the withdraw operation
    pub extra_meta: serde_json::Value,
}
603
/// Result of the `peg_out` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutResponse {
    /// Id of the started withdraw operation
    pub operation_id: OperationId,
}
608
/// Ties the state machine context to the wallet module kind.
impl Context for WalletClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
612
613impl WalletClientModule {
614    fn cfg(&self) -> &WalletClientConfig {
615        &self.data.cfg
616    }
617
618    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
619        match BitcoinRpcConfig::get_defaults_from_env_vars() {
620            Ok(rpc_config) => {
621                // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
622                // updated to 0.30
623                if rpc_config.kind == "bitcoind" {
624                    cfg.default_bitcoin_rpc.clone()
625                } else {
626                    rpc_config
627                }
628            }
629            _ => cfg.default_bitcoin_rpc.clone(),
630        }
631    }
632
633    pub fn get_network(&self) -> Network {
634        self.cfg().network.0
635    }
636
637    pub fn get_finality_delay(&self) -> u32 {
638        self.cfg().finality_delay
639    }
640
641    pub fn get_fee_consensus(&self) -> FeeConsensus {
642        self.cfg().fee_consensus
643    }
644
645    async fn allocate_deposit_address_inner(
646        &self,
647        dbtx: &mut DatabaseTransaction<'_>,
648    ) -> (OperationId, Address, TweakIdx) {
649        dbtx.ensure_isolated().expect("Must be isolated db");
650
651        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
652        let (_secret_tweak_key, _, address, operation_id) =
653            self.data.derive_deposit_address(tweak_idx);
654
655        let now = fedimint_core::time::now();
656
657        dbtx.insert_new_entry(
658            &PegInTweakIndexKey(tweak_idx),
659            &PegInTweakIndexData {
660                creation_time: now,
661                next_check_time: Some(now),
662                last_check_time: None,
663                operation_id,
664                claimed: vec![],
665            },
666        )
667        .await;
668
669        (operation_id, address, tweak_idx)
670    }
671
672    /// Fetches the fees that would need to be paid to make the withdraw request
673    /// using [`Self::withdraw`] work *right now*.
674    ///
675    /// Note that we do not receive a guarantee that these fees will be valid in
676    /// the future, thus even the next second using these fees *may* fail.
677    /// The caller should be prepared to retry with a new fee estimate.
678    pub async fn get_withdraw_fees(
679        &self,
680        address: &bitcoin::Address,
681        amount: bitcoin::Amount,
682    ) -> anyhow::Result<PegOutFees> {
683        self.module_api
684            .fetch_peg_out_fees(address, amount)
685            .await?
686            .context("Federation didn't return peg-out fees")
687    }
688
689    /// Returns a summary of the wallet's coins
690    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
691        Ok(self.module_api.fetch_wallet_summary().await?)
692    }
693
694    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
695        Ok(self.module_api.fetch_block_count_local().await?)
696    }
697
698    pub fn create_withdraw_output(
699        &self,
700        operation_id: OperationId,
701        address: bitcoin::Address,
702        amount: bitcoin::Amount,
703        fees: PegOutFees,
704    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
705        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
706
707        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
708
709        let sm_gen = move |out_point_range: OutPointRange| {
710            assert_eq!(out_point_range.count(), 1);
711            let out_idx = out_point_range.start_idx();
712            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
713                operation_id,
714                state: WithdrawStates::Created(CreatedWithdrawState {
715                    fm_outpoint: OutPoint {
716                        txid: out_point_range.txid(),
717                        out_idx,
718                    },
719                }),
720            })]
721        };
722
723        Ok(ClientOutputBundle::new(
724            vec![ClientOutput::<WalletOutput> {
725                output,
726                amounts: Amounts::new_bitcoin(amount),
727            }],
728            vec![ClientOutputSM::<WalletClientStates> {
729                state_machines: Arc::new(sm_gen),
730            }],
731        ))
732    }
733
734    pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
735        let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
736
737        Ok(PegInResponse {
738            deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
739                .as_unchecked()
740                .clone(),
741            operation_id,
742        })
743    }
744
745    pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
746        let amount = bitcoin::Amount::from_sat(req.amount_sat);
747        let destination = req
748            .destination_address
749            .require_network(self.get_network())?;
750
751        let fees = self.get_withdraw_fees(&destination, amount).await?;
752        let operation_id = self
753            .withdraw(&destination, amount, fees, req.extra_meta)
754            .await
755            .context("Failed to initiate withdraw")?;
756
757        Ok(PegOutResponse { operation_id })
758    }
759
760    pub fn create_rbf_withdraw_output(
761        &self,
762        operation_id: OperationId,
763        rbf: &Rbf,
764    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
765        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
766
767        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
768
769        let sm_gen = move |out_point_range: OutPointRange| {
770            assert_eq!(out_point_range.count(), 1);
771            let out_idx = out_point_range.start_idx();
772            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
773                operation_id,
774                state: WithdrawStates::Created(CreatedWithdrawState {
775                    fm_outpoint: OutPoint {
776                        txid: out_point_range.txid(),
777                        out_idx,
778                    },
779                }),
780            })]
781        };
782
783        Ok(ClientOutputBundle::new(
784            vec![ClientOutput::<WalletOutput> {
785                output,
786                amounts: Amounts::new_bitcoin(amount),
787            }],
788            vec![ClientOutputSM::<WalletClientStates> {
789                state_machines: Arc::new(sm_gen),
790            }],
791        ))
792    }
793
794    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
795        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
796    }
797
798    /// Returns true if the federation's wallet module consensus version
799    /// supports processing all deposits.
800    ///
801    /// This method is safe to call offline, since it first attempts to read a
802    /// key from the db that represents the client has previously been able to
803    /// verify the wallet module consensus version. If the client has not
804    /// verified the version, it must be online to fetch the latest wallet
805    /// module consensus version.
806    pub async fn supports_safe_deposit(&self) -> bool {
807        let mut dbtx = self.db.begin_transaction().await;
808
809        let already_verified_supports_safe_deposit =
810            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
811
812        already_verified_supports_safe_deposit || {
813            match self.module_api.module_consensus_version().await {
814                Ok(module_consensus_version) => {
815                    let supported_version =
816                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
817
818                    if supported_version {
819                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
820                        dbtx.commit_tx().await;
821                    }
822
823                    supported_version
824                }
825                Err(_) => false,
826            }
827        }
828    }
829
830    /// Allocates a deposit address controlled by the federation, guaranteeing
831    /// safe handling of all deposits, including on-chain transactions exceeding
832    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
833    ///
834    /// Returns an error if the client has never been online to verify the
835    /// federation's wallet module consensus version supports processing all
836    /// deposits.
837    pub async fn safe_allocate_deposit_address<M>(
838        &self,
839        extra_meta: M,
840    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
841    where
842        M: Serialize + MaybeSend + MaybeSync,
843    {
844        ensure!(
845            self.supports_safe_deposit().await,
846            "Wallet module consensus version doesn't support safe deposits",
847        );
848
849        self.allocate_deposit_address_expert_only(extra_meta).await
850    }
851
852    /// Allocates a deposit address that is controlled by the federation.
853    ///
854    /// This is an EXPERT ONLY method intended for power users such as Lightning
855    /// gateways allocating liquidity, and we discourage exposing peg-in
856    /// functionality to everyday users of a Fedimint wallet due to the
857    /// following two limitations:
858    ///
859    /// The transaction sending to this address needs to be smaller than 40KB in
860    /// order for the peg-in to be claimable. If the transaction is too large,
861    /// funds will be lost.
862    ///
863    /// In the future, federations will also enforce a minimum peg-in amount to
864    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
865    /// claimed and funds will be lost.
866    ///
867    /// Everyday users should rely on Lightning to move funds into the
868    /// federation.
869    pub async fn allocate_deposit_address_expert_only<M>(
870        &self,
871        extra_meta: M,
872    ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
873    where
874        M: Serialize + MaybeSend + MaybeSync,
875    {
876        let extra_meta_value =
877            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
878        let (operation_id, address, tweak_idx) = self
879            .db
880            .autocommit(
881                move |dbtx, _| {
882                    let extra_meta_value_inner = extra_meta_value.clone();
883                    Box::pin(async move {
884                        let (operation_id, address, tweak_idx) = self
885                            .allocate_deposit_address_inner(dbtx)
886                            .await;
887
888                        self.client_ctx.manual_operation_start_dbtx(
889                            dbtx,
890                            operation_id,
891                            WalletCommonInit::KIND.as_str(),
892                            WalletOperationMeta {
893                                variant: WalletOperationMetaVariant::Deposit {
894                                    address: address.clone().into_unchecked(),
895                                    tweak_idx: Some(tweak_idx),
896                                    expires_at: None,
897                                },
898                                extra_meta: extra_meta_value_inner,
899                            },
900                            vec![]
901                        ).await?;
902
903                        debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
904
905                        // Begin watching the script address
906                        self.rpc.watch_script_history(&address.script_pubkey()).await?;
907
908                        let sender = self.pegin_monitor_wakeup_sender.clone();
909                        dbtx.on_commit(move || {
910                            sender.send_replace(());
911                        });
912
913                        Ok((operation_id, address, tweak_idx))
914                    })
915                },
916                Some(100),
917            )
918            .await
919            .map_err(|e| match e {
920                AutocommitError::CommitFailed {
921                    last_error,
922                    attempts,
923                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
924                AutocommitError::ClosureError { error, .. } => error,
925            })?;
926
927        Ok((operation_id, address, tweak_idx))
928    }
929
930    /// Returns a stream of updates about an ongoing deposit operation created
931    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
932    /// Returns an error for old deposit operations created prior to the 0.4
933    /// release and not driven to completion yet. This should be rare enough
934    /// that an indeterminate state is ok here.
935    pub async fn subscribe_deposit(
936        &self,
937        operation_id: OperationId,
938    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
939        let operation = self
940            .client_ctx
941            .get_operation(operation_id)
942            .await
943            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
944
945        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
946            bail!("Operation is not a wallet operation");
947        }
948
949        let operation_meta = operation.meta::<WalletOperationMeta>();
950
951        let WalletOperationMetaVariant::Deposit {
952            address, tweak_idx, ..
953        } = operation_meta.variant
954        else {
955            bail!("Operation is not a deposit operation");
956        };
957
958        let address = address.require_network(self.cfg().network.0)?;
959
960        // The old deposit operations don't have tweak_idx set
961        let Some(tweak_idx) = tweak_idx else {
962            // In case we are dealing with an old deposit that still uses state machines we
963            // don't have the logic here anymore to subscribe to updates. We can still read
964            // the final state though if it reached any.
965            let outcome_v1 = operation
966                .outcome::<DepositStateV1>()
967                .context("Old pending deposit, can't subscribe to updates")?;
968
969            let outcome_v2 = match outcome_v1 {
970                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
971                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
972                    btc_out_point: bitcoin::OutPoint {
973                        txid: tx_info.btc_transaction.compute_txid(),
974                        vout: tx_info.out_idx,
975                    },
976                },
977                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
978                _ => bail!("Non-final outcome in operation log"),
979            };
980
981            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
982        };
983
984        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
985            let stream_rpc = self.rpc.clone();
986            let stream_client_ctx = self.client_ctx.clone();
987            let stream_script_pub_key = address.script_pubkey();
988            move || {
989
990            stream! {
991                yield DepositStateV2::WaitingForTransaction;
992
993                retry(
994                    "subscribe script history",
995                    background_backoff(),
996                    || stream_rpc.watch_script_history(&stream_script_pub_key)
997                ).await.expect("Will never give up");
998                let (btc_out_point, btc_deposited) = retry(
999                    "fetch history",
1000                    background_backoff(),
1001                    || async {
1002                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1003                        history.first().and_then(|tx| {
1004                            let (out_idx, amount) = tx.output
1005                                .iter()
1006                                .enumerate()
1007                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1008                            let txid = tx.compute_txid();
1009
1010                            Some((
1011                                bitcoin::OutPoint {
1012                                    txid,
1013                                    vout: out_idx as u32,
1014                                },
1015                                amount
1016                            ))
1017                        }).context("No deposit transaction found")
1018                    }
1019                ).await.expect("Will never give up");
1020
1021                yield DepositStateV2::WaitingForConfirmation {
1022                    btc_deposited,
1023                    btc_out_point
1024                };
1025
1026                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1027                    peg_in_index: tweak_idx,
1028                    btc_out_point,
1029                }).await;
1030
1031                yield DepositStateV2::Confirmed {
1032                    btc_deposited,
1033                    btc_out_point
1034                };
1035
1036                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1037                    Ok(()) => yield DepositStateV2::Claimed {
1038                        btc_deposited,
1039                        btc_out_point
1040                    },
1041                    Err(e) => yield DepositStateV2::Failed(e.to_string())
1042                }
1043            }
1044        }}))
1045    }
1046
1047    pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1048        self.client_ctx
1049            .module_db()
1050            .clone()
1051            .begin_transaction_nc()
1052            .await
1053            .find_by_prefix(&PegInTweakIndexPrefix)
1054            .await
1055            .map(|(key, data)| (key.0, data))
1056            .collect()
1057            .await
1058    }
1059
1060    pub async fn find_tweak_idx_by_address(
1061        &self,
1062        address: bitcoin::Address<NetworkUnchecked>,
1063    ) -> anyhow::Result<TweakIdx> {
1064        let data = self.data.clone();
1065        let Some((tweak_idx, _)) = self
1066            .db
1067            .begin_transaction_nc()
1068            .await
1069            .find_by_prefix(&PegInTweakIndexPrefix)
1070            .await
1071            .filter(|(k, _)| {
1072                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1073                future::ready(derived_address.into_unchecked() == address)
1074            })
1075            .next()
1076            .await
1077        else {
1078            bail!("Address not found in the list of derived keys");
1079        };
1080
1081        Ok(tweak_idx.0)
1082    }
1083    pub async fn find_tweak_idx_by_operation_id(
1084        &self,
1085        operation_id: OperationId,
1086    ) -> anyhow::Result<TweakIdx> {
1087        Ok(self
1088            .client_ctx
1089            .module_db()
1090            .clone()
1091            .begin_transaction_nc()
1092            .await
1093            .find_by_prefix(&PegInTweakIndexPrefix)
1094            .await
1095            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1096            .next()
1097            .await
1098            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1099            .0
1100            .0)
1101    }
1102
1103    pub async fn get_pegin_tweak_idx(
1104        &self,
1105        tweak_idx: TweakIdx,
1106    ) -> anyhow::Result<PegInTweakIndexData> {
1107        self.client_ctx
1108            .module_db()
1109            .clone()
1110            .begin_transaction_nc()
1111            .await
1112            .get_value(&PegInTweakIndexKey(tweak_idx))
1113            .await
1114            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1115    }
1116
1117    pub async fn get_claimed_pegins(
1118        &self,
1119        dbtx: &mut DatabaseTransaction<'_>,
1120        tweak_idx: TweakIdx,
1121    ) -> Vec<(
1122        bitcoin::OutPoint,
1123        TransactionId,
1124        Vec<fedimint_core::OutPoint>,
1125    )> {
1126        let outpoints = dbtx
1127            .get_value(&PegInTweakIndexKey(tweak_idx))
1128            .await
1129            .map(|v| v.claimed)
1130            .unwrap_or_default();
1131
1132        let mut res = vec![];
1133
1134        for outpoint in outpoints {
1135            let claimed_peg_in_data = dbtx
1136                .get_value(&ClaimedPegInKey {
1137                    peg_in_index: tweak_idx,
1138                    btc_out_point: outpoint,
1139                })
1140                .await
1141                .expect("Must have a corresponding claim record");
1142            res.push((
1143                outpoint,
1144                claimed_peg_in_data.claim_txid,
1145                claimed_peg_in_data.change,
1146            ));
1147        }
1148
1149        res
1150    }
1151
1152    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
1153    pub async fn recheck_pegin_address_by_op_id(
1154        &self,
1155        operation_id: OperationId,
1156    ) -> anyhow::Result<()> {
1157        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1158
1159        self.recheck_pegin_address(tweak_idx).await
1160    }
1161
1162    /// Schedule given address for immediate re-check for deposits
1163    pub async fn recheck_pegin_address_by_address(
1164        &self,
1165        address: bitcoin::Address<NetworkUnchecked>,
1166    ) -> anyhow::Result<()> {
1167        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1168            .await
1169    }
1170
1171    /// Schedule given address for immediate re-check for deposits
1172    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1173        self.db
1174            .autocommit(
1175                |dbtx, _| {
1176                    Box::pin(async {
1177                        let db_key = PegInTweakIndexKey(tweak_idx);
1178                        let db_val = dbtx
1179                            .get_value(&db_key)
1180                            .await
1181                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1182
1183                        dbtx.insert_entry(
1184                            &db_key,
1185                            &PegInTweakIndexData {
1186                                next_check_time: Some(fedimint_core::time::now()),
1187                                ..db_val
1188                            },
1189                        )
1190                        .await;
1191
1192                        let sender = self.pegin_monitor_wakeup_sender.clone();
1193                        dbtx.on_commit(move || {
1194                            sender.send_replace(());
1195                        });
1196
1197                        Ok::<_, anyhow::Error>(())
1198                    })
1199                },
1200                Some(100),
1201            )
1202            .await?;
1203
1204        Ok(())
1205    }
1206
1207    /// Await for num deposit by [`OperationId`]
1208    pub async fn await_num_deposits_by_operation_id(
1209        &self,
1210        operation_id: OperationId,
1211        num_deposits: usize,
1212    ) -> anyhow::Result<()> {
1213        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1214        self.await_num_deposits(tweak_idx, num_deposits).await
1215    }
1216
1217    pub async fn await_num_deposits_by_address(
1218        &self,
1219        address: bitcoin::Address<NetworkUnchecked>,
1220        num_deposits: usize,
1221    ) -> anyhow::Result<()> {
1222        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1223            .await
1224    }
1225
1226    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1227    pub async fn await_num_deposits(
1228        &self,
1229        tweak_idx: TweakIdx,
1230        num_deposits: usize,
1231    ) -> anyhow::Result<()> {
1232        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1233
1234        let mut receiver = self.pegin_claimed_receiver.clone();
1235        let mut backoff = backoff_util::aggressive_backoff();
1236
1237        loop {
1238            let pegins = self
1239                .get_claimed_pegins(
1240                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1241                    tweak_idx,
1242                )
1243                .await;
1244
1245            if pegins.len() < num_deposits {
1246                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1247                self.recheck_pegin_address(tweak_idx).await?;
1248                runtime::sleep(backoff.next().unwrap_or_default()).await;
1249                receiver.changed().await?;
1250                continue;
1251            }
1252
1253            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1254
1255            for (_outpoint, transaction_id, change) in pegins {
1256                if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1257                    debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1258                    continue;
1259                }
1260
1261                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1262                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1263
1264                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1265                    bail!("{}", e);
1266                }
1267
1268                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1269                self.client_ctx
1270                    .await_primary_module_outputs(operation_id, change)
1271                    .await
1272                    .expect("Cannot fail if tx was accepted and federation is honest");
1273            }
1274
1275            return Ok(());
1276        }
1277    }
1278
1279    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1280    /// `address`. The caller has to supply the fee rate to be used which can be
1281    /// fetched using [`Self::get_withdraw_fees`] and should be
1282    /// acknowledged by the user since it can be unexpectedly high.
1283    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1284        &self,
1285        address: &bitcoin::Address,
1286        amount: bitcoin::Amount,
1287        fee: PegOutFees,
1288        extra_meta: M,
1289    ) -> anyhow::Result<OperationId> {
1290        {
1291            let operation_id = OperationId(thread_rng().r#gen());
1292
1293            let withdraw_output =
1294                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1295            let tx_builder = TransactionBuilder::new()
1296                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1297
1298            let extra_meta =
1299                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1300            self.client_ctx
1301                .finalize_and_submit_transaction(
1302                    operation_id,
1303                    WalletCommonInit::KIND.as_str(),
1304                    {
1305                        let address = address.clone();
1306                        move |change_range: OutPointRange| WalletOperationMeta {
1307                            variant: WalletOperationMetaVariant::Withdraw {
1308                                address: address.clone().into_unchecked(),
1309                                amount,
1310                                fee,
1311                                change: change_range.into_iter().collect(),
1312                            },
1313                            extra_meta: extra_meta.clone(),
1314                        }
1315                    },
1316                    tx_builder,
1317                )
1318                .await?;
1319
1320            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1321
1322            self.client_ctx
1323                .log_event(
1324                    &mut dbtx,
1325                    SendPaymentEvent {
1326                        operation_id,
1327                        amount: amount + fee.amount(),
1328                        fee: fee.amount(),
1329                    },
1330                )
1331                .await;
1332
1333            dbtx.commit_tx().await;
1334
1335            Ok(operation_id)
1336        }
1337    }
1338
1339    /// Attempt to increase the fee of a onchain withdraw transaction using
1340    /// replace by fee (RBF).
1341    /// This can prevent transactions from getting stuck
1342    /// in the mempool
1343    #[deprecated(
1344        since = "0.4.0",
1345        note = "RBF withdrawals are rejected by the federation"
1346    )]
1347    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1348        &self,
1349        rbf: Rbf,
1350        extra_meta: M,
1351    ) -> anyhow::Result<OperationId> {
1352        let operation_id = OperationId(thread_rng().r#gen());
1353
1354        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1355        let tx_builder = TransactionBuilder::new()
1356            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1357
1358        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1359        self.client_ctx
1360            .finalize_and_submit_transaction(
1361                operation_id,
1362                WalletCommonInit::KIND.as_str(),
1363                move |change_range: OutPointRange| WalletOperationMeta {
1364                    variant: WalletOperationMetaVariant::RbfWithdraw {
1365                        rbf: rbf.clone(),
1366                        change: change_range.into_iter().collect(),
1367                    },
1368                    extra_meta: extra_meta.clone(),
1369                },
1370                tx_builder,
1371            )
1372            .await?;
1373
1374        Ok(operation_id)
1375    }
1376
1377    pub async fn subscribe_withdraw_updates(
1378        &self,
1379        operation_id: OperationId,
1380    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1381        let operation = self
1382            .client_ctx
1383            .get_operation(operation_id)
1384            .await
1385            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1386
1387        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1388            bail!("Operation is not a wallet operation");
1389        }
1390
1391        let operation_meta = operation.meta::<WalletOperationMeta>();
1392
1393        let (WalletOperationMetaVariant::Withdraw { change, .. }
1394        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1395        else {
1396            bail!("Operation is not a withdraw operation");
1397        };
1398
1399        let mut operation_stream = self.notifier.subscribe(operation_id).await;
1400        let client_ctx = self.client_ctx.clone();
1401
1402        Ok(self
1403            .client_ctx
1404            .outcome_or_updates(operation, operation_id, move || {
1405                stream! {
1406                    match next_withdraw_state(&mut operation_stream).await {
1407                        Some(WithdrawStates::Created(_)) => {
1408                            yield WithdrawState::Created;
1409                        },
1410                        Some(s) => {
1411                            panic!("Unexpected state {s:?}")
1412                        },
1413                        None => return,
1414                    }
1415
1416                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic
1417
1418                        // Swallowing potential errors since the transaction failing  is handled by
1419                        // output outcome fetching already
1420                        let _ = client_ctx
1421                            .await_primary_module_outputs(operation_id, change)
1422                            .await;
1423
1424
1425                    match next_withdraw_state(&mut operation_stream).await {
1426                        Some(WithdrawStates::Aborted(inner)) => {
1427                            yield WithdrawState::Failed(inner.error);
1428                        },
1429                        Some(WithdrawStates::Success(inner)) => {
1430                            yield WithdrawState::Succeeded(inner.txid);
1431                        },
1432                        Some(s) => {
1433                            panic!("Unexpected state {s:?}")
1434                        },
1435                        None => {},
1436                    }
1437                }
1438            }))
1439    }
1440
1441    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1442        self.admin_auth
1443            .clone()
1444            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1445    }
1446
1447    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1448        self.module_api
1449            .activate_consensus_version_voting(self.admin_auth()?)
1450            .await?;
1451
1452        Ok(())
1453    }
1454}
1455
1456/// Polls the federation checking if the activated module consensus version
1457/// supports safe deposits, saving the result in the db once it does.
1458async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1459    loop {
1460        let mut dbtx = db.begin_transaction().await;
1461
1462        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1463            break;
1464        }
1465
1466        if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1467            && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1468        {
1469            dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1470            dbtx.commit_tx().await;
1471            break;
1472        }
1473
1474        drop(dbtx);
1475
1476        if is_running_in_test_env() {
1477            // Even in tests we don't want to spam the federation with requests about it
1478            sleep(Duration::from_secs(10)).await;
1479        } else {
1480            sleep(Duration::from_secs(3600)).await;
1481        }
1482    }
1483}
1484
1485/// Returns the child index to derive the next peg-in tweak key from.
1486async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1487    let index = dbtx
1488        .get_value(&NextPegInTweakIndexKey)
1489        .await
1490        .unwrap_or_default();
1491    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1492        .await;
1493    index
1494}
1495
1496#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1497pub enum WalletClientStates {
1498    Deposit(DepositStateMachine),
1499    Withdraw(WithdrawStateMachine),
1500}
1501
1502impl IntoDynInstance for WalletClientStates {
1503    type DynType = DynState;
1504
1505    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1506        DynState::from_typed(instance_id, self)
1507    }
1508}
1509
1510impl State for WalletClientStates {
1511    type ModuleContext = WalletClientContext;
1512
1513    fn transitions(
1514        &self,
1515        context: &Self::ModuleContext,
1516        global_context: &DynGlobalClientContext,
1517    ) -> Vec<StateTransition<Self>> {
1518        match self {
1519            WalletClientStates::Deposit(sm) => {
1520                sm_enum_variant_translation!(
1521                    sm.transitions(context, global_context),
1522                    WalletClientStates::Deposit
1523                )
1524            }
1525            WalletClientStates::Withdraw(sm) => {
1526                sm_enum_variant_translation!(
1527                    sm.transitions(context, global_context),
1528                    WalletClientStates::Withdraw
1529                )
1530            }
1531        }
1532    }
1533
1534    fn operation_id(&self) -> OperationId {
1535        match self {
1536            WalletClientStates::Deposit(sm) => sm.operation_id(),
1537            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1538        }
1539    }
1540}
1541
1542#[cfg(all(test, not(target_family = "wasm")))]
1543mod tests {
1544    use std::collections::BTreeSet;
1545    use std::sync::atomic::{AtomicBool, Ordering};
1546
1547    use super::*;
1548    use crate::backup::{
1549        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
1550    };
1551
1552    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
1553    #[tokio::test(flavor = "multi_thread")]
1554    async fn sanity_test_recover_inner() {
1555        {
1556            let last_checked = AtomicBool::new(false);
1557            let last_checked = &last_checked;
1558            assert_eq!(
1559                recover_scan_idxes_for_activity(
1560                    TweakIdx(0),
1561                    &BTreeSet::new(),
1562                    |cur_idx| async move {
1563                        Ok(match cur_idx {
1564                            TweakIdx(9) => {
1565                                last_checked.store(true, Ordering::SeqCst);
1566                                vec![]
1567                            }
1568                            TweakIdx(10) => panic!("Shouldn't happen"),
1569                            TweakIdx(11) => {
1570                                vec![0usize] /* just for type inference */
1571                            }
1572                            _ => vec![],
1573                        })
1574                    }
1575                )
1576                .await
1577                .unwrap(),
1578                RecoverScanOutcome {
1579                    last_used_idx: None,
1580                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1581                    tweak_idxes_with_pegins: BTreeSet::from([])
1582                }
1583            );
1584            assert!(last_checked.load(Ordering::SeqCst));
1585        }
1586
1587        {
1588            let last_checked = AtomicBool::new(false);
1589            let last_checked = &last_checked;
1590            assert_eq!(
1591                recover_scan_idxes_for_activity(
1592                    TweakIdx(0),
1593                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1594                    |cur_idx| async move {
1595                        Ok(match cur_idx {
1596                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1597                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1598                            TweakIdx(11) => {
1599                                last_checked.store(true, Ordering::SeqCst);
1600                                vec![]
1601                            }
1602                            TweakIdx(12) => panic!("Shouldn't happen"),
1603                            TweakIdx(13) => {
1604                                vec![0usize] /* just for type inference */
1605                            }
1606                            _ => vec![],
1607                        })
1608                    }
1609                )
1610                .await
1611                .unwrap(),
1612                RecoverScanOutcome {
1613                    last_used_idx: Some(TweakIdx(2)),
1614                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1615                    tweak_idxes_with_pegins: BTreeSet::from([])
1616                }
1617            );
1618            assert!(last_checked.load(Ordering::SeqCst));
1619        }
1620
1621        {
1622            let last_checked = AtomicBool::new(false);
1623            let last_checked = &last_checked;
1624            assert_eq!(
1625                recover_scan_idxes_for_activity(
1626                    TweakIdx(10),
1627                    &BTreeSet::new(),
1628                    |cur_idx| async move {
1629                        Ok(match cur_idx {
1630                            TweakIdx(10) => vec![()],
1631                            TweakIdx(19) => {
1632                                last_checked.store(true, Ordering::SeqCst);
1633                                vec![]
1634                            }
1635                            TweakIdx(20) => panic!("Shouldn't happen"),
1636                            _ => vec![],
1637                        })
1638                    }
1639                )
1640                .await
1641                .unwrap(),
1642                RecoverScanOutcome {
1643                    last_used_idx: Some(TweakIdx(10)),
1644                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1645                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1646                }
1647            );
1648            assert!(last_checked.load(Ordering::SeqCst));
1649        }
1650
1651        assert_eq!(
1652            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1653                Ok(match cur_idx {
1654                    TweakIdx(6 | 15) => vec![()],
1655                    _ => vec![],
1656                })
1657            })
1658            .await
1659            .unwrap(),
1660            RecoverScanOutcome {
1661                last_used_idx: Some(TweakIdx(15)),
1662                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1663                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1664            }
1665        );
1666        assert_eq!(
1667            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1668                Ok(match cur_idx {
1669                    TweakIdx(8) => {
1670                        vec![()] /* for type inference only */
1671                    }
1672                    TweakIdx(9) => {
1673                        panic!("Shouldn't happen")
1674                    }
1675                    _ => vec![],
1676                })
1677            })
1678            .await
1679            .unwrap(),
1680            RecoverScanOutcome {
1681                last_used_idx: None,
1682                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1683                tweak_idxes_with_pegins: BTreeSet::from([])
1684            }
1685        );
1686        assert_eq!(
1687            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1688                Ok(match cur_idx {
1689                    TweakIdx(9) => panic!("Shouldn't happen"),
1690                    TweakIdx(15) => vec![()],
1691                    _ => vec![],
1692                })
1693            })
1694            .await
1695            .unwrap(),
1696            RecoverScanOutcome {
1697                last_used_idx: Some(TweakIdx(15)),
1698                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1699                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1700            }
1701        );
1702    }
1703}