1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19mod pegin_monitor;
21mod withdraw;
22
23use std::collections::{BTreeMap, BTreeSet};
24use std::future;
25use std::sync::Arc;
26use std::time::{Duration, SystemTime};
27
28use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
29use async_stream::stream;
30use backup::WalletModuleBackup;
31use bitcoin::address::NetworkUnchecked;
32use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
33use bitcoin::{Address, Network, ScriptBuf};
34use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
35use fedimint_api_client::api::{DynModuleApi, FederationResult};
36use fedimint_bitcoind::{DynBitcoindRpc, create_bitcoind};
37use fedimint_client_module::module::init::{
38 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
39};
40use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
41use fedimint_client_module::oplog::UpdateStreamOrOutcome;
42use fedimint_client_module::sm::util::MapStateTransitions;
43use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client_module::transaction::{
45 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
54use fedimint_core::module::{
55 ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion, ModuleInit,
56 MultiApiVersion,
57};
58use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
59use fedimint_core::util::backoff_util::background_backoff;
60use fedimint_core::util::{backoff_util, retry};
61use fedimint_core::{
62 Amount, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items, runtime,
63 secp256k1,
64};
65use fedimint_derive_secret::{ChildId, DerivableSecret};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
68use fedimint_wallet_common::tweakable::Tweakable;
69pub use fedimint_wallet_common::*;
70use futures::{Stream, StreamExt};
71use rand::{Rng, thread_rng};
72use secp256k1::Keypair;
73use serde::{Deserialize, Serialize};
74use strum::IntoEnumIterator;
75use tokio::sync::watch;
76use tracing::{debug, instrument};
77
78use crate::api::WalletFederationApi;
79use crate::backup::WalletRecovery;
80use crate::client_db::{
81 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
82 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
83};
84use crate::deposit::DepositStateMachine;
85use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
86
/// Child id, below the module root secret, of the key branch from which the
/// per-deposit tweak keys are derived (see `derive_deposit_address`).
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
88
/// A Bitcoin transaction together with the index of one of its outputs.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The full Bitcoin transaction.
    pub btc_transaction: bitcoin::Transaction,
    /// Index into `btc_transaction.output` of the output this record refers
    /// to.
    pub out_idx: u32,
}
97
/// First-generation deposit state.
///
/// `subscribe_deposit` still reads this type from the operation log for
/// deposit operations whose metadata carries no `tweak_idx`, and converts the
/// final variants (`Claimed`, `Failed`) into [`DepositStateV2`].
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    /// No transaction data is attached to this state yet.
    WaitingForTransaction,
    /// Carries the transaction and output index being waited on.
    WaitingForConfirmation(BitcoinTransactionData),
    /// Carries the transaction and output index of the confirmed deposit.
    Confirmed(BitcoinTransactionData),
    /// Final state: carries the transaction and output index of the claimed
    /// deposit.
    Claimed(BitcoinTransactionData),
    /// Final state: the deposit failed; the payload is an error description.
    Failed(String),
}
106
/// Deposit state reported by `subscribe_deposit`.
///
/// The update stream built there yields `WaitingForTransaction`, then
/// `WaitingForConfirmation`, then `Confirmed`, and finally either `Claimed` or
/// `Failed`. Amounts are (de)serialized as satoshis.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// No matching transaction output has been found yet.
    WaitingForTransaction,
    /// A transaction output paying to the deposit script was found.
    WaitingForConfirmation {
        /// Value of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// A claim record for the deposit outpoint exists in the module database.
    Confirmed {
        /// Value of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// The primary-module outputs of the claim were awaited successfully.
    Claimed {
        /// Value of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// The deposit failed; the payload is an error description.
    Failed(String),
}
127
/// Withdrawal state reported by `subscribe_withdraw_updates`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    /// The withdraw state machine reached its `Created` state.
    Created,
    /// The withdrawal succeeded; the payload is the Bitcoin transaction id.
    Succeeded(bitcoin::Txid),
    /// The withdrawal was aborted; the payload is the error description.
    Failed(String),
}
137
138async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
139where
140 S: Stream<Item = WalletClientStates> + Unpin,
141{
142 loop {
143 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
144 return Some(ds.state);
145 }
146 tokio::task::yield_now().await;
147 }
148}
149
/// Initializer for the wallet client module.
///
/// The wrapped value is an optional Bitcoin RPC configuration override. When
/// it is `None`, `init` falls back to `WalletClientModule::get_rpc_config`.
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit(pub Option<BitcoinRpcConfig>);
153
impl WalletClientInit {
    /// Creates an initializer that uses `rpc` as the Bitcoin RPC
    /// configuration instead of the fallback chosen by `init`.
    pub fn new(rpc: BitcoinRpcConfig) -> Self {
        Self(Some(rpc))
    }
}
159
160impl ModuleInit for WalletClientInit {
161 type Common = WalletCommonInit;
162
163 async fn dump_database(
164 &self,
165 dbtx: &mut DatabaseTransaction<'_>,
166 prefix_names: Vec<String>,
167 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
168 let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
169 BTreeMap::new();
170 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
171 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
172 });
173
174 for table in filtered_prefixes {
175 match table {
176 DbKeyPrefix::NextPegInTweakIndex => {
177 if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
178 wallet_client_items
179 .insert("NextPegInTweakIndex".to_string(), Box::new(index));
180 }
181 }
182 DbKeyPrefix::PegInTweakIndex => {
183 push_db_pair_items!(
184 dbtx,
185 PegInTweakIndexPrefix,
186 PegInTweakIndexKey,
187 PegInTweakIndexData,
188 wallet_client_items,
189 "Peg-In Tweak Index"
190 );
191 }
192 DbKeyPrefix::ClaimedPegIn => {
193 push_db_pair_items!(
194 dbtx,
195 ClaimedPegInPrefix,
196 ClaimedPegInKey,
197 ClaimedPegInData,
198 wallet_client_items,
199 "Claimed Peg-In"
200 );
201 }
202 DbKeyPrefix::RecoveryFinalized => {
203 if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
204 wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
205 }
206 }
207 DbKeyPrefix::SupportsSafeDeposit => {
208 push_db_pair_items!(
209 dbtx,
210 SupportsSafeDepositPrefix,
211 SupportsSafeDepositKey,
212 (),
213 wallet_client_items,
214 "Supports Safe Deposit"
215 );
216 }
217 DbKeyPrefix::RecoveryState
218 | DbKeyPrefix::ExternalReservedStart
219 | DbKeyPrefix::CoreInternalReservedStart
220 | DbKeyPrefix::CoreInternalReservedEnd => {}
221 }
222 }
223
224 Box::new(wallet_client_items.into_iter())
225 }
226}
227
228#[apply(async_trait_maybe_send!)]
229impl ClientModuleInit for WalletClientInit {
230 type Module = WalletClientModule;
231
232 fn supported_api_versions(&self) -> MultiApiVersion {
233 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
234 .expect("no version conflicts")
235 }
236
237 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238 let data = WalletClientModuleData {
239 cfg: args.cfg().clone(),
240 module_root_secret: args.module_root_secret().clone(),
241 };
242
243 let rpc_config = self
244 .0
245 .clone()
246 .unwrap_or(WalletClientModule::get_rpc_config(args.cfg()));
247
248 let db = args.db().clone();
249
250 let btc_rpc = create_bitcoind(&rpc_config)?;
251 let module_api = args.module_api().clone();
252
253 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
254 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
255
256 Ok(WalletClientModule {
257 db,
258 data,
259 module_api,
260 notifier: args.notifier().clone(),
261 rpc: btc_rpc,
262 client_ctx: args.context(),
263 pegin_monitor_wakeup_sender,
264 pegin_monitor_wakeup_receiver,
265 pegin_claimed_receiver,
266 pegin_claimed_sender,
267 task_group: args.task_group().clone(),
268 admin_auth: args.admin_auth().cloned(),
269 })
270 }
271
272 async fn recover(
280 &self,
281 args: &ClientModuleRecoverArgs<Self>,
282 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
283 ) -> anyhow::Result<()> {
284 args.recover_from_history::<WalletRecovery>(self, snapshot)
285 .await
286 }
287
288 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
289 Some(
290 DbKeyPrefix::iter()
291 .map(|p| p as u8)
292 .chain(
293 DbKeyPrefix::ExternalReservedStart as u8
294 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
295 )
296 .collect(),
297 )
298 }
299}
300
/// Metadata stored in the operation log for every wallet operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// Kind of wallet operation and its kind-specific data.
    pub variant: WalletOperationMetaVariant,
    /// Caller-supplied metadata, stored as JSON.
    pub extra_meta: serde_json::Value,
}
306
/// Kind-specific part of [`WalletOperationMeta`]; variant names are
/// serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    /// A deposit (peg-in) operation.
    Deposit {
        /// Deposit address; its network has not been validated.
        address: Address<NetworkUnchecked>,
        /// Tweak index the address was derived from. Deserializes to `None`
        /// when absent; `subscribe_deposit` treats such operations as
        /// legacy deposits.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Optional expiry time; omitted from the serialized form when
        /// `None`.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    /// A withdrawal (peg-out) operation.
    Withdraw {
        /// Destination address; its network has not been validated.
        address: Address<NetworkUnchecked>,
        /// Amount withdrawn, (de)serialized as satoshis.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        /// Peg-out fees used for the withdrawal.
        fee: PegOutFees,
        /// Change outpoints of the submitted federation transaction.
        change: Vec<OutPoint>,
    },

    /// A replace-by-fee withdrawal operation.
    RbfWithdraw {
        /// The RBF request.
        rbf: Rbf,
        /// Change outpoints of the submitted federation transaction.
        change: Vec<OutPoint>,
    },
}
334
/// Cheaply clonable data shared between the module and its background
/// peg-in monitor task.
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    /// Wallet client configuration.
    cfg: WalletClientConfig,
    /// Root secret of this module, from which deposit tweak keys are derived.
    module_root_secret: DerivableSecret,
}
341
342impl WalletClientModuleData {
343 fn derive_deposit_address(
344 &self,
345 idx: TweakIdx,
346 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
347 let idx = ChildId(idx.0);
348
349 let secret_tweak_key = self
350 .module_root_secret
351 .child_key(WALLET_TWEAK_CHILD_ID)
352 .child_key(idx)
353 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
354
355 let public_tweak_key = secret_tweak_key.public_key();
356
357 let address = self
358 .cfg
359 .peg_in_descriptor
360 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
361 .address(self.cfg.network.0)
362 .unwrap();
363
364 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
366
367 (secret_tweak_key, public_tweak_key, address, operation_id)
368 }
369
370 fn derive_peg_in_script(
371 &self,
372 idx: TweakIdx,
373 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
374 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
375
376 (
377 self.cfg
378 .peg_in_descriptor
379 .tweak(&secret_tweak_key.public_key(), SECP256K1)
380 .script_pubkey(),
381 address,
382 secret_tweak_key,
383 operation_id,
384 )
385 }
386}
387
/// The wallet client module: deposits (peg-ins) and withdrawals (peg-outs).
#[derive(Debug)]
pub struct WalletClientModule {
    /// Configuration and root secret, shared with the peg-in monitor task.
    data: WalletClientModuleData,
    /// Module database.
    db: Database,
    /// API client for the federation's wallet module.
    module_api: DynModuleApi,
    /// Source of state-machine state updates, used to follow withdrawals.
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC client.
    rpc: DynBitcoindRpc,
    /// Handle to the surrounding client.
    client_ctx: ClientContext<Self>,
    /// Signalled after a commit that allocates a deposit address or requests
    /// a recheck, to wake up the peg-in monitor.
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    /// Receiving end of the wake-up signal; a clone is handed to the peg-in
    /// monitor task in `start`.
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Sending end handed to the peg-in monitor task in `start`.
    pegin_claimed_sender: watch::Sender<()>,
    /// Awaited by `await_num_deposits` between checks for claimed peg-ins.
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the background tasks are spawned on.
    task_group: TaskGroup,
    /// Optional admin authentication, as provided at module init.
    admin_auth: Option<ApiAuth>,
}
405
406#[apply(async_trait_maybe_send!)]
407impl ClientModule for WalletClientModule {
408 type Init = WalletClientInit;
409 type Common = WalletModuleTypes;
410 type Backup = WalletModuleBackup;
411 type ModuleStateMachineContext = WalletClientContext;
412 type States = WalletClientStates;
413
414 fn context(&self) -> Self::ModuleStateMachineContext {
415 WalletClientContext {
416 rpc: self.rpc.clone(),
417 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
418 wallet_decoder: self.decoder(),
419 secp: Secp256k1::default(),
420 client_ctx: self.client_ctx.clone(),
421 }
422 }
423
424 async fn start(&self) {
425 self.task_group.spawn_cancellable("peg-in monitor", {
426 let client_ctx = self.client_ctx.clone();
427 let db = self.db.clone();
428 let btc_rpc = self.rpc.clone();
429 let module_api = self.module_api.clone();
430 let data = self.data.clone();
431 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
432 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
433 pegin_monitor::run_peg_in_monitor(
434 client_ctx,
435 db,
436 btc_rpc,
437 module_api,
438 data,
439 pegin_claimed_sender,
440 pegin_monitor_wakeup_receiver,
441 )
442 });
443
444 self.task_group
445 .spawn_cancellable("supports-safe-deposit-version", {
446 let db = self.db.clone();
447 let module_api = self.module_api.clone();
448
449 poll_supports_safe_deposit_version(db, module_api)
450 });
451 }
452
453 fn supports_backup(&self) -> bool {
454 true
455 }
456
457 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
458 let session_count = self.client_ctx.global_api().session_count().await?;
460
461 let mut dbtx = self.db.begin_transaction_nc().await;
462 let next_pegin_tweak_idx = dbtx
463 .get_value(&NextPegInTweakIndexKey)
464 .await
465 .unwrap_or_default();
466 let claimed = dbtx
467 .find_by_prefix(&PegInTweakIndexPrefix)
468 .await
469 .filter_map(|(k, v)| async move {
470 if v.claimed.is_empty() {
471 None
472 } else {
473 Some(k.0)
474 }
475 })
476 .collect()
477 .await;
478 Ok(backup::WalletModuleBackup::new_v1(
479 session_count,
480 next_pegin_tweak_idx,
481 claimed,
482 ))
483 }
484
485 fn input_fee(
486 &self,
487 _amount: Amount,
488 _input: &<Self::Common as ModuleCommon>::Input,
489 ) -> Option<Amount> {
490 Some(self.cfg().fee_consensus.peg_in_abs)
491 }
492
493 fn output_fee(
494 &self,
495 _amount: Amount,
496 _output: &<Self::Common as ModuleCommon>::Output,
497 ) -> Option<Amount> {
498 Some(self.cfg().fee_consensus.peg_out_abs)
499 }
500
501 #[cfg(feature = "cli")]
502 async fn handle_cli_command(
503 &self,
504 args: &[std::ffi::OsString],
505 ) -> anyhow::Result<serde_json::Value> {
506 cli::handle_cli_command(self, args).await
507 }
508}
509
/// Context made available to the wallet module's state machines; built by
/// `WalletClientModule::context`.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC client.
    rpc: DynBitcoindRpc,
    /// The federation's peg-in descriptor, taken from the client config.
    wallet_descriptor: PegInDescriptor,
    /// Decoder of the wallet module.
    wallet_decoder: Decoder,
    /// secp256k1 context.
    secp: Secp256k1<All>,
    /// Handle to the surrounding client.
    pub client_ctx: ClientContext<WalletClientModule>,
}
518
impl Context for WalletClientContext {
    /// Ties this state-machine context to the wallet module kind.
    const KIND: Option<ModuleKind> = Some(KIND);
}
522
523impl WalletClientModule {
    /// Borrows the wallet client configuration.
    fn cfg(&self) -> &WalletClientConfig {
        &self.data.cfg
    }
527
528 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
529 match BitcoinRpcConfig::get_defaults_from_env_vars() {
530 Ok(rpc_config) => {
531 if rpc_config.kind == "bitcoind" {
534 cfg.default_bitcoin_rpc.clone()
535 } else {
536 rpc_config
537 }
538 }
539 _ => cfg.default_bitcoin_rpc.clone(),
540 }
541 }
542
    /// Returns the Bitcoin network from the client configuration.
    pub fn get_network(&self) -> Network {
        self.cfg().network.0
    }
546
    /// Returns the fee consensus from the client configuration.
    pub fn get_fee_consensus(&self) -> FeeConsensus {
        self.cfg().fee_consensus
    }
550
551 async fn allocate_deposit_address_inner(
552 &self,
553 dbtx: &mut DatabaseTransaction<'_>,
554 ) -> (OperationId, Address, TweakIdx) {
555 dbtx.ensure_isolated().expect("Must be isolated db");
556
557 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
558 let (_secret_tweak_key, _, address, operation_id) =
559 self.data.derive_deposit_address(tweak_idx);
560
561 let now = fedimint_core::time::now();
562
563 dbtx.insert_new_entry(
564 &PegInTweakIndexKey(tweak_idx),
565 &PegInTweakIndexData {
566 creation_time: now,
567 next_check_time: Some(now),
568 last_check_time: None,
569 operation_id,
570 claimed: vec![],
571 },
572 )
573 .await;
574
575 (operation_id, address, tweak_idx)
576 }
577
578 pub async fn get_withdraw_fees(
585 &self,
586 address: &bitcoin::Address,
587 amount: bitcoin::Amount,
588 ) -> anyhow::Result<PegOutFees> {
589 self.module_api
590 .fetch_peg_out_fees(address, amount)
591 .await?
592 .context("Federation didn't return peg-out fees")
593 }
594
595 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
597 Ok(self.module_api.fetch_wallet_summary().await?)
598 }
599
600 pub fn create_withdraw_output(
601 &self,
602 operation_id: OperationId,
603 address: bitcoin::Address,
604 amount: bitcoin::Amount,
605 fees: PegOutFees,
606 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
607 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
608
609 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
610
611 let sm_gen = move |out_point_range: OutPointRange| {
612 assert_eq!(out_point_range.count(), 1);
613 let out_idx = out_point_range.start_idx();
614 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
615 operation_id,
616 state: WithdrawStates::Created(CreatedWithdrawState {
617 fm_outpoint: OutPoint {
618 txid: out_point_range.txid(),
619 out_idx,
620 },
621 }),
622 })]
623 };
624
625 Ok(ClientOutputBundle::new(
626 vec![ClientOutput::<WalletOutput> { output, amount }],
627 vec![ClientOutputSM::<WalletClientStates> {
628 state_machines: Arc::new(sm_gen),
629 }],
630 ))
631 }
632
633 pub fn create_rbf_withdraw_output(
634 &self,
635 operation_id: OperationId,
636 rbf: &Rbf,
637 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
638 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
639
640 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
641
642 let sm_gen = move |out_point_range: OutPointRange| {
643 assert_eq!(out_point_range.count(), 1);
644 let out_idx = out_point_range.start_idx();
645 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
646 operation_id,
647 state: WithdrawStates::Created(CreatedWithdrawState {
648 fm_outpoint: OutPoint {
649 txid: out_point_range.txid(),
650 out_idx,
651 },
652 }),
653 })]
654 };
655
656 Ok(ClientOutputBundle::new(
657 vec![ClientOutput::<WalletOutput> { output, amount }],
658 vec![ClientOutputSM::<WalletClientStates> {
659 state_machines: Arc::new(sm_gen),
660 }],
661 ))
662 }
663
664 pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
665 Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
666 }
667
668 pub async fn supports_safe_deposit(&self) -> bool {
677 let mut dbtx = self.db.begin_transaction().await;
678
679 let already_verified_supports_safe_deposit =
680 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
681
682 already_verified_supports_safe_deposit || {
683 match self.module_api.module_consensus_version().await {
684 Ok(module_consensus_version) => {
685 let supported_version =
686 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
687
688 if supported_version {
689 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
690 dbtx.commit_tx().await;
691 }
692
693 supported_version
694 }
695 Err(_) => false,
696 }
697 }
698 }
699
700 pub async fn safe_allocate_deposit_address<M>(
708 &self,
709 extra_meta: M,
710 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
711 where
712 M: Serialize + MaybeSend + MaybeSync,
713 {
714 ensure!(
715 self.supports_safe_deposit().await,
716 "Wallet module consensus version doesn't support safe deposits",
717 );
718
719 self.allocate_deposit_address_expert_only(extra_meta).await
720 }
721
722 pub async fn allocate_deposit_address_expert_only<M>(
740 &self,
741 extra_meta: M,
742 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
743 where
744 M: Serialize + MaybeSend + MaybeSync,
745 {
746 let extra_meta_value =
747 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
748 let (operation_id, address, tweak_idx) = self
749 .db
750 .autocommit(
751 move |dbtx, _| {
752 let extra_meta_value_inner = extra_meta_value.clone();
753 Box::pin(async move {
754 let (operation_id, address, tweak_idx) = self
755 .allocate_deposit_address_inner(dbtx)
756 .await;
757
758 self.client_ctx.manual_operation_start_dbtx(
759 dbtx,
760 operation_id,
761 WalletCommonInit::KIND.as_str(),
762 WalletOperationMeta {
763 variant: WalletOperationMetaVariant::Deposit {
764 address: address.clone().into_unchecked(),
765 tweak_idx: Some(tweak_idx),
766 expires_at: None,
767 },
768 extra_meta: extra_meta_value_inner,
769 },
770 vec![]
771 ).await?;
772
773 debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
774
775 self.rpc
777 .watch_script_history(&address.script_pubkey())
778 .await?;
779
780 let sender = self.pegin_monitor_wakeup_sender.clone();
781 dbtx.on_commit(move || {
782 let _ = sender.send(());
783 });
784
785 Ok((operation_id, address, tweak_idx))
786 })
787 },
788 Some(100),
789 )
790 .await
791 .map_err(|e| match e {
792 AutocommitError::CommitFailed {
793 last_error,
794 attempts,
795 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
796 AutocommitError::ClosureError { error, .. } => error,
797 })?;
798
799 Ok((operation_id, address, tweak_idx))
800 }
801
802 pub async fn subscribe_deposit(
808 &self,
809 operation_id: OperationId,
810 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
811 let operation = self
812 .client_ctx
813 .get_operation(operation_id)
814 .await
815 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
816
817 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
818 bail!("Operation is not a wallet operation");
819 }
820
821 let operation_meta = operation.meta::<WalletOperationMeta>();
822
823 let WalletOperationMetaVariant::Deposit {
824 address, tweak_idx, ..
825 } = operation_meta.variant
826 else {
827 bail!("Operation is not a deposit operation");
828 };
829
830 let address = address.require_network(self.cfg().network.0)?;
831
832 let Some(tweak_idx) = tweak_idx else {
834 let outcome_v1 = operation
838 .outcome::<DepositStateV1>()
839 .context("Old pending deposit, can't subscribe to updates")?;
840
841 let outcome_v2 = match outcome_v1 {
842 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
843 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
844 btc_out_point: bitcoin::OutPoint {
845 txid: tx_info.btc_transaction.compute_txid(),
846 vout: tx_info.out_idx,
847 },
848 },
849 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
850 _ => bail!("Non-final outcome in operation log"),
851 };
852
853 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
854 };
855
856 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
857 let stream_rpc = self.rpc.clone();
858 let stream_client_ctx = self.client_ctx.clone();
859 let stream_script_pub_key = address.script_pubkey();
860 move || {
861
862 stream! {
863 yield DepositStateV2::WaitingForTransaction;
864
865 retry(
866 "subscribe script history",
867 background_backoff(),
868 || stream_rpc.watch_script_history(&stream_script_pub_key)
869 ).await.expect("Will never give up");
870 let (btc_out_point, btc_deposited) = retry(
871 "fetch history",
872 background_backoff(),
873 || async {
874 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
875 history.first().and_then(|tx| {
876 let (out_idx, amount) = tx.output
877 .iter()
878 .enumerate()
879 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
880 let txid = tx.compute_txid();
881
882 Some((
883 bitcoin::OutPoint {
884 txid,
885 vout: out_idx as u32,
886 },
887 amount
888 ))
889 }).context("No deposit transaction found")
890 }
891 ).await.expect("Will never give up");
892
893 yield DepositStateV2::WaitingForConfirmation {
894 btc_deposited,
895 btc_out_point
896 };
897
898 let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
899 peg_in_index: tweak_idx,
900 btc_out_point,
901 }).await;
902
903 yield DepositStateV2::Confirmed {
904 btc_deposited,
905 btc_out_point
906 };
907
908 match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
909 Ok(()) => yield DepositStateV2::Claimed {
910 btc_deposited,
911 btc_out_point
912 },
913 Err(e) => yield DepositStateV2::Failed(e.to_string())
914 }
915 }
916 }}))
917 }
918
919 pub async fn find_tweak_idx_by_address(
920 &self,
921 address: bitcoin::Address<NetworkUnchecked>,
922 ) -> anyhow::Result<TweakIdx> {
923 let data = self.data.clone();
924 let Some((tweak_idx, _)) = self
925 .db
926 .begin_transaction_nc()
927 .await
928 .find_by_prefix(&PegInTweakIndexPrefix)
929 .await
930 .filter(|(k, _)| {
931 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
932 future::ready(derived_address.into_unchecked() == address)
933 })
934 .next()
935 .await
936 else {
937 bail!("Address not found in the list of derived keys");
938 };
939
940 Ok(tweak_idx.0)
941 }
942 pub async fn find_tweak_idx_by_operation_id(
943 &self,
944 operation_id: OperationId,
945 ) -> anyhow::Result<TweakIdx> {
946 Ok(self
947 .client_ctx
948 .module_db()
949 .clone()
950 .begin_transaction_nc()
951 .await
952 .find_by_prefix(&PegInTweakIndexPrefix)
953 .await
954 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
955 .next()
956 .await
957 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
958 .0
959 .0)
960 }
961
962 pub async fn get_pegin_tweak_idx(
963 &self,
964 tweak_idx: TweakIdx,
965 ) -> anyhow::Result<PegInTweakIndexData> {
966 self.client_ctx
967 .module_db()
968 .clone()
969 .begin_transaction_nc()
970 .await
971 .get_value(&PegInTweakIndexKey(tweak_idx))
972 .await
973 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
974 }
975
976 pub async fn get_claimed_pegins(
977 &self,
978 dbtx: &mut DatabaseTransaction<'_>,
979 tweak_idx: TweakIdx,
980 ) -> Vec<(
981 bitcoin::OutPoint,
982 TransactionId,
983 Vec<fedimint_core::OutPoint>,
984 )> {
985 let outpoints = dbtx
986 .get_value(&PegInTweakIndexKey(tweak_idx))
987 .await
988 .map(|v| v.claimed)
989 .unwrap_or_default();
990
991 let mut res = vec![];
992
993 for outpoint in outpoints {
994 let claimed_peg_in_data = dbtx
995 .get_value(&ClaimedPegInKey {
996 peg_in_index: tweak_idx,
997 btc_out_point: outpoint,
998 })
999 .await
1000 .expect("Must have a corresponding claim record");
1001 res.push((
1002 outpoint,
1003 claimed_peg_in_data.claim_txid,
1004 claimed_peg_in_data.change,
1005 ));
1006 }
1007
1008 res
1009 }
1010
1011 pub async fn recheck_pegin_address_by_op_id(
1013 &self,
1014 operation_id: OperationId,
1015 ) -> anyhow::Result<()> {
1016 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1017
1018 self.recheck_pegin_address(tweak_idx).await
1019 }
1020
1021 pub async fn recheck_pegin_address_by_address(
1023 &self,
1024 address: bitcoin::Address<NetworkUnchecked>,
1025 ) -> anyhow::Result<()> {
1026 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1027 .await
1028 }
1029
    /// Requests an immediate recheck of the deposit address with index
    /// `tweak_idx`.
    ///
    /// Sets the entry's `next_check_time` to now and, once the change is
    /// committed, wakes up the peg-in monitor. Fails if the tweak index has
    /// no entry or the transaction cannot be committed.
    pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async {
                        let db_key = PegInTweakIndexKey(tweak_idx);
                        let db_val = dbtx
                            .get_value(&db_key)
                            .await
                            .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;

                        // Overwrite only the next check time; every other
                        // field of the entry is carried over unchanged.
                        dbtx.insert_entry(
                            &db_key,
                            &PegInTweakIndexData {
                                next_check_time: Some(fedimint_core::time::now()),
                                ..db_val
                            },
                        )
                        .await;

                        // The wake-up is sent from the commit hook, so the
                        // monitor is only notified about committed changes.
                        let sender = self.pegin_monitor_wakeup_sender.clone();
                        dbtx.on_commit(move || {
                            let _ = sender.send(());
                        });

                        Ok::<_, anyhow::Error>(())
                    })
                },
                Some(100),
            )
            .await?;

        Ok(())
    }
1065
1066 pub async fn await_num_deposits_by_operation_id(
1068 &self,
1069 operation_id: OperationId,
1070 num_deposits: usize,
1071 ) -> anyhow::Result<()> {
1072 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1073 self.await_num_deposits(tweak_idx, num_deposits).await
1074 }
1075
1076 pub async fn await_num_deposits_by_address(
1077 &self,
1078 address: bitcoin::Address<NetworkUnchecked>,
1079 num_deposits: usize,
1080 ) -> anyhow::Result<()> {
1081 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1082 .await
1083 }
1084
1085 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1086 pub async fn await_num_deposits(
1087 &self,
1088 tweak_idx: TweakIdx,
1089 num_deposits: usize,
1090 ) -> anyhow::Result<()> {
1091 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1092
1093 let mut receiver = self.pegin_claimed_receiver.clone();
1094 let mut backoff = backoff_util::aggressive_backoff();
1095
1096 loop {
1097 let pegins = self
1098 .get_claimed_pegins(
1099 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1100 tweak_idx,
1101 )
1102 .await;
1103
1104 if pegins.len() < num_deposits {
1105 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1106 self.recheck_pegin_address(tweak_idx).await?;
1107 runtime::sleep(backoff.next().unwrap_or_default()).await;
1108 receiver.changed().await?;
1109 continue;
1110 }
1111
1112 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1113
1114 for (_outpoint, transaction_id, change) in pegins {
1115 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1116 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1117
1118 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1119 bail!("{}", e);
1120 }
1121
1122 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1123 self.client_ctx
1124 .await_primary_module_outputs(operation_id, change)
1125 .await
1126 .expect("Cannot fail if tx was accepted and federation is honest");
1127 }
1128
1129 return Ok(());
1130 }
1131 }
1132
1133 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1138 &self,
1139 address: &bitcoin::Address,
1140 amount: bitcoin::Amount,
1141 fee: PegOutFees,
1142 extra_meta: M,
1143 ) -> anyhow::Result<OperationId> {
1144 {
1145 let operation_id = OperationId(thread_rng().r#gen());
1146
1147 let withdraw_output =
1148 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1149 let tx_builder = TransactionBuilder::new()
1150 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1151
1152 let extra_meta =
1153 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1154 self.client_ctx
1155 .finalize_and_submit_transaction(
1156 operation_id,
1157 WalletCommonInit::KIND.as_str(),
1158 {
1159 let address = address.clone();
1160 move |change_range: OutPointRange| WalletOperationMeta {
1161 variant: WalletOperationMetaVariant::Withdraw {
1162 address: address.clone().into_unchecked(),
1163 amount,
1164 fee,
1165 change: change_range.into_iter().collect(),
1166 },
1167 extra_meta: extra_meta.clone(),
1168 }
1169 },
1170 tx_builder,
1171 )
1172 .await?;
1173
1174 Ok(operation_id)
1175 }
1176 }
1177
1178 #[deprecated(
1183 since = "0.4.0",
1184 note = "RBF withdrawals are rejected by the federation"
1185 )]
1186 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1187 &self,
1188 rbf: Rbf,
1189 extra_meta: M,
1190 ) -> anyhow::Result<OperationId> {
1191 let operation_id = OperationId(thread_rng().r#gen());
1192
1193 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1194 let tx_builder = TransactionBuilder::new()
1195 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1196
1197 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1198 self.client_ctx
1199 .finalize_and_submit_transaction(
1200 operation_id,
1201 WalletCommonInit::KIND.as_str(),
1202 move |change_range: OutPointRange| WalletOperationMeta {
1203 variant: WalletOperationMetaVariant::RbfWithdraw {
1204 rbf: rbf.clone(),
1205 change: change_range.into_iter().collect(),
1206 },
1207 extra_meta: extra_meta.clone(),
1208 },
1209 tx_builder,
1210 )
1211 .await?;
1212
1213 Ok(operation_id)
1214 }
1215
1216 pub async fn subscribe_withdraw_updates(
1217 &self,
1218 operation_id: OperationId,
1219 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1220 let operation = self
1221 .client_ctx
1222 .get_operation(operation_id)
1223 .await
1224 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1225
1226 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1227 bail!("Operation is not a wallet operation");
1228 }
1229
1230 let operation_meta = operation.meta::<WalletOperationMeta>();
1231
1232 let (WalletOperationMetaVariant::Withdraw { change, .. }
1233 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1234 else {
1235 bail!("Operation is not a withdraw operation");
1236 };
1237
1238 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1239 let client_ctx = self.client_ctx.clone();
1240
1241 Ok(self
1242 .client_ctx
1243 .outcome_or_updates(operation, operation_id, move || {
1244 stream! {
1245 match next_withdraw_state(&mut operation_stream).await {
1246 Some(WithdrawStates::Created(_)) => {
1247 yield WithdrawState::Created;
1248 },
1249 Some(s) => {
1250 panic!("Unexpected state {s:?}")
1251 },
1252 None => return,
1253 }
1254
1255 let _ = client_ctx
1260 .await_primary_module_outputs(operation_id, change)
1261 .await;
1262
1263
1264 match next_withdraw_state(&mut operation_stream).await {
1265 Some(WithdrawStates::Aborted(inner)) => {
1266 yield WithdrawState::Failed(inner.error);
1267 },
1268 Some(WithdrawStates::Success(inner)) => {
1269 yield WithdrawState::Succeeded(inner.txid);
1270 },
1271 Some(s) => {
1272 panic!("Unexpected state {s:?}")
1273 },
1274 None => {},
1275 }
1276 }
1277 }))
1278 }
1279
1280 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1281 self.admin_auth
1282 .clone()
1283 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1284 }
1285
1286 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1287 self.module_api
1288 .activate_consensus_version_voting(self.admin_auth()?)
1289 .await?;
1290
1291 Ok(())
1292 }
1293}
1294
1295async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1298 loop {
1299 let mut dbtx = db.begin_transaction().await;
1300
1301 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1302 break;
1303 }
1304
1305 if let Ok(module_consensus_version) = module_api.module_consensus_version().await {
1306 if SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version {
1307 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1308 dbtx.commit_tx().await;
1309 break;
1310 }
1311 }
1312
1313 drop(dbtx);
1314
1315 if is_running_in_test_env() {
1316 sleep(Duration::from_secs(10)).await;
1318 } else {
1319 sleep(Duration::from_secs(3600)).await;
1320 }
1321 }
1322}
1323
1324async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1326 let index = dbtx
1327 .get_value(&NextPegInTweakIndexKey)
1328 .await
1329 .unwrap_or_default();
1330 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1331 .await;
1332 index
1333}
1334
/// State machines run by the wallet client module, one variant per kind of
/// operation.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum WalletClientStates {
    /// State machine of a deposit operation.
    Deposit(DepositStateMachine),
    /// State machine of a withdraw operation.
    Withdraw(WithdrawStateMachine),
}
1340
1341impl IntoDynInstance for WalletClientStates {
1342 type DynType = DynState;
1343
1344 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1345 DynState::from_typed(instance_id, self)
1346 }
1347}
1348
1349impl State for WalletClientStates {
1350 type ModuleContext = WalletClientContext;
1351
1352 fn transitions(
1353 &self,
1354 context: &Self::ModuleContext,
1355 global_context: &DynGlobalClientContext,
1356 ) -> Vec<StateTransition<Self>> {
1357 match self {
1358 WalletClientStates::Deposit(sm) => {
1359 sm_enum_variant_translation!(
1360 sm.transitions(context, global_context),
1361 WalletClientStates::Deposit
1362 )
1363 }
1364 WalletClientStates::Withdraw(sm) => {
1365 sm_enum_variant_translation!(
1366 sm.transitions(context, global_context),
1367 WalletClientStates::Withdraw
1368 )
1369 }
1370 }
1371 }
1372
1373 fn operation_id(&self) -> OperationId {
1374 match self {
1375 WalletClientStates::Deposit(sm) => sm.operation_id(),
1376 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1377 }
1378 }
1379}
1380
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
    };

    /// Sanity checks for `recover_scan_idxes_for_activity`.
    ///
    /// Every scenario passes a starting index, a set of already used indices
    /// and a probe closure, then compares the returned `RecoverScanOutcome`
    /// with the expected one. Probe arms that panic mark indices the scan
    /// must never ask about; the `AtomicBool` flags prove that the last
    /// expected index really was probed.
    #[allow(clippy::too_many_lines)]
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Scenario 1: start at 0, nothing used, no activity anywhere.
        // Index 9 has to be probed, index 10 must not be.
        {
            let probed_last = AtomicBool::new(false);
            let probed_last_ref = &probed_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(0),
                &BTreeSet::new(),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(9) => {
                            probed_last_ref.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(10) => panic!("Shouldn't happen"),
                        // Presumably only here to pin the element type of the
                        // returned `Vec`; every other arm is an untyped `vec![]`.
                        TweakIdx(11) => vec![0usize],
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::new(),
                }
            );
            assert!(probed_last.load(Ordering::SeqCst));
        }

        // Scenario 2: indices 1 and 2 are already known to be used, so they
        // must not be probed. Index 11 has to be probed, index 12 must not be.
        {
            let probed_last = AtomicBool::new(false);
            let probed_last_ref = &probed_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(0),
                &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                        TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                        TweakIdx(11) => {
                            probed_last_ref.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(12) => panic!("Shouldn't happen"),
                        // Presumably only here to pin the element type of the
                        // returned `Vec`.
                        TweakIdx(13) => vec![0usize],
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::new(),
                }
            );
            assert!(probed_last.load(Ordering::SeqCst));
        }

        // Scenario 3: start at 10, which itself shows activity. Index 19 has
        // to be probed, index 20 must not be.
        {
            let probed_last = AtomicBool::new(false);
            let probed_last_ref = &probed_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(10),
                &BTreeSet::new(),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(10) => vec![()],
                        TweakIdx(19) => {
                            probed_last_ref.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(20) => panic!("Shouldn't happen"),
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            assert_eq!(
                outcome,
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)]),
                }
            );
            assert!(probed_last.load(Ordering::SeqCst));
        }

        // Scenario 4: start at 0 with activity at 6 and 15; both are reported
        // and 15 becomes the last used index.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)]),
            }
        );

        // Scenario 5: start at 10. The probe would report activity at index 8
        // and panics at index 9, yet the expected outcome contains no
        // peg-ins, so indices below the start are evidently not probed.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(8) => vec![()],
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::new(),
            }
        );

        // Scenario 6: start at 10 with activity at 15; index 9 must never be
        // probed.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)]),
            }
        );
    }
}