Skip to main content

fedimint_wallet_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15/// Legacy, state-machine based peg-ins, replaced by `pegin_monitor`
16/// but retained for time being to ensure existing peg-ins complete.
17mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20/// Peg-in monitor: a task monitoring deposit addresses for peg-ins.
21mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{BitcoindTracked, DynBitcoindRpc, IBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39    ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::recovery::RecoveryProgress;
42use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
43use fedimint_client_module::oplog::UpdateStreamOrOutcome;
44use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client_module::transaction::{
46    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
55use fedimint_core::module::{
56    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
57    ModuleInit, MultiApiVersion,
58};
59use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{BoxStream, backoff_util, retry};
62use fedimint_core::{
63    BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
64    runtime, secp256k1,
65};
66use fedimint_derive_secret::{ChildId, DerivableSecret};
67use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
68pub use fedimint_wallet_common as common;
69use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
70use fedimint_wallet_common::tweakable::Tweakable;
71pub use fedimint_wallet_common::*;
72use futures::{Stream, StreamExt};
73use rand::{Rng, thread_rng};
74use secp256k1::Keypair;
75use serde::{Deserialize, Serialize};
76use strum::IntoEnumIterator;
77use tokio::sync::watch;
78use tracing::{debug, instrument};
79
80use crate::api::WalletFederationApi;
81use crate::backup::{FEDERATION_RECOVER_MAX_GAP, RecoveryStateV2, WalletRecovery};
82use crate::client_db::{
83    ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
84    PegInPoolCursorKey, PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey,
85    RecoveryStateKey, SupportsSafeDepositPrefix,
86};
87use crate::deposit::DepositStateMachine;
88use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
89
90const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
91
92#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
93pub struct BitcoinTransactionData {
94    /// The bitcoin transaction is saved as soon as we see it so the transaction
95    /// can be re-transmitted if it's evicted from the mempool.
96    pub btc_transaction: bitcoin::Transaction,
97    /// Index of the deposit output
98    pub out_idx: u32,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
102pub enum DepositStateV1 {
103    WaitingForTransaction,
104    WaitingForConfirmation(BitcoinTransactionData),
105    Confirmed(BitcoinTransactionData),
106    Claimed(BitcoinTransactionData),
107    Failed(String),
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
111pub enum DepositStateV2 {
112    WaitingForTransaction,
113    WaitingForConfirmation {
114        #[serde(with = "bitcoin::amount::serde::as_sat")]
115        btc_deposited: bitcoin::Amount,
116        btc_out_point: bitcoin::OutPoint,
117    },
118    Confirmed {
119        #[serde(with = "bitcoin::amount::serde::as_sat")]
120        btc_deposited: bitcoin::Amount,
121        btc_out_point: bitcoin::OutPoint,
122    },
123    Claimed {
124        #[serde(with = "bitcoin::amount::serde::as_sat")]
125        btc_deposited: bitcoin::Amount,
126        btc_out_point: bitcoin::OutPoint,
127    },
128    Failed(String),
129}
130
131/// Deposit address allocated by this client.
132#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct DepositAddressInfo {
134    pub operation_id: OperationId,
135    pub address: Address,
136    pub tweak_idx: TweakIdx,
137}
138
139/// Result of [`WalletClientModule::allocate_deposit_address_pooled_stateless`].
140///
141/// Callers that need a custom address-picking strategy can use
142/// [`MaybeNewAddress::TooManyUnusedAddresses`] to see all currently reusable
143/// addresses instead of relying on this crate's round-robin policy.
144#[allow(clippy::enum_variant_names)]
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub enum MaybeNewAddress {
147    /// A new tweak/operation was created on this call.
148    NewAddress(DepositAddressInfo),
149    /// The unused-address gap is full. No new address was allocated.
150    ///
151    /// Reusable unused addresses are ordered by `creation_time` ascending.
152    TooManyUnusedAddresses(Vec<DepositAddressInfo>),
153}
154
155/// Outcome of [`WalletClientModule::allocate_deposit_address_pooled`], so
156/// callers can decide whether to perform per-operation initialization (notes,
157/// fee bookkeeping, metadata writes) — for `Reused` returns, that work was
158/// already done at the time of the original `Fresh` allocation and must not be
159/// repeated.
160#[derive(Debug, Clone, Copy, PartialEq, Eq)]
161pub enum AllocateDepositOutcome {
162    /// A new tweak/operation was created on this call.
163    Fresh,
164    /// An existing unused address was returned. The carried `TweakIdx` is the
165    /// same as the one returned in the tuple and refers to the original
166    /// allocation; it's exposed here as well for explicit diagnostic use.
167    Reused { original_tweak_idx: TweakIdx },
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
171pub enum WithdrawState {
172    Created,
173    Succeeded(bitcoin::Txid),
174    Failed(String),
175    // TODO: track refund
176    // Refunded,
177    // RefundFailed(String),
178}
179
180async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
181where
182    S: Stream<Item = WalletClientStates> + Unpin,
183{
184    loop {
185        if let WalletClientStates::Withdraw(ds) = stream.next().await? {
186            return Some(ds.state);
187        }
188        tokio::task::yield_now().await;
189    }
190}
191
192#[derive(Debug, Clone, Default)]
193// TODO: should probably move to DB
194pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
195
196const SLICE_SIZE: u64 = 1000;
197
198impl WalletClientInit {
199    pub fn new(rpc: DynBitcoindRpc) -> Self {
200        Self(Some(rpc))
201    }
202
203    async fn recover_from_slices(
204        &self,
205        args: &ClientModuleRecoverArgs<Self>,
206    ) -> anyhow::Result<()> {
207        let data = WalletClientModuleData {
208            cfg: args.cfg().clone(),
209            module_root_secret: args.module_root_secret().clone(),
210        };
211
212        let total_items = args.module_api().fetch_recovery_count().await?;
213
214        let mut state = RecoveryStateV2::new();
215
216        state.refill_pending_pool_up_to(&data, TweakIdx(FEDERATION_RECOVER_MAX_GAP));
217
218        for start in (0..total_items).step_by(SLICE_SIZE as usize) {
219            let end = std::cmp::min(start + SLICE_SIZE, total_items);
220
221            let items = args.module_api().fetch_recovery_slice(start, end).await?;
222
223            for item in &items {
224                match item {
225                    RecoveryItem::Input { outpoint, script } => {
226                        state.handle_item(*outpoint, script, &data);
227                    }
228                }
229            }
230
231            args.update_recovery_progress(RecoveryProgress {
232                complete: end.try_into().unwrap_or(u32::MAX),
233                total: total_items.try_into().unwrap_or(u32::MAX),
234            });
235        }
236
237        let mut dbtx = args.db().begin_transaction().await;
238
239        for tweak_idx in 0..state.new_start_idx().0 {
240            let operation_id = data.derive_peg_in_script(TweakIdx(tweak_idx)).3;
241
242            let claimed = state
243                .claimed_outpoints
244                .get(&TweakIdx(tweak_idx))
245                .cloned()
246                .unwrap_or_default();
247
248            dbtx.insert_new_entry(
249                &PegInTweakIndexKey(TweakIdx(tweak_idx)),
250                &PegInTweakIndexData {
251                    operation_id,
252                    creation_time: fedimint_core::time::now(),
253                    last_check_time: None,
254                    next_check_time: Some(fedimint_core::time::now()),
255                    claimed,
256                },
257            )
258            .await;
259        }
260
261        dbtx.insert_new_entry(&NextPegInTweakIndexKey, &state.new_start_idx())
262            .await;
263
264        dbtx.commit_tx().await;
265
266        Ok(())
267    }
268}
269
270impl ModuleInit for WalletClientInit {
271    type Common = WalletCommonInit;
272
273    async fn dump_database(
274        &self,
275        dbtx: &mut DatabaseTransaction<'_>,
276        prefix_names: Vec<String>,
277    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
278        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
279            BTreeMap::new();
280        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
281            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
282        });
283
284        for table in filtered_prefixes {
285            match table {
286                DbKeyPrefix::NextPegInTweakIndex => {
287                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
288                        wallet_client_items
289                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
290                    }
291                }
292                DbKeyPrefix::PegInTweakIndex => {
293                    push_db_pair_items!(
294                        dbtx,
295                        PegInTweakIndexPrefix,
296                        PegInTweakIndexKey,
297                        PegInTweakIndexData,
298                        wallet_client_items,
299                        "Peg-In Tweak Index"
300                    );
301                }
302                DbKeyPrefix::ClaimedPegIn => {
303                    push_db_pair_items!(
304                        dbtx,
305                        ClaimedPegInPrefix,
306                        ClaimedPegInKey,
307                        ClaimedPegInData,
308                        wallet_client_items,
309                        "Claimed Peg-In"
310                    );
311                }
312                DbKeyPrefix::RecoveryFinalized => {
313                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
314                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
315                    }
316                }
317                DbKeyPrefix::SupportsSafeDeposit => {
318                    push_db_pair_items!(
319                        dbtx,
320                        SupportsSafeDepositPrefix,
321                        SupportsSafeDepositKey,
322                        (),
323                        wallet_client_items,
324                        "Supports Safe Deposit"
325                    );
326                }
327                DbKeyPrefix::PegInPoolCursor => {
328                    if let Some(cursor) = dbtx.get_value(&PegInPoolCursorKey).await {
329                        wallet_client_items.insert("PegInPoolCursor".to_string(), Box::new(cursor));
330                    }
331                }
332                DbKeyPrefix::RecoveryState
333                | DbKeyPrefix::ExternalReservedStart
334                | DbKeyPrefix::CoreInternalReservedStart
335                | DbKeyPrefix::CoreInternalReservedEnd => {}
336            }
337        }
338
339        Box::new(wallet_client_items.into_iter())
340    }
341}
342
343#[apply(async_trait_maybe_send!)]
344impl ClientModuleInit for WalletClientInit {
345    type Module = WalletClientModule;
346
347    fn supported_api_versions(&self) -> MultiApiVersion {
348        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
349            .expect("no version conflicts")
350    }
351
352    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
353        let data = WalletClientModuleData {
354            cfg: args.cfg().clone(),
355            module_root_secret: args.module_root_secret().clone(),
356        };
357
358        let db = args.db().clone();
359
360        let rpc_config = WalletClientModule::get_rpc_config(args.cfg());
361
362        // Priority:
363        // 1. user-provided bitcoind RPC from ClientBuilder::with_bitcoind_rpc
364        // 2. user-provided no-chain-id factory from
365        //    ClientBuilder::with_bitcoind_rpc_no_chain_id
366        // 3. WalletClientInit constructor
367        // 4. create from config (esplora)
368        let btc_rpc = if let Some(user_rpc) = args.user_bitcoind_rpc() {
369            user_rpc.clone()
370        } else if let Some(factory) = args.user_bitcoind_rpc_no_chain_id() {
371            if let Some(rpc) = factory(rpc_config.url.clone()).await {
372                rpc
373            } else {
374                self.0
375                    .clone()
376                    .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
377            }
378        } else {
379            self.0
380                .clone()
381                .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
382        };
383        let btc_rpc = BitcoindTracked::new(btc_rpc, "wallet-client").into_dyn();
384
385        let module_api = args.module_api().clone();
386
387        let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
388        let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
389
390        Ok(WalletClientModule {
391            db,
392            data,
393            module_api,
394            notifier: args.notifier().clone(),
395            rpc: btc_rpc,
396            client_ctx: args.context(),
397            pegin_monitor_wakeup_sender,
398            pegin_monitor_wakeup_receiver,
399            pegin_claimed_receiver,
400            pegin_claimed_sender,
401            task_group: args.task_group().clone(),
402            client_span: args.client_span().clone(),
403            admin_auth: args.admin_auth().cloned(),
404        })
405    }
406
407    /// Wallet recovery
408    ///
409    /// Uses slice-based recovery if supported by the federation, otherwise
410    /// falls back to session-based history recovery.
411    async fn recover(
412        &self,
413        args: &ClientModuleRecoverArgs<Self>,
414        snapshot: Option<&<Self::Module as ClientModule>::Backup>,
415    ) -> anyhow::Result<()> {
416        // Check if V1 (session-based) recovery state exists (resuming interrupted
417        // recovery)
418        if args
419            .db()
420            .begin_transaction_nc()
421            .await
422            .get_value(&RecoveryStateKey)
423            .await
424            .is_some()
425        {
426            return args
427                .recover_from_history::<WalletRecovery>(self, snapshot)
428                .await;
429        }
430
431        // Determine which method to use based on endpoint availability
432        if args.module_api().fetch_recovery_count().await.is_ok() {
433            self.recover_from_slices(args).await
434        } else {
435            args.recover_from_history::<WalletRecovery>(self, snapshot)
436                .await
437        }
438    }
439
440    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
441        Some(
442            DbKeyPrefix::iter()
443                .map(|p| p as u8)
444                .chain(
445                    DbKeyPrefix::ExternalReservedStart as u8
446                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
447                )
448                .collect(),
449        )
450    }
451}
452
453#[derive(Debug, Clone, Serialize, Deserialize)]
454pub struct WalletOperationMeta {
455    pub variant: WalletOperationMetaVariant,
456    pub extra_meta: serde_json::Value,
457}
458
459#[derive(Debug, Clone, Serialize, Deserialize)]
460#[serde(rename_all = "snake_case")]
461pub enum WalletOperationMetaVariant {
462    Deposit {
463        address: Address<NetworkUnchecked>,
464        /// Added in 0.4.2, can be `None` for old deposits or `Some` for ones
465        /// using the pegin monitor. The value is the child index of the key
466        /// used to generate the address, so we can re-generate the secret key
467        /// from our root secret.
468        #[serde(default)]
469        tweak_idx: Option<TweakIdx>,
470        #[serde(default, skip_serializing_if = "Option::is_none")]
471        expires_at: Option<SystemTime>,
472    },
473    Withdraw {
474        address: Address<NetworkUnchecked>,
475        #[serde(with = "bitcoin::amount::serde::as_sat")]
476        amount: bitcoin::Amount,
477        fee: PegOutFees,
478        change: Vec<OutPoint>,
479    },
480
481    RbfWithdraw {
482        rbf: Rbf,
483        change: Vec<OutPoint>,
484    },
485}
486
487/// The non-resource, just plain-data parts of [`WalletClientModule`]
488#[derive(Debug, Clone)]
489pub struct WalletClientModuleData {
490    cfg: WalletClientConfig,
491    module_root_secret: DerivableSecret,
492}
493
494impl WalletClientModuleData {
495    fn derive_deposit_address(
496        &self,
497        idx: TweakIdx,
498    ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
499        let idx = ChildId(idx.0);
500
501        let secret_tweak_key = self
502            .module_root_secret
503            .child_key(WALLET_TWEAK_CHILD_ID)
504            .child_key(idx)
505            .to_secp_key(fedimint_core::secp256k1::SECP256K1);
506
507        let public_tweak_key = secret_tweak_key.public_key();
508
509        let address = self
510            .cfg
511            .peg_in_descriptor
512            .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
513            .address(self.cfg.network.0)
514            .unwrap();
515
516        // TODO: make hash?
517        let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
518
519        (secret_tweak_key, public_tweak_key, address, operation_id)
520    }
521
522    fn derive_peg_in_script(
523        &self,
524        idx: TweakIdx,
525    ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
526        let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
527
528        (
529            self.cfg
530                .peg_in_descriptor
531                .tweak(&secret_tweak_key.public_key(), SECP256K1)
532                .script_pubkey(),
533            address,
534            secret_tweak_key,
535            operation_id,
536        )
537    }
538}
539
540#[derive(Debug)]
541pub struct WalletClientModule {
542    data: WalletClientModuleData,
543    db: Database,
544    module_api: DynModuleApi,
545    notifier: ModuleNotifier<WalletClientStates>,
546    rpc: DynBitcoindRpc,
547    client_ctx: ClientContext<Self>,
548    /// Updated to wake up pegin monitor
549    pegin_monitor_wakeup_sender: watch::Sender<()>,
550    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
551    /// Called every time a peg-in was claimed
552    pegin_claimed_sender: watch::Sender<()>,
553    pegin_claimed_receiver: watch::Receiver<()>,
554    task_group: TaskGroup,
555    client_span: tracing::Span,
556    admin_auth: Option<ApiAuth>,
557}
558
559#[apply(async_trait_maybe_send!)]
560impl ClientModule for WalletClientModule {
561    type Init = WalletClientInit;
562    type Common = WalletModuleTypes;
563    type Backup = WalletModuleBackup;
564    type ModuleStateMachineContext = WalletClientContext;
565    type States = WalletClientStates;
566
567    fn context(&self) -> Self::ModuleStateMachineContext {
568        WalletClientContext {
569            rpc: self.rpc.clone(),
570            wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
571            wallet_decoder: self.decoder(),
572            secp: Secp256k1::default(),
573            client_ctx: self.client_ctx.clone(),
574        }
575    }
576
577    async fn start(&self) {
578        self.task_group
579            .spawn_cancellable_with_span(self.client_span.clone(), "peg-in monitor", {
580                let client_ctx = self.client_ctx.clone();
581                let db = self.db.clone();
582                let btc_rpc = self.rpc.clone();
583                let module_api = self.module_api.clone();
584                let data = self.data.clone();
585                let pegin_claimed_sender = self.pegin_claimed_sender.clone();
586                let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
587                pegin_monitor::run_peg_in_monitor(
588                    client_ctx,
589                    db,
590                    btc_rpc,
591                    module_api,
592                    data,
593                    pegin_claimed_sender,
594                    pegin_monitor_wakeup_receiver,
595                )
596            });
597
598        self.task_group.spawn_cancellable_with_span(
599            self.client_span.clone(),
600            "supports-safe-deposit-version",
601            {
602                let db = self.db.clone();
603                let module_api = self.module_api.clone();
604
605                poll_supports_safe_deposit_version(db, module_api)
606            },
607        );
608    }
609
610    fn supports_backup(&self) -> bool {
611        true
612    }
613
614    async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
615        // fetch consensus height first
616        let session_count = self.client_ctx.global_api().session_count().await?;
617
618        let mut dbtx = self.db.begin_transaction_nc().await;
619        let next_pegin_tweak_idx = dbtx
620            .get_value(&NextPegInTweakIndexKey)
621            .await
622            .unwrap_or_default();
623        let claimed = dbtx
624            .find_by_prefix(&PegInTweakIndexPrefix)
625            .await
626            .filter_map(|(k, v)| async move {
627                if v.claimed.is_empty() {
628                    None
629                } else {
630                    Some(k.0)
631                }
632            })
633            .collect()
634            .await;
635        Ok(backup::WalletModuleBackup::new_v1(
636            session_count,
637            next_pegin_tweak_idx,
638            claimed,
639        ))
640    }
641
642    fn input_fee(
643        &self,
644        _amount: &Amounts,
645        _input: &<Self::Common as ModuleCommon>::Input,
646    ) -> Option<Amounts> {
647        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
648    }
649
650    fn output_fee(
651        &self,
652        _amount: &Amounts,
653        _output: &<Self::Common as ModuleCommon>::Output,
654    ) -> Option<Amounts> {
655        Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
656    }
657
658    async fn handle_rpc(
659        &self,
660        method: String,
661        request: serde_json::Value,
662    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
663        Box::pin(try_stream! {
664            match method.as_str() {
665                "get_wallet_summary" => {
666                    let _req: WalletSummaryRequest = serde_json::from_value(request)?;
667                    let wallet_summary = self.get_wallet_summary()
668                        .await
669                        .expect("Failed to fetch wallet summary");
670                    let result = serde_json::to_value(&wallet_summary)
671                        .expect("Serialization error");
672                    yield result;
673                }
674                "get_block_count_local" => {
675                    let block_count = self.get_block_count_local().await
676                        .expect("Failed to fetch block count");
677                    yield serde_json::to_value(block_count)?;
678                }
679                "peg_in" => {
680                    let req: PegInRequest = serde_json::from_value(request)?;
681                    let response = self.peg_in(req)
682                        .await
683                        .map_err(|e| anyhow::anyhow!("peg_in failed: {e}"))?;
684                    let result = serde_json::to_value(&response)?;
685                    yield result;
686                },
687                "peg_out" => {
688                    let req: PegOutRequest = serde_json::from_value(request)?;
689                    let response = self.peg_out(req)
690                        .await
691                        .map_err(|e| anyhow::anyhow!("peg_out failed: {e}"))?;
692                    let result = serde_json::to_value(&response)?;
693                    yield result;
694                },
695                "subscribe_deposit" => {
696                    let req: SubscribeDepositRequest = serde_json::from_value(request)?;
697                    for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
698                        yield serde_json::to_value(state)?;
699                    }
700                },
701                "subscribe_withdraw" => {
702                    let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
703                    for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
704                        yield serde_json::to_value(state)?;
705                    }
706                }
707                _ => {
708                    Err(anyhow::format_err!("Unknown method: {method}"))?;
709                }
710            }
711        })
712    }
713
714    #[cfg(feature = "cli")]
715    async fn handle_cli_command(
716        &self,
717        args: &[std::ffi::OsString],
718    ) -> anyhow::Result<serde_json::Value> {
719        cli::handle_cli_command(self, args).await
720    }
721}
722
723#[derive(Deserialize)]
724struct WalletSummaryRequest {}
725
726#[derive(Debug, Clone)]
727pub struct WalletClientContext {
728    rpc: DynBitcoindRpc,
729    wallet_descriptor: PegInDescriptor,
730    wallet_decoder: Decoder,
731    secp: Secp256k1<All>,
732    pub client_ctx: ClientContext<WalletClientModule>,
733}
734
735#[derive(Debug, Clone, Serialize, Deserialize)]
736pub struct PegInRequest {
737    pub extra_meta: serde_json::Value,
738}
739
740#[derive(Deserialize)]
741struct SubscribeDepositRequest {
742    operation_id: OperationId,
743}
744
745#[derive(Deserialize)]
746struct SubscribeWithdrawRequest {
747    operation_id: OperationId,
748}
749
750#[derive(Debug, Clone, Serialize, Deserialize)]
751pub struct PegInResponse {
752    pub deposit_address: Address<NetworkUnchecked>,
753    pub operation_id: OperationId,
754}
755
756#[derive(Debug, Clone, Serialize, Deserialize)]
757pub struct PegOutRequest {
758    pub amount_sat: u64,
759    pub destination_address: Address<NetworkUnchecked>,
760    pub extra_meta: serde_json::Value,
761}
762
763#[derive(Debug, Clone, Serialize, Deserialize)]
764pub struct PegOutResponse {
765    pub operation_id: OperationId,
766}
767
768impl Context for WalletClientContext {
769    const KIND: Option<ModuleKind> = Some(KIND);
770}
771
772impl WalletClientModule {
773    fn cfg(&self) -> &WalletClientConfig {
774        &self.data.cfg
775    }
776
777    fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
778        match BitcoinRpcConfig::get_defaults_from_env_vars() {
779            Ok(rpc_config) => {
780                // TODO: Wallet client cannot support bitcoind RPC until the bitcoin dep is
781                // updated to 0.30
782                if rpc_config.kind == "bitcoind" {
783                    cfg.default_bitcoin_rpc.clone()
784                } else {
785                    rpc_config
786                }
787            }
788            _ => cfg.default_bitcoin_rpc.clone(),
789        }
790    }
791
792    pub fn get_network(&self) -> Network {
793        self.cfg().network.0
794    }
795
796    pub fn get_finality_delay(&self) -> u32 {
797        self.cfg().finality_delay
798    }
799
800    pub fn get_fee_consensus(&self) -> FeeConsensus {
801        self.cfg().fee_consensus
802    }
803
804    async fn allocate_deposit_address_inner(
805        &self,
806        dbtx: &mut DatabaseTransaction<'_>,
807    ) -> DepositAddressInfo {
808        dbtx.ensure_isolated().expect("Must be isolated db");
809
810        let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
811        let (_secret_tweak_key, _, address, operation_id) =
812            self.data.derive_deposit_address(tweak_idx);
813
814        let now = fedimint_core::time::now();
815
816        dbtx.insert_new_entry(
817            &PegInTweakIndexKey(tweak_idx),
818            &PegInTweakIndexData {
819                creation_time: now,
820                next_check_time: Some(now),
821                last_check_time: None,
822                operation_id,
823                claimed: vec![],
824            },
825        )
826        .await;
827
828        DepositAddressInfo {
829            operation_id,
830            address,
831            tweak_idx,
832        }
833    }
834
835    /// Fetches the fees that would need to be paid to make the withdraw request
836    /// using [`Self::withdraw`] work *right now*.
837    ///
838    /// Note that we do not receive a guarantee that these fees will be valid in
839    /// the future, thus even the next second using these fees *may* fail.
840    /// The caller should be prepared to retry with a new fee estimate.
841    pub async fn get_withdraw_fees(
842        &self,
843        address: &bitcoin::Address,
844        amount: bitcoin::Amount,
845    ) -> anyhow::Result<PegOutFees> {
846        self.module_api
847            .fetch_peg_out_fees(address, amount)
848            .await?
849            .context("Federation didn't return peg-out fees")
850    }
851
852    /// Returns a summary of the wallet's coins
853    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
854        Ok(self.module_api.fetch_wallet_summary().await?)
855    }
856
857    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
858        Ok(self.module_api.fetch_block_count_local().await?)
859    }
860
861    pub fn create_withdraw_output(
862        &self,
863        operation_id: OperationId,
864        address: bitcoin::Address,
865        amount: bitcoin::Amount,
866        fees: PegOutFees,
867    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
868        let output = WalletOutput::new_v0_peg_out(address, amount, fees);
869
870        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
871
872        let sm_gen = move |out_point_range: OutPointRange| {
873            assert_eq!(out_point_range.count(), 1);
874            let out_idx = out_point_range.start_idx();
875            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
876                operation_id,
877                state: WithdrawStates::Created(CreatedWithdrawState {
878                    fm_outpoint: OutPoint {
879                        txid: out_point_range.txid(),
880                        out_idx,
881                    },
882                }),
883            })]
884        };
885
886        Ok(ClientOutputBundle::new(
887            vec![ClientOutput::<WalletOutput> {
888                output,
889                amounts: Amounts::new_bitcoin(amount),
890            }],
891            vec![ClientOutputSM::<WalletClientStates> {
892                state_machines: Arc::new(sm_gen),
893            }],
894        ))
895    }
896
897    pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
898        let deposit_address = self.safe_allocate_deposit_address(req.extra_meta).await?;
899
900        Ok(PegInResponse {
901            deposit_address: Address::from_script(
902                &deposit_address.address.script_pubkey(),
903                self.get_network(),
904            )?
905            .as_unchecked()
906            .clone(),
907            operation_id: deposit_address.operation_id,
908        })
909    }
910
911    pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
912        let amount = bitcoin::Amount::from_sat(req.amount_sat);
913        let destination = req
914            .destination_address
915            .require_network(self.get_network())?;
916
917        let fees = self.get_withdraw_fees(&destination, amount).await?;
918        let operation_id = self
919            .withdraw(&destination, amount, fees, req.extra_meta)
920            .await
921            .context("Failed to initiate withdraw")?;
922
923        Ok(PegOutResponse { operation_id })
924    }
925
926    pub fn create_rbf_withdraw_output(
927        &self,
928        operation_id: OperationId,
929        rbf: &Rbf,
930    ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
931        let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
932
933        let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
934
935        let sm_gen = move |out_point_range: OutPointRange| {
936            assert_eq!(out_point_range.count(), 1);
937            let out_idx = out_point_range.start_idx();
938            vec![WalletClientStates::Withdraw(WithdrawStateMachine {
939                operation_id,
940                state: WithdrawStates::Created(CreatedWithdrawState {
941                    fm_outpoint: OutPoint {
942                        txid: out_point_range.txid(),
943                        out_idx,
944                    },
945                }),
946            })]
947        };
948
949        Ok(ClientOutputBundle::new(
950            vec![ClientOutput::<WalletOutput> {
951                output,
952                amounts: Amounts::new_bitcoin(amount),
953            }],
954            vec![ClientOutputSM::<WalletClientStates> {
955                state_machines: Arc::new(sm_gen),
956            }],
957        ))
958    }
959
960    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
961        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
962    }
963
964    /// Returns true if the federation's wallet module consensus version
965    /// supports processing all deposits.
966    ///
967    /// This method is safe to call offline, since it first attempts to read a
968    /// key from the db that represents the client has previously been able to
969    /// verify the wallet module consensus version. If the client has not
970    /// verified the version, it must be online to fetch the latest wallet
971    /// module consensus version.
972    pub async fn supports_safe_deposit(&self) -> bool {
973        let mut dbtx = self.db.begin_transaction().await;
974
975        let already_verified_supports_safe_deposit =
976            dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
977
978        already_verified_supports_safe_deposit || {
979            match self.module_api.module_consensus_version().await {
980                Ok(module_consensus_version) => {
981                    let supported_version =
982                        SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
983
984                    if supported_version {
985                        dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
986                        dbtx.commit_tx().await;
987                    }
988
989                    supported_version
990                }
991                Err(_) => false,
992            }
993        }
994    }
995
996    /// Allocates a deposit address controlled by the federation, guaranteeing
997    /// safe handling of all deposits, including on-chain transactions exceeding
998    /// `ALEPH_BFT_UNIT_BYTE_LIMIT`.
999    ///
1000    /// Returns an error if the client has never been online to verify the
1001    /// federation's wallet module consensus version supports processing all
1002    /// deposits.
1003    pub async fn safe_allocate_deposit_address<M>(
1004        &self,
1005        extra_meta: M,
1006    ) -> anyhow::Result<DepositAddressInfo>
1007    where
1008        M: Serialize + MaybeSend + MaybeSync,
1009    {
1010        ensure!(
1011            self.supports_safe_deposit().await,
1012            "Wallet module consensus version doesn't support safe deposits",
1013        );
1014
1015        self.allocate_deposit_address_expert_only(extra_meta).await
1016    }
1017
1018    /// Allocates a deposit address that is controlled by the federation.
1019    ///
1020    /// This is an EXPERT ONLY method intended for power users such as Lightning
1021    /// gateways allocating liquidity, and we discourage exposing peg-in
1022    /// functionality to everyday users of a Fedimint wallet due to the
1023    /// following two limitations:
1024    ///
1025    /// The transaction sending to this address needs to be smaller than 40KB in
1026    /// order for the peg-in to be claimable. If the transaction is too large,
1027    /// funds will be lost.
1028    ///
1029    /// In the future, federations will also enforce a minimum peg-in amount to
1030    /// prevent accumulation of dust UTXOs. Peg-ins under this minimum cannot be
1031    /// claimed and funds will be lost.
1032    ///
1033    /// Everyday users should rely on Lightning to move funds into the
1034    /// federation.
1035    pub async fn allocate_deposit_address_expert_only<M>(
1036        &self,
1037        extra_meta: M,
1038    ) -> anyhow::Result<DepositAddressInfo>
1039    where
1040        M: Serialize + MaybeSend + MaybeSync,
1041    {
1042        let extra_meta_value =
1043            serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1044        let deposit_address = self
1045            .db
1046            .autocommit(
1047                move |dbtx, _| {
1048                    let extra_meta_value_inner = extra_meta_value.clone();
1049                    Box::pin(async move {
1050                        let deposit_address = self.allocate_deposit_address_inner(dbtx).await;
1051
1052                        self.client_ctx
1053                            .manual_operation_start_dbtx(
1054                                dbtx,
1055                                deposit_address.operation_id,
1056                                WalletCommonInit::KIND.as_str(),
1057                                WalletOperationMeta {
1058                                    variant: WalletOperationMetaVariant::Deposit {
1059                                        address: deposit_address.address.clone().into_unchecked(),
1060                                        tweak_idx: Some(deposit_address.tweak_idx),
1061                                        expires_at: None,
1062                                    },
1063                                    extra_meta: extra_meta_value_inner,
1064                                },
1065                                vec![],
1066                            )
1067                            .await?;
1068
1069                        debug!(
1070                            target: LOG_CLIENT_MODULE_WALLET,
1071                            tweak_idx = %deposit_address.tweak_idx,
1072                            address = %deposit_address.address,
1073                            "Derived a new deposit address"
1074                        );
1075
1076                        // Begin watching the script address
1077                        self.rpc
1078                            .watch_script_history(&deposit_address.address.script_pubkey())
1079                            .await?;
1080
1081                        let sender = self.pegin_monitor_wakeup_sender.clone();
1082                        dbtx.on_commit(move || {
1083                            sender.send_replace(());
1084                        });
1085
1086                        Ok(deposit_address)
1087                    })
1088                },
1089                Some(100),
1090            )
1091            .await
1092            .map_err(|e| match e {
1093                AutocommitError::CommitFailed {
1094                    last_error,
1095                    attempts,
1096                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1097                AutocommitError::ClosureError { error, .. } => error,
1098            })?;
1099
1100        Ok(deposit_address)
1101    }
1102
1103    /// Allocate a deposit address, bounding the gap of consecutive unused
1104    /// addresses past the last-used one, without selecting a reusable address.
1105    ///
1106    /// "Unused" here means `PegInTweakIndexData::claimed.is_empty()` — the
1107    /// `pegin_monitor` has not yet successfully claimed any deposit on this
1108    /// address. There is a small race window where a deposit has been
1109    /// observed in the mempool / awaiting confirmations but not yet claimed,
1110    /// in which case the address still looks unused. Reusing such an address
1111    /// is benign in practice: `pegin_monitor` claims any number of deposits
1112    /// per address.
1113    ///
1114    /// Let `gap` be the count of unused tweak indices strictly greater than
1115    /// the highest used index (or the count of all allocated tweaks if none
1116    /// have been used yet). Semantics:
1117    ///   - If `gap >= max_gap_size` and there is at least one unused address,
1118    ///     returns all those addresses with
1119    ///     [`MaybeNewAddress::TooManyUnusedAddresses`].
1120    ///   - Otherwise, allocates a new address (equivalent to
1121    ///     [`Self::allocate_deposit_address_expert_only`]) and returns it with
1122    ///     [`MaybeNewAddress::NewAddress`].
1123    ///
1124    /// `max_gap_size == 0` therefore means "only allocate fresh when the
1125    /// previous address was already used". The very first call still
1126    /// allocates fresh because there are no addresses to reuse.
1127    ///
1128    /// `max_gap_size == usize::MAX` makes this method behave like
1129    /// [`Self::allocate_deposit_address_expert_only`] — always allocating a new
1130    /// address.
1131    ///
1132    /// Reusable addresses are ordered by `creation_time` ascending. This lets
1133    /// callers that need a custom selection strategy choose from all available
1134    /// candidates.
1135    ///
1136    /// Caveats inherited from
1137    /// [`Self::allocate_deposit_address_expert_only`] (40KB tx limit, future
1138    /// minimum peg-in amount) apply equally to newly allocated and reused
1139    /// addresses.
1140    pub async fn allocate_deposit_address_pooled_stateless(
1141        &self,
1142        max_gap_size: usize,
1143    ) -> anyhow::Result<MaybeNewAddress> {
1144        let max_gap_size_u64 = u64::try_from(max_gap_size).unwrap_or(u64::MAX);
1145        let extra_meta_value = serde_json::Value::Null;
1146        let result = self
1147            .db
1148            .autocommit(
1149                move |dbtx, _| {
1150                    let extra_meta_value_inner = extra_meta_value.clone();
1151                    Box::pin(async move {
1152                        let unused = self.unused_pooled_deposit_addresses(dbtx).await;
1153
1154                        if max_gap_size_u64 <= unused.len() as u64 && !unused.is_empty() {
1155                            let addresses = unused
1156                                .into_iter()
1157                                .map(|(tweak_idx, data)| {
1158                                    let (_script, address, _key, operation_id) =
1159                                        self.data.derive_peg_in_script(tweak_idx);
1160
1161                                    debug_assert_eq!(operation_id, data.operation_id);
1162
1163                                    DepositAddressInfo {
1164                                        operation_id,
1165                                        address,
1166                                        tweak_idx,
1167                                    }
1168                                })
1169                                .collect();
1170
1171                            return Ok::<_, anyhow::Error>(
1172                                MaybeNewAddress::TooManyUnusedAddresses(addresses),
1173                            );
1174                        }
1175
1176                        let deposit_address = self.allocate_deposit_address_inner(dbtx).await;
1177
1178                        self.client_ctx
1179                            .manual_operation_start_dbtx(
1180                                dbtx,
1181                                deposit_address.operation_id,
1182                                WalletCommonInit::KIND.as_str(),
1183                                WalletOperationMeta {
1184                                    variant: WalletOperationMetaVariant::Deposit {
1185                                        address: deposit_address.address.clone().into_unchecked(),
1186                                        tweak_idx: Some(deposit_address.tweak_idx),
1187                                        expires_at: None,
1188                                    },
1189                                    extra_meta: extra_meta_value_inner,
1190                                },
1191                                vec![],
1192                            )
1193                            .await?;
1194
1195                        debug!(
1196                            target: LOG_CLIENT_MODULE_WALLET,
1197                            tweak_idx = %deposit_address.tweak_idx,
1198                            address = %deposit_address.address,
1199                            "Derived a new pooled deposit address"
1200                        );
1201
1202                        self.rpc
1203                            .watch_script_history(&deposit_address.address.script_pubkey())
1204                            .await?;
1205
1206                        let sender = self.pegin_monitor_wakeup_sender.clone();
1207                        dbtx.on_commit(move || {
1208                            sender.send_replace(());
1209                        });
1210
1211                        Ok(MaybeNewAddress::NewAddress(deposit_address))
1212                    })
1213                },
1214                Some(100),
1215            )
1216            .await
1217            .map_err(|e| match e {
1218                AutocommitError::CommitFailed {
1219                    last_error,
1220                    attempts,
1221                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1222                AutocommitError::ClosureError { error, .. } => error,
1223            })?;
1224
1225        Ok(result)
1226    }
1227
1228    async fn unused_pooled_deposit_addresses(
1229        &self,
1230        dbtx: &mut DatabaseTransaction<'_>,
1231    ) -> Vec<(TweakIdx, PegInTweakIndexData)> {
1232        // Walk peg-in tweaks in descending order, taking unused ones
1233        // (`claimed.is_empty()`) until we hit a used one. That gives us exactly
1234        // the trailing gap — `gap + 1` reads regardless of total tweak count.
1235        // No RPC and no separately-maintained `last_used` index needed.
1236        let mut unused: Vec<(TweakIdx, PegInTweakIndexData)> = dbtx
1237            .find_by_prefix_sorted_descending(&PegInTweakIndexPrefix)
1238            .await
1239            .take_while(|(_, d)| std::future::ready(d.claimed.is_empty()))
1240            .map(|(k, v)| (k.0, v))
1241            .collect()
1242            .await;
1243
1244        // Order by `creation_time` ascending for caller selection and stable
1245        // wraparound after stateful reuse mutates `creation_time`.
1246        unused.sort_by_key(|(t, d)| (d.creation_time, *t));
1247        unused
1248    }
1249
1250    /// Allocate a deposit address, bounding the gap of consecutive unused
1251    /// addresses past the last-used one.
1252    ///
1253    /// This is a stateful wrapper over
1254    /// [`Self::allocate_deposit_address_pooled_stateless`]. If the stateless
1255    /// method returns [`MaybeNewAddress::TooManyUnusedAddresses`], this method
1256    /// picks one of those addresses with a persisted round-robin cursor and
1257    /// returns it with [`AllocateDepositOutcome::Reused`].
1258    ///
1259    /// Selection policy when reusing: round-robin over unused addresses
1260    /// ordered by `creation_time` ascending, with the cursor persisted
1261    /// per-client in the wallet module's DB. The cursor stores the next
1262    /// tweak index to consider; after picking idx X it advances to X+1, and
1263    /// when no candidate has `tweak_idx >= cursor` it wraps to the
1264    /// oldest-by-`creation_time` candidate. This gives the caller the
1265    /// predictable "cycle through the unused addresses" behavior.
1266    ///
1267    /// On reuse the `creation_time` and check-schedule fields of the
1268    /// underlying tweak entry are reset to "now", so `pegin_monitor`'s
1269    /// age-proportional polling delay restarts from zero. Without this an
1270    /// old entry would keep its long stale next-check delay and a user
1271    /// sending to it could wait a long time before the client noticed.
1272    #[allow(clippy::too_many_lines)]
1273    pub async fn allocate_deposit_address_pooled(
1274        &self,
1275        max_gap_size: usize,
1276    ) -> anyhow::Result<(DepositAddressInfo, AllocateDepositOutcome)> {
1277        let stateless = self
1278            .allocate_deposit_address_pooled_stateless(max_gap_size)
1279            .await?;
1280
1281        let reused_addresses = match stateless {
1282            MaybeNewAddress::NewAddress(deposit_address) => {
1283                return Ok((deposit_address, AllocateDepositOutcome::Fresh));
1284            }
1285            MaybeNewAddress::TooManyUnusedAddresses(addresses) => addresses,
1286        };
1287
1288        let result = self
1289            .db
1290            .autocommit(
1291                move |dbtx, _| {
1292                    let reused_addresses = reused_addresses.clone();
1293                    Box::pin(async move {
1294                        let cursor = dbtx
1295                            .get_value(&PegInPoolCursorKey)
1296                            .await
1297                            .unwrap_or(TweakIdx(0));
1298
1299                        let pick_pos = reused_addresses
1300                            .iter()
1301                            .position(|a| cursor <= a.tweak_idx)
1302                            .unwrap_or(0);
1303                        let reused_address = reused_addresses[pick_pos].clone();
1304
1305                        let existing_tweak_idx = reused_address.tweak_idx;
1306                        let existing = dbtx
1307                            .get_value(&PegInTweakIndexKey(reused_address.tweak_idx))
1308                            .await
1309                            .with_context(|| {
1310                                format!(
1311                                    "Pooled address disappeared while reusing {}",
1312                                    reused_address.tweak_idx
1313                                )
1314                            })?;
1315
1316                        ensure!(
1317                            existing.claimed.is_empty(),
1318                            "Pooled address was used while reusing {}",
1319                            reused_address.tweak_idx
1320                        );
1321
1322                        dbtx.insert_entry(&PegInPoolCursorKey, &reused_address.tweak_idx.next())
1323                            .await;
1324
1325                        // Reset the monitoring schedule so the reused address
1326                        // gets checked as aggressively as a freshly-allocated
1327                        // one. Without this, an old entry would retain its
1328                        // `age/10` next-check delay from `pegin_monitor`,
1329                        // meaning a user who sends to a long-idle pool address
1330                        // could wait hours before the client polls for the
1331                        // deposit.
1332                        let now = fedimint_core::time::now();
1333                        dbtx.insert_entry(
1334                            &PegInTweakIndexKey(reused_address.tweak_idx),
1335                            &PegInTweakIndexData {
1336                                creation_time: now,
1337                                last_check_time: None,
1338                                next_check_time: Some(now),
1339                                operation_id: existing.operation_id,
1340                                claimed: existing.claimed,
1341                            },
1342                        )
1343                        .await;
1344
1345                        let sender = self.pegin_monitor_wakeup_sender.clone();
1346                        dbtx.on_commit(move || {
1347                            sender.send_replace(());
1348                        });
1349
1350                        Ok::<_, anyhow::Error>((
1351                            reused_address,
1352                            AllocateDepositOutcome::Reused {
1353                                original_tweak_idx: existing_tweak_idx,
1354                            },
1355                        ))
1356                    })
1357                },
1358                Some(100),
1359            )
1360            .await
1361            .map_err(|e| match e {
1362                AutocommitError::CommitFailed {
1363                    last_error,
1364                    attempts,
1365                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1366                AutocommitError::ClosureError { error, .. } => error,
1367            })?;
1368
1369        Ok(result)
1370    }
1371
1372    /// Returns a stream of updates about an ongoing deposit operation created
1373    /// with [`WalletClientModule::allocate_deposit_address_expert_only`].
1374    /// Returns an error for old deposit operations created prior to the 0.4
1375    /// release and not driven to completion yet. This should be rare enough
1376    /// that an indeterminate state is ok here.
1377    pub async fn subscribe_deposit(
1378        &self,
1379        operation_id: OperationId,
1380    ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
1381        let operation = self
1382            .client_ctx
1383            .get_operation(operation_id)
1384            .await
1385            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1386
1387        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1388            bail!("Operation is not a wallet operation");
1389        }
1390
1391        let operation_meta = operation.meta::<WalletOperationMeta>();
1392
1393        let WalletOperationMetaVariant::Deposit {
1394            address, tweak_idx, ..
1395        } = operation_meta.variant
1396        else {
1397            bail!("Operation is not a deposit operation");
1398        };
1399
1400        let address = address.require_network(self.cfg().network.0)?;
1401
1402        // The old deposit operations don't have tweak_idx set
1403        let Some(tweak_idx) = tweak_idx else {
1404            // In case we are dealing with an old deposit that still uses state machines we
1405            // don't have the logic here anymore to subscribe to updates. We can still read
1406            // the final state though if it reached any.
1407            let outcome_v1 = operation
1408                .outcome::<DepositStateV1>()
1409                .context("Old pending deposit, can't subscribe to updates")?;
1410
1411            let outcome_v2 = match outcome_v1 {
1412                DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
1413                    btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
1414                    btc_out_point: bitcoin::OutPoint {
1415                        txid: tx_info.btc_transaction.compute_txid(),
1416                        vout: tx_info.out_idx,
1417                    },
1418                },
1419                DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
1420                _ => bail!("Non-final outcome in operation log"),
1421            };
1422
1423            return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
1424        };
1425
1426        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
1427            let stream_rpc = self.rpc.clone();
1428            let stream_client_ctx = self.client_ctx.clone();
1429            let stream_script_pub_key = address.script_pubkey();
1430            move || {
1431
1432            stream! {
1433                yield DepositStateV2::WaitingForTransaction;
1434
1435                retry(
1436                    "subscribe script history",
1437                    background_backoff(),
1438                    || stream_rpc.watch_script_history(&stream_script_pub_key)
1439                ).await.expect("Will never give up");
1440                let (btc_out_point, btc_deposited) = retry(
1441                    "fetch history",
1442                    background_backoff(),
1443                    || async {
1444                        let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1445                        history.first().and_then(|tx| {
1446                            let (out_idx, amount) = tx.output
1447                                .iter()
1448                                .enumerate()
1449                                .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1450                            let txid = tx.compute_txid();
1451
1452                            Some((
1453                                bitcoin::OutPoint {
1454                                    txid,
1455                                    vout: out_idx as u32,
1456                                },
1457                                amount
1458                            ))
1459                        }).context("No deposit transaction found")
1460                    }
1461                ).await.expect("Will never give up");
1462
1463                yield DepositStateV2::WaitingForConfirmation {
1464                    btc_deposited,
1465                    btc_out_point
1466                };
1467
1468                let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1469                    peg_in_index: tweak_idx,
1470                    btc_out_point,
1471                }).await;
1472
1473                yield DepositStateV2::Confirmed {
1474                    btc_deposited,
1475                    btc_out_point
1476                };
1477
1478                match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1479                    Ok(()) => yield DepositStateV2::Claimed {
1480                        btc_deposited,
1481                        btc_out_point
1482                    },
1483                    Err(e) => yield DepositStateV2::Failed(e.to_string())
1484                }
1485            }
1486        }}))
1487    }
1488
1489    pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1490        self.client_ctx
1491            .module_db()
1492            .clone()
1493            .begin_transaction_nc()
1494            .await
1495            .find_by_prefix(&PegInTweakIndexPrefix)
1496            .await
1497            .map(|(key, data)| (key.0, data))
1498            .collect()
1499            .await
1500    }
1501
1502    pub async fn find_tweak_idx_by_address(
1503        &self,
1504        address: bitcoin::Address<NetworkUnchecked>,
1505    ) -> anyhow::Result<TweakIdx> {
1506        let data = self.data.clone();
1507        let Some((tweak_idx, _)) = self
1508            .db
1509            .begin_transaction_nc()
1510            .await
1511            .find_by_prefix(&PegInTweakIndexPrefix)
1512            .await
1513            .filter(|(k, _)| {
1514                let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1515                future::ready(derived_address.into_unchecked() == address)
1516            })
1517            .next()
1518            .await
1519        else {
1520            bail!("Address not found in the list of derived keys");
1521        };
1522
1523        Ok(tweak_idx.0)
1524    }
1525    pub async fn find_tweak_idx_by_operation_id(
1526        &self,
1527        operation_id: OperationId,
1528    ) -> anyhow::Result<TweakIdx> {
1529        Ok(self
1530            .client_ctx
1531            .module_db()
1532            .clone()
1533            .begin_transaction_nc()
1534            .await
1535            .find_by_prefix(&PegInTweakIndexPrefix)
1536            .await
1537            .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1538            .next()
1539            .await
1540            .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1541            .0
1542            .0)
1543    }
1544
1545    pub async fn get_pegin_tweak_idx(
1546        &self,
1547        tweak_idx: TweakIdx,
1548    ) -> anyhow::Result<PegInTweakIndexData> {
1549        self.client_ctx
1550            .module_db()
1551            .clone()
1552            .begin_transaction_nc()
1553            .await
1554            .get_value(&PegInTweakIndexKey(tweak_idx))
1555            .await
1556            .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1557    }
1558
1559    pub async fn get_claimed_pegins(
1560        &self,
1561        dbtx: &mut DatabaseTransaction<'_>,
1562        tweak_idx: TweakIdx,
1563    ) -> Vec<(
1564        bitcoin::OutPoint,
1565        TransactionId,
1566        Vec<fedimint_core::OutPoint>,
1567    )> {
1568        let outpoints = dbtx
1569            .get_value(&PegInTweakIndexKey(tweak_idx))
1570            .await
1571            .map(|v| v.claimed)
1572            .unwrap_or_default();
1573
1574        let mut res = vec![];
1575
1576        for outpoint in outpoints {
1577            let claimed_peg_in_data = dbtx
1578                .get_value(&ClaimedPegInKey {
1579                    peg_in_index: tweak_idx,
1580                    btc_out_point: outpoint,
1581                })
1582                .await
1583                .expect("Must have a corresponding claim record");
1584            res.push((
1585                outpoint,
1586                claimed_peg_in_data.claim_txid,
1587                claimed_peg_in_data.change,
1588            ));
1589        }
1590
1591        res
1592    }
1593
1594    /// Like [`Self::recheck_pegin_address`] but by `operation_id`
1595    pub async fn recheck_pegin_address_by_op_id(
1596        &self,
1597        operation_id: OperationId,
1598    ) -> anyhow::Result<()> {
1599        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1600
1601        self.recheck_pegin_address(tweak_idx).await
1602    }
1603
1604    /// Schedule given address for immediate re-check for deposits
1605    pub async fn recheck_pegin_address_by_address(
1606        &self,
1607        address: bitcoin::Address<NetworkUnchecked>,
1608    ) -> anyhow::Result<()> {
1609        self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1610            .await
1611    }
1612
1613    /// Schedule given address for immediate re-check for deposits
1614    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1615        self.db
1616            .autocommit(
1617                |dbtx, _| {
1618                    Box::pin(async {
1619                        let db_key = PegInTweakIndexKey(tweak_idx);
1620                        let db_val = dbtx
1621                            .get_value(&db_key)
1622                            .await
1623                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1624
1625                        dbtx.insert_entry(
1626                            &db_key,
1627                            &PegInTweakIndexData {
1628                                next_check_time: Some(fedimint_core::time::now()),
1629                                ..db_val
1630                            },
1631                        )
1632                        .await;
1633
1634                        let sender = self.pegin_monitor_wakeup_sender.clone();
1635                        dbtx.on_commit(move || {
1636                            sender.send_replace(());
1637                        });
1638
1639                        Ok::<_, anyhow::Error>(())
1640                    })
1641                },
1642                Some(100),
1643            )
1644            .await?;
1645
1646        Ok(())
1647    }
1648
1649    /// Await for num deposit by [`OperationId`]
1650    pub async fn await_num_deposits_by_operation_id(
1651        &self,
1652        operation_id: OperationId,
1653        num_deposits: usize,
1654    ) -> anyhow::Result<()> {
1655        let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1656        self.await_num_deposits(tweak_idx, num_deposits).await
1657    }
1658
1659    pub async fn await_num_deposits_by_address(
1660        &self,
1661        address: bitcoin::Address<NetworkUnchecked>,
1662        num_deposits: usize,
1663    ) -> anyhow::Result<()> {
1664        self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1665            .await
1666    }
1667
1668    #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1669    pub async fn await_num_deposits(
1670        &self,
1671        tweak_idx: TweakIdx,
1672        num_deposits: usize,
1673    ) -> anyhow::Result<()> {
1674        let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1675
1676        let mut receiver = self.pegin_claimed_receiver.clone();
1677        let mut backoff = backoff_util::aggressive_backoff();
1678
1679        loop {
1680            let pegins = self
1681                .get_claimed_pegins(
1682                    &mut self.client_ctx.module_db().begin_transaction_nc().await,
1683                    tweak_idx,
1684                )
1685                .await;
1686
1687            if pegins.len() < num_deposits {
1688                debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1689                self.recheck_pegin_address(tweak_idx).await?;
1690                runtime::sleep(backoff.next().unwrap_or_default()).await;
1691                receiver.changed().await?;
1692                continue;
1693            }
1694
1695            debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1696
1697            for (_outpoint, transaction_id, change) in pegins {
1698                if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1699                    debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1700                    continue;
1701                }
1702
1703                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1704                let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1705
1706                if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1707                    bail!("{e}");
1708                }
1709
1710                debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1711                self.client_ctx
1712                    .await_primary_module_outputs(operation_id, change)
1713                    .await
1714                    .expect("Cannot fail if tx was accepted and federation is honest");
1715            }
1716
1717            return Ok(());
1718        }
1719    }
1720
1721    /// Attempt to withdraw a given `amount` of Bitcoin to a destination
1722    /// `address`. The caller has to supply the fee rate to be used which can be
1723    /// fetched using [`Self::get_withdraw_fees`] and should be
1724    /// acknowledged by the user since it can be unexpectedly high.
1725    pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1726        &self,
1727        address: &bitcoin::Address,
1728        amount: bitcoin::Amount,
1729        fee: PegOutFees,
1730        extra_meta: M,
1731    ) -> anyhow::Result<OperationId> {
1732        {
1733            let operation_id = OperationId(thread_rng().r#gen());
1734
1735            let withdraw_output =
1736                self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1737            let tx_builder = TransactionBuilder::new()
1738                .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1739
1740            let extra_meta =
1741                serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1742            self.client_ctx
1743                .finalize_and_submit_transaction(
1744                    operation_id,
1745                    WalletCommonInit::KIND.as_str(),
1746                    {
1747                        let address = address.clone();
1748                        move |change_range: OutPointRange| WalletOperationMeta {
1749                            variant: WalletOperationMetaVariant::Withdraw {
1750                                address: address.clone().into_unchecked(),
1751                                amount,
1752                                fee,
1753                                change: change_range.into_iter().collect(),
1754                            },
1755                            extra_meta: extra_meta.clone(),
1756                        }
1757                    },
1758                    tx_builder,
1759                )
1760                .await?;
1761
1762            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1763
1764            self.client_ctx
1765                .log_event(
1766                    &mut dbtx,
1767                    SendPaymentEvent {
1768                        operation_id,
1769                        amount: amount + fee.amount(),
1770                        fee: fee.amount(),
1771                    },
1772                )
1773                .await;
1774
1775            dbtx.commit_tx().await;
1776
1777            Ok(operation_id)
1778        }
1779    }
1780
1781    /// Attempt to increase the fee of a onchain withdraw transaction using
1782    /// replace by fee (RBF).
1783    /// This can prevent transactions from getting stuck
1784    /// in the mempool
1785    #[deprecated(
1786        since = "0.4.0",
1787        note = "RBF withdrawals are rejected by the federation"
1788    )]
1789    pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1790        &self,
1791        rbf: Rbf,
1792        extra_meta: M,
1793    ) -> anyhow::Result<OperationId> {
1794        let operation_id = OperationId(thread_rng().r#gen());
1795
1796        let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1797        let tx_builder = TransactionBuilder::new()
1798            .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1799
1800        let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1801        self.client_ctx
1802            .finalize_and_submit_transaction(
1803                operation_id,
1804                WalletCommonInit::KIND.as_str(),
1805                move |change_range: OutPointRange| WalletOperationMeta {
1806                    variant: WalletOperationMetaVariant::RbfWithdraw {
1807                        rbf: rbf.clone(),
1808                        change: change_range.into_iter().collect(),
1809                    },
1810                    extra_meta: extra_meta.clone(),
1811                },
1812                tx_builder,
1813            )
1814            .await?;
1815
1816        Ok(operation_id)
1817    }
1818
1819    pub async fn subscribe_withdraw_updates(
1820        &self,
1821        operation_id: OperationId,
1822    ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1823        let operation = self
1824            .client_ctx
1825            .get_operation(operation_id)
1826            .await
1827            .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1828
1829        if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1830            bail!("Operation is not a wallet operation");
1831        }
1832
1833        let operation_meta = operation.meta::<WalletOperationMeta>();
1834
1835        let (WalletOperationMetaVariant::Withdraw { change, .. }
1836        | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1837        else {
1838            bail!("Operation is not a withdraw operation");
1839        };
1840
1841        let mut operation_stream = self.notifier.subscribe(operation_id).await;
1842        let client_ctx = self.client_ctx.clone();
1843
1844        Ok(self
1845            .client_ctx
1846            .outcome_or_updates(operation, operation_id, move || {
1847                stream! {
1848                    match next_withdraw_state(&mut operation_stream).await {
1849                        Some(WithdrawStates::Created(_)) => {
1850                            yield WithdrawState::Created;
1851                        },
1852                        Some(s) => {
1853                            panic!("Unexpected state {s:?}")
1854                        },
1855                        None => return,
1856                    }
1857
1858                    // TODO: get rid of awaiting change here, there has to be a better way to make tests deterministic
1859
1860                        // Swallowing potential errors since the transaction failing  is handled by
1861                        // output outcome fetching already
1862                        let _ = client_ctx
1863                            .await_primary_module_outputs(operation_id, change)
1864                            .await;
1865
1866
1867                    match next_withdraw_state(&mut operation_stream).await {
1868                        Some(WithdrawStates::Aborted(inner)) => {
1869                            yield WithdrawState::Failed(inner.error);
1870                        },
1871                        Some(WithdrawStates::Success(inner)) => {
1872                            yield WithdrawState::Succeeded(inner.txid);
1873                        },
1874                        Some(s) => {
1875                            panic!("Unexpected state {s:?}")
1876                        },
1877                        None => {},
1878                    }
1879                }
1880            }))
1881    }
1882
1883    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1884        self.admin_auth
1885            .clone()
1886            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1887    }
1888
1889    pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1890        self.module_api
1891            .activate_consensus_version_voting(self.admin_auth()?)
1892            .await?;
1893
1894        Ok(())
1895    }
1896}
1897
1898/// Polls the federation checking if the activated module consensus version
1899/// supports safe deposits, saving the result in the db once it does.
1900async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1901    loop {
1902        let mut dbtx = db.begin_transaction().await;
1903
1904        if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1905            break;
1906        }
1907
1908        module_api.wait_for_initialized_connections().await;
1909
1910        if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1911            && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1912        {
1913            dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1914            dbtx.commit_tx().await;
1915            break;
1916        }
1917
1918        drop(dbtx);
1919
1920        if is_running_in_test_env() {
1921            // Even in tests we don't want to spam the federation with requests about it
1922            sleep(Duration::from_secs(10)).await;
1923        } else {
1924            sleep(Duration::from_hours(1)).await;
1925        }
1926    }
1927}
1928
1929/// Returns the child index to derive the next peg-in tweak key from.
1930async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1931    let index = dbtx
1932        .get_value(&NextPegInTweakIndexKey)
1933        .await
1934        .unwrap_or_default();
1935    dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1936        .await;
1937    index
1938}
1939
/// The state machines of the wallet client module, one variant per kind of
/// operation.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// A deposit state machine.
    Deposit(DepositStateMachine),
    /// A withdraw state machine.
    Withdraw(WithdrawStateMachine),
}
1945
/// Conversion of the wallet's typed state machines into the dynamically
/// typed [`DynState`].
impl IntoDynInstance for WalletClientStates {
    type DynType = DynState;

    /// Wraps `self` into a [`DynState`] via `DynState::from_typed`, passing
    /// along the given module `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1953
1954impl State for WalletClientStates {
1955    type ModuleContext = WalletClientContext;
1956
1957    fn transitions(
1958        &self,
1959        context: &Self::ModuleContext,
1960        global_context: &DynGlobalClientContext,
1961    ) -> Vec<StateTransition<Self>> {
1962        match self {
1963            WalletClientStates::Deposit(sm) => {
1964                sm_enum_variant_translation!(
1965                    sm.transitions(context, global_context),
1966                    WalletClientStates::Deposit
1967                )
1968            }
1969            WalletClientStates::Withdraw(sm) => {
1970                sm_enum_variant_translation!(
1971                    sm.transitions(context, global_context),
1972                    WalletClientStates::Withdraw
1973                )
1974            }
1975        }
1976    }
1977
1978    fn operation_id(&self) -> OperationId {
1979        match self {
1980            WalletClientStates::Deposit(sm) => sm.operation_id(),
1981            WalletClientStates::Withdraw(sm) => sm.operation_id(),
1982        }
1983    }
1984}
1985
// Unit tests for the recovery index scan (`recover_scan_idxes_for_activity`).
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
    };

    /// Exercises `recover_scan_idxes_for_activity` with hand-written activity
    /// callbacks. Each callback panics for indexes the scan must never query
    /// and sets a flag for the last index it is expected to query.
    #[allow(clippy::too_many_lines)] // shut-up clippy, it's a test
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Start at 0, nothing used, no activity reported: index 9 must be
        // queried and index 10 must not. No index ends up marked as used.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(9) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(10) => panic!("Shouldn't happen"),
                            TweakIdx(11) => {
                                vec![0usize] /* just for type inference */
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Indexes 1 and 2 are passed in as already used: they must not be
        // queried, index 2 is reported as the last used one, and the scan is
        // expected to reach index 11 but not index 12.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                            TweakIdx(11) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(12) => panic!("Shouldn't happen"),
                            TweakIdx(13) => {
                                vec![0usize] /* just for type inference */
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Start at 10 with activity on index 10 itself: it is reported as
        // used and as having peg-ins; the scan must reach 19 but not 20.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(10),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(10) => vec![()],
                            TweakIdx(19) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(20) => panic!("Shouldn't happen"),
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Activity on indexes 6 and 15: both are reported as having peg-ins
        // and 15 becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15), TweakIdx(6)].map(|i| i))
            }
        );
        // Start at 10 with no activity from there on: indexes below the start
        // are not expected to be queried (9 panics, and the activity on 8 does
        // not show up in the outcome). The new start index is expected to be
        // based on 9.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(8) => {
                        vec![()] /* for type inference only */
                    }
                    TweakIdx(9) => {
                        panic!("Shouldn't happen")
                    }
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([])
            }
        );
        // Start at 10 with activity on index 15: index 9 must not be queried
        // and 15 is reported as the last used index with peg-ins.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
            }
        );
    }
}