1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{DynBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client_module::oplog::UpdateStreamOrOutcome;
43use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client_module::transaction::{
45 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
54use fedimint_core::module::{
55 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
56 ModuleInit, MultiApiVersion,
57};
58use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
59use fedimint_core::util::backoff_util::background_backoff;
60use fedimint_core::util::{BoxStream, backoff_util, retry};
61use fedimint_core::{
62 BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
63 runtime, secp256k1,
64};
65use fedimint_derive_secret::{ChildId, DerivableSecret};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67pub use fedimint_wallet_common as common;
68use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
69use fedimint_wallet_common::tweakable::Tweakable;
70pub use fedimint_wallet_common::*;
71use futures::{Stream, StreamExt};
72use rand::{Rng, thread_rng};
73use secp256k1::Keypair;
74use serde::{Deserialize, Serialize};
75use strum::IntoEnumIterator;
76use tokio::sync::watch;
77use tracing::{debug, instrument};
78
79use crate::api::WalletFederationApi;
80use crate::backup::WalletRecovery;
81use crate::client_db::{
82 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
83 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
84};
85use crate::deposit::DepositStateMachine;
86use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
87
88const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
89
90#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
91pub struct BitcoinTransactionData {
92 pub btc_transaction: bitcoin::Transaction,
95 pub out_idx: u32,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
100pub enum DepositStateV1 {
101 WaitingForTransaction,
102 WaitingForConfirmation(BitcoinTransactionData),
103 Confirmed(BitcoinTransactionData),
104 Claimed(BitcoinTransactionData),
105 Failed(String),
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
109pub enum DepositStateV2 {
110 WaitingForTransaction,
111 WaitingForConfirmation {
112 #[serde(with = "bitcoin::amount::serde::as_sat")]
113 btc_deposited: bitcoin::Amount,
114 btc_out_point: bitcoin::OutPoint,
115 },
116 Confirmed {
117 #[serde(with = "bitcoin::amount::serde::as_sat")]
118 btc_deposited: bitcoin::Amount,
119 btc_out_point: bitcoin::OutPoint,
120 },
121 Claimed {
122 #[serde(with = "bitcoin::amount::serde::as_sat")]
123 btc_deposited: bitcoin::Amount,
124 btc_out_point: bitcoin::OutPoint,
125 },
126 Failed(String),
127}
128
129#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
130pub enum WithdrawState {
131 Created,
132 Succeeded(bitcoin::Txid),
133 Failed(String),
134 }
138
139async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
140where
141 S: Stream<Item = WalletClientStates> + Unpin,
142{
143 loop {
144 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
145 return Some(ds.state);
146 }
147 tokio::task::yield_now().await;
148 }
149}
150
151#[derive(Debug, Clone, Default)]
152pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
154
155impl WalletClientInit {
156 pub fn new(rpc: DynBitcoindRpc) -> Self {
157 Self(Some(rpc))
158 }
159}
160
161impl ModuleInit for WalletClientInit {
162 type Common = WalletCommonInit;
163
164 async fn dump_database(
165 &self,
166 dbtx: &mut DatabaseTransaction<'_>,
167 prefix_names: Vec<String>,
168 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
169 let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
170 BTreeMap::new();
171 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
172 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
173 });
174
175 for table in filtered_prefixes {
176 match table {
177 DbKeyPrefix::NextPegInTweakIndex => {
178 if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
179 wallet_client_items
180 .insert("NextPegInTweakIndex".to_string(), Box::new(index));
181 }
182 }
183 DbKeyPrefix::PegInTweakIndex => {
184 push_db_pair_items!(
185 dbtx,
186 PegInTweakIndexPrefix,
187 PegInTweakIndexKey,
188 PegInTweakIndexData,
189 wallet_client_items,
190 "Peg-In Tweak Index"
191 );
192 }
193 DbKeyPrefix::ClaimedPegIn => {
194 push_db_pair_items!(
195 dbtx,
196 ClaimedPegInPrefix,
197 ClaimedPegInKey,
198 ClaimedPegInData,
199 wallet_client_items,
200 "Claimed Peg-In"
201 );
202 }
203 DbKeyPrefix::RecoveryFinalized => {
204 if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
205 wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
206 }
207 }
208 DbKeyPrefix::SupportsSafeDeposit => {
209 push_db_pair_items!(
210 dbtx,
211 SupportsSafeDepositPrefix,
212 SupportsSafeDepositKey,
213 (),
214 wallet_client_items,
215 "Supports Safe Deposit"
216 );
217 }
218 DbKeyPrefix::RecoveryState
219 | DbKeyPrefix::ExternalReservedStart
220 | DbKeyPrefix::CoreInternalReservedStart
221 | DbKeyPrefix::CoreInternalReservedEnd => {}
222 }
223 }
224
225 Box::new(wallet_client_items.into_iter())
226 }
227}
228
229#[apply(async_trait_maybe_send!)]
230impl ClientModuleInit for WalletClientInit {
231 type Module = WalletClientModule;
232
233 fn supported_api_versions(&self) -> MultiApiVersion {
234 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
235 .expect("no version conflicts")
236 }
237
238 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
239 let data = WalletClientModuleData {
240 cfg: args.cfg().clone(),
241 module_root_secret: args.module_root_secret().clone(),
242 };
243
244 let db = args.db().clone();
245
246 let btc_rpc = self.0.clone().unwrap_or(create_esplora_rpc(
247 &WalletClientModule::get_rpc_config(args.cfg()).url,
248 )?);
249
250 let module_api = args.module_api().clone();
251
252 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
253 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
254
255 Ok(WalletClientModule {
256 db,
257 data,
258 module_api,
259 notifier: args.notifier().clone(),
260 rpc: btc_rpc,
261 client_ctx: args.context(),
262 pegin_monitor_wakeup_sender,
263 pegin_monitor_wakeup_receiver,
264 pegin_claimed_receiver,
265 pegin_claimed_sender,
266 task_group: args.task_group().clone(),
267 admin_auth: args.admin_auth().cloned(),
268 })
269 }
270
271 async fn recover(
279 &self,
280 args: &ClientModuleRecoverArgs<Self>,
281 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
282 ) -> anyhow::Result<()> {
283 args.recover_from_history::<WalletRecovery>(self, snapshot)
284 .await
285 }
286
287 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
288 Some(
289 DbKeyPrefix::iter()
290 .map(|p| p as u8)
291 .chain(
292 DbKeyPrefix::ExternalReservedStart as u8
293 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
294 )
295 .collect(),
296 )
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct WalletOperationMeta {
302 pub variant: WalletOperationMetaVariant,
303 pub extra_meta: serde_json::Value,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
307#[serde(rename_all = "snake_case")]
308pub enum WalletOperationMetaVariant {
309 Deposit {
310 address: Address<NetworkUnchecked>,
311 #[serde(default)]
316 tweak_idx: Option<TweakIdx>,
317 #[serde(default, skip_serializing_if = "Option::is_none")]
318 expires_at: Option<SystemTime>,
319 },
320 Withdraw {
321 address: Address<NetworkUnchecked>,
322 #[serde(with = "bitcoin::amount::serde::as_sat")]
323 amount: bitcoin::Amount,
324 fee: PegOutFees,
325 change: Vec<OutPoint>,
326 },
327
328 RbfWithdraw {
329 rbf: Rbf,
330 change: Vec<OutPoint>,
331 },
332}
333
334#[derive(Debug, Clone)]
336pub struct WalletClientModuleData {
337 cfg: WalletClientConfig,
338 module_root_secret: DerivableSecret,
339}
340
341impl WalletClientModuleData {
342 fn derive_deposit_address(
343 &self,
344 idx: TweakIdx,
345 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
346 let idx = ChildId(idx.0);
347
348 let secret_tweak_key = self
349 .module_root_secret
350 .child_key(WALLET_TWEAK_CHILD_ID)
351 .child_key(idx)
352 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
353
354 let public_tweak_key = secret_tweak_key.public_key();
355
356 let address = self
357 .cfg
358 .peg_in_descriptor
359 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
360 .address(self.cfg.network.0)
361 .unwrap();
362
363 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
365
366 (secret_tweak_key, public_tweak_key, address, operation_id)
367 }
368
369 fn derive_peg_in_script(
370 &self,
371 idx: TweakIdx,
372 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
373 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
374
375 (
376 self.cfg
377 .peg_in_descriptor
378 .tweak(&secret_tweak_key.public_key(), SECP256K1)
379 .script_pubkey(),
380 address,
381 secret_tweak_key,
382 operation_id,
383 )
384 }
385}
386
387#[derive(Debug)]
388pub struct WalletClientModule {
389 data: WalletClientModuleData,
390 db: Database,
391 module_api: DynModuleApi,
392 notifier: ModuleNotifier<WalletClientStates>,
393 rpc: DynBitcoindRpc,
394 client_ctx: ClientContext<Self>,
395 pegin_monitor_wakeup_sender: watch::Sender<()>,
397 pegin_monitor_wakeup_receiver: watch::Receiver<()>,
398 pegin_claimed_sender: watch::Sender<()>,
400 pegin_claimed_receiver: watch::Receiver<()>,
401 task_group: TaskGroup,
402 admin_auth: Option<ApiAuth>,
403}
404
405#[apply(async_trait_maybe_send!)]
406impl ClientModule for WalletClientModule {
407 type Init = WalletClientInit;
408 type Common = WalletModuleTypes;
409 type Backup = WalletModuleBackup;
410 type ModuleStateMachineContext = WalletClientContext;
411 type States = WalletClientStates;
412
413 fn context(&self) -> Self::ModuleStateMachineContext {
414 WalletClientContext {
415 rpc: self.rpc.clone(),
416 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
417 wallet_decoder: self.decoder(),
418 secp: Secp256k1::default(),
419 client_ctx: self.client_ctx.clone(),
420 }
421 }
422
423 async fn start(&self) {
424 self.task_group.spawn_cancellable("peg-in monitor", {
425 let client_ctx = self.client_ctx.clone();
426 let db = self.db.clone();
427 let btc_rpc = self.rpc.clone();
428 let module_api = self.module_api.clone();
429 let data = self.data.clone();
430 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
431 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
432 pegin_monitor::run_peg_in_monitor(
433 client_ctx,
434 db,
435 btc_rpc,
436 module_api,
437 data,
438 pegin_claimed_sender,
439 pegin_monitor_wakeup_receiver,
440 )
441 });
442
443 self.task_group
444 .spawn_cancellable("supports-safe-deposit-version", {
445 let db = self.db.clone();
446 let module_api = self.module_api.clone();
447
448 poll_supports_safe_deposit_version(db, module_api)
449 });
450 }
451
452 fn supports_backup(&self) -> bool {
453 true
454 }
455
456 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
457 let session_count = self.client_ctx.global_api().session_count().await?;
459
460 let mut dbtx = self.db.begin_transaction_nc().await;
461 let next_pegin_tweak_idx = dbtx
462 .get_value(&NextPegInTweakIndexKey)
463 .await
464 .unwrap_or_default();
465 let claimed = dbtx
466 .find_by_prefix(&PegInTweakIndexPrefix)
467 .await
468 .filter_map(|(k, v)| async move {
469 if v.claimed.is_empty() {
470 None
471 } else {
472 Some(k.0)
473 }
474 })
475 .collect()
476 .await;
477 Ok(backup::WalletModuleBackup::new_v1(
478 session_count,
479 next_pegin_tweak_idx,
480 claimed,
481 ))
482 }
483
484 fn input_fee(
485 &self,
486 _amount: &Amounts,
487 _input: &<Self::Common as ModuleCommon>::Input,
488 ) -> Option<Amounts> {
489 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
490 }
491
492 fn output_fee(
493 &self,
494 _amount: &Amounts,
495 _output: &<Self::Common as ModuleCommon>::Output,
496 ) -> Option<Amounts> {
497 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
498 }
499
500 async fn handle_rpc(
501 &self,
502 method: String,
503 request: serde_json::Value,
504 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
505 Box::pin(try_stream! {
506 match method.as_str() {
507 "get_wallet_summary" => {
508 let _req: WalletSummaryRequest = serde_json::from_value(request)?;
509 let wallet_summary = self.get_wallet_summary()
510 .await
511 .expect("Failed to fetch wallet summary");
512 let result = serde_json::to_value(&wallet_summary)
513 .expect("Serialization error");
514 yield result;
515 }
516 "get_block_count_local" => {
517 let block_count = self.get_block_count_local().await
518 .expect("Failed to fetch block count");
519 yield serde_json::to_value(block_count)?;
520 }
521 "peg_in" => {
522 let req: PegInRequest = serde_json::from_value(request)?;
523 let response = self.peg_in(req)
524 .await
525 .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
526 let result = serde_json::to_value(&response)?;
527 yield result;
528 },
529 "peg_out" => {
530 let req: PegOutRequest = serde_json::from_value(request)?;
531 let response = self.peg_out(req)
532 .await
533 .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
534 let result = serde_json::to_value(&response)?;
535 yield result;
536 },
537 "subscribe_deposit" => {
538 let req: SubscribeDepositRequest = serde_json::from_value(request)?;
539 for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
540 yield serde_json::to_value(state)?;
541 }
542 },
543 "subscribe_withdraw" => {
544 let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
545 for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
546 yield serde_json::to_value(state)?;
547 }
548 }
549 _ => {
550 Err(anyhow::format_err!("Unknown method: {}", method))?;
551 }
552 }
553 })
554 }
555
556 #[cfg(feature = "cli")]
557 async fn handle_cli_command(
558 &self,
559 args: &[std::ffi::OsString],
560 ) -> anyhow::Result<serde_json::Value> {
561 cli::handle_cli_command(self, args).await
562 }
563}
564
565#[derive(Deserialize)]
566struct WalletSummaryRequest {}
567
568#[derive(Debug, Clone)]
569pub struct WalletClientContext {
570 rpc: DynBitcoindRpc,
571 wallet_descriptor: PegInDescriptor,
572 wallet_decoder: Decoder,
573 secp: Secp256k1<All>,
574 pub client_ctx: ClientContext<WalletClientModule>,
575}
576
577#[derive(Debug, Clone, Serialize, Deserialize)]
578pub struct PegInRequest {
579 pub extra_meta: serde_json::Value,
580}
581
582#[derive(Deserialize)]
583struct SubscribeDepositRequest {
584 operation_id: OperationId,
585}
586
587#[derive(Deserialize)]
588struct SubscribeWithdrawRequest {
589 operation_id: OperationId,
590}
591
592#[derive(Debug, Clone, Serialize, Deserialize)]
593pub struct PegInResponse {
594 pub deposit_address: Address<NetworkUnchecked>,
595 pub operation_id: OperationId,
596}
597
598#[derive(Debug, Clone, Serialize, Deserialize)]
599pub struct PegOutRequest {
600 pub amount_sat: u64,
601 pub destination_address: Address<NetworkUnchecked>,
602 pub extra_meta: serde_json::Value,
603}
604
605#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct PegOutResponse {
607 pub operation_id: OperationId,
608}
609
610impl Context for WalletClientContext {
611 const KIND: Option<ModuleKind> = Some(KIND);
612}
613
614impl WalletClientModule {
615 fn cfg(&self) -> &WalletClientConfig {
616 &self.data.cfg
617 }
618
619 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
620 match BitcoinRpcConfig::get_defaults_from_env_vars() {
621 Ok(rpc_config) => {
622 if rpc_config.kind == "bitcoind" {
625 cfg.default_bitcoin_rpc.clone()
626 } else {
627 rpc_config
628 }
629 }
630 _ => cfg.default_bitcoin_rpc.clone(),
631 }
632 }
633
634 pub fn get_network(&self) -> Network {
635 self.cfg().network.0
636 }
637
638 pub fn get_finality_delay(&self) -> u32 {
639 self.cfg().finality_delay
640 }
641
642 pub fn get_fee_consensus(&self) -> FeeConsensus {
643 self.cfg().fee_consensus
644 }
645
646 async fn allocate_deposit_address_inner(
647 &self,
648 dbtx: &mut DatabaseTransaction<'_>,
649 ) -> (OperationId, Address, TweakIdx) {
650 dbtx.ensure_isolated().expect("Must be isolated db");
651
652 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
653 let (_secret_tweak_key, _, address, operation_id) =
654 self.data.derive_deposit_address(tweak_idx);
655
656 let now = fedimint_core::time::now();
657
658 dbtx.insert_new_entry(
659 &PegInTweakIndexKey(tweak_idx),
660 &PegInTweakIndexData {
661 creation_time: now,
662 next_check_time: Some(now),
663 last_check_time: None,
664 operation_id,
665 claimed: vec![],
666 },
667 )
668 .await;
669
670 (operation_id, address, tweak_idx)
671 }
672
673 pub async fn get_withdraw_fees(
680 &self,
681 address: &bitcoin::Address,
682 amount: bitcoin::Amount,
683 ) -> anyhow::Result<PegOutFees> {
684 self.module_api
685 .fetch_peg_out_fees(address, amount)
686 .await?
687 .context("Federation didn't return peg-out fees")
688 }
689
690 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
692 Ok(self.module_api.fetch_wallet_summary().await?)
693 }
694
695 pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
696 Ok(self.module_api.fetch_block_count_local().await?)
697 }
698
699 pub fn create_withdraw_output(
700 &self,
701 operation_id: OperationId,
702 address: bitcoin::Address,
703 amount: bitcoin::Amount,
704 fees: PegOutFees,
705 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
706 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
707
708 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
709
710 let sm_gen = move |out_point_range: OutPointRange| {
711 assert_eq!(out_point_range.count(), 1);
712 let out_idx = out_point_range.start_idx();
713 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
714 operation_id,
715 state: WithdrawStates::Created(CreatedWithdrawState {
716 fm_outpoint: OutPoint {
717 txid: out_point_range.txid(),
718 out_idx,
719 },
720 }),
721 })]
722 };
723
724 Ok(ClientOutputBundle::new(
725 vec![ClientOutput::<WalletOutput> {
726 output,
727 amounts: Amounts::new_bitcoin(amount),
728 }],
729 vec![ClientOutputSM::<WalletClientStates> {
730 state_machines: Arc::new(sm_gen),
731 }],
732 ))
733 }
734
735 pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
736 let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
737
738 Ok(PegInResponse {
739 deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
740 .as_unchecked()
741 .clone(),
742 operation_id,
743 })
744 }
745
746 pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
747 let amount = bitcoin::Amount::from_sat(req.amount_sat);
748 let destination = req
749 .destination_address
750 .require_network(self.get_network())?;
751
752 let fees = self.get_withdraw_fees(&destination, amount).await?;
753 let operation_id = self
754 .withdraw(&destination, amount, fees, req.extra_meta)
755 .await
756 .context("Failed to initiate withdraw")?;
757
758 Ok(PegOutResponse { operation_id })
759 }
760
761 pub fn create_rbf_withdraw_output(
762 &self,
763 operation_id: OperationId,
764 rbf: &Rbf,
765 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
766 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
767
768 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
769
770 let sm_gen = move |out_point_range: OutPointRange| {
771 assert_eq!(out_point_range.count(), 1);
772 let out_idx = out_point_range.start_idx();
773 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
774 operation_id,
775 state: WithdrawStates::Created(CreatedWithdrawState {
776 fm_outpoint: OutPoint {
777 txid: out_point_range.txid(),
778 out_idx,
779 },
780 }),
781 })]
782 };
783
784 Ok(ClientOutputBundle::new(
785 vec![ClientOutput::<WalletOutput> {
786 output,
787 amounts: Amounts::new_bitcoin(amount),
788 }],
789 vec![ClientOutputSM::<WalletClientStates> {
790 state_machines: Arc::new(sm_gen),
791 }],
792 ))
793 }
794
795 pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
796 Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
797 }
798
799 pub async fn supports_safe_deposit(&self) -> bool {
808 let mut dbtx = self.db.begin_transaction().await;
809
810 let already_verified_supports_safe_deposit =
811 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
812
813 already_verified_supports_safe_deposit || {
814 match self.module_api.module_consensus_version().await {
815 Ok(module_consensus_version) => {
816 let supported_version =
817 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
818
819 if supported_version {
820 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
821 dbtx.commit_tx().await;
822 }
823
824 supported_version
825 }
826 Err(_) => false,
827 }
828 }
829 }
830
831 pub async fn safe_allocate_deposit_address<M>(
839 &self,
840 extra_meta: M,
841 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
842 where
843 M: Serialize + MaybeSend + MaybeSync,
844 {
845 ensure!(
846 self.supports_safe_deposit().await,
847 "Wallet module consensus version doesn't support safe deposits",
848 );
849
850 self.allocate_deposit_address_expert_only(extra_meta).await
851 }
852
853 pub async fn allocate_deposit_address_expert_only<M>(
871 &self,
872 extra_meta: M,
873 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
874 where
875 M: Serialize + MaybeSend + MaybeSync,
876 {
877 let extra_meta_value =
878 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
879 let (operation_id, address, tweak_idx) = self
880 .db
881 .autocommit(
882 move |dbtx, _| {
883 let extra_meta_value_inner = extra_meta_value.clone();
884 Box::pin(async move {
885 let (operation_id, address, tweak_idx) = self
886 .allocate_deposit_address_inner(dbtx)
887 .await;
888
889 self.client_ctx.manual_operation_start_dbtx(
890 dbtx,
891 operation_id,
892 WalletCommonInit::KIND.as_str(),
893 WalletOperationMeta {
894 variant: WalletOperationMetaVariant::Deposit {
895 address: address.clone().into_unchecked(),
896 tweak_idx: Some(tweak_idx),
897 expires_at: None,
898 },
899 extra_meta: extra_meta_value_inner,
900 },
901 vec![]
902 ).await?;
903
904 debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
905
906 self.rpc.watch_script_history(&address.script_pubkey()).await?;
908
909 let sender = self.pegin_monitor_wakeup_sender.clone();
910 dbtx.on_commit(move || {
911 sender.send_replace(());
912 });
913
914 Ok((operation_id, address, tweak_idx))
915 })
916 },
917 Some(100),
918 )
919 .await
920 .map_err(|e| match e {
921 AutocommitError::CommitFailed {
922 last_error,
923 attempts,
924 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
925 AutocommitError::ClosureError { error, .. } => error,
926 })?;
927
928 Ok((operation_id, address, tweak_idx))
929 }
930
931 pub async fn subscribe_deposit(
937 &self,
938 operation_id: OperationId,
939 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
940 let operation = self
941 .client_ctx
942 .get_operation(operation_id)
943 .await
944 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
945
946 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
947 bail!("Operation is not a wallet operation");
948 }
949
950 let operation_meta = operation.meta::<WalletOperationMeta>();
951
952 let WalletOperationMetaVariant::Deposit {
953 address, tweak_idx, ..
954 } = operation_meta.variant
955 else {
956 bail!("Operation is not a deposit operation");
957 };
958
959 let address = address.require_network(self.cfg().network.0)?;
960
961 let Some(tweak_idx) = tweak_idx else {
963 let outcome_v1 = operation
967 .outcome::<DepositStateV1>()
968 .context("Old pending deposit, can't subscribe to updates")?;
969
970 let outcome_v2 = match outcome_v1 {
971 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
972 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
973 btc_out_point: bitcoin::OutPoint {
974 txid: tx_info.btc_transaction.compute_txid(),
975 vout: tx_info.out_idx,
976 },
977 },
978 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
979 _ => bail!("Non-final outcome in operation log"),
980 };
981
982 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
983 };
984
985 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
986 let stream_rpc = self.rpc.clone();
987 let stream_client_ctx = self.client_ctx.clone();
988 let stream_script_pub_key = address.script_pubkey();
989 move || {
990
991 stream! {
992 yield DepositStateV2::WaitingForTransaction;
993
994 retry(
995 "subscribe script history",
996 background_backoff(),
997 || stream_rpc.watch_script_history(&stream_script_pub_key)
998 ).await.expect("Will never give up");
999 let (btc_out_point, btc_deposited) = retry(
1000 "fetch history",
1001 background_backoff(),
1002 || async {
1003 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1004 history.first().and_then(|tx| {
1005 let (out_idx, amount) = tx.output
1006 .iter()
1007 .enumerate()
1008 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1009 let txid = tx.compute_txid();
1010
1011 Some((
1012 bitcoin::OutPoint {
1013 txid,
1014 vout: out_idx as u32,
1015 },
1016 amount
1017 ))
1018 }).context("No deposit transaction found")
1019 }
1020 ).await.expect("Will never give up");
1021
1022 yield DepositStateV2::WaitingForConfirmation {
1023 btc_deposited,
1024 btc_out_point
1025 };
1026
1027 let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1028 peg_in_index: tweak_idx,
1029 btc_out_point,
1030 }).await;
1031
1032 yield DepositStateV2::Confirmed {
1033 btc_deposited,
1034 btc_out_point
1035 };
1036
1037 match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1038 Ok(()) => yield DepositStateV2::Claimed {
1039 btc_deposited,
1040 btc_out_point
1041 },
1042 Err(e) => yield DepositStateV2::Failed(e.to_string())
1043 }
1044 }
1045 }}))
1046 }
1047
1048 pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1049 self.client_ctx
1050 .module_db()
1051 .clone()
1052 .begin_transaction_nc()
1053 .await
1054 .find_by_prefix(&PegInTweakIndexPrefix)
1055 .await
1056 .map(|(key, data)| (key.0, data))
1057 .collect()
1058 .await
1059 }
1060
1061 pub async fn find_tweak_idx_by_address(
1062 &self,
1063 address: bitcoin::Address<NetworkUnchecked>,
1064 ) -> anyhow::Result<TweakIdx> {
1065 let data = self.data.clone();
1066 let Some((tweak_idx, _)) = self
1067 .db
1068 .begin_transaction_nc()
1069 .await
1070 .find_by_prefix(&PegInTweakIndexPrefix)
1071 .await
1072 .filter(|(k, _)| {
1073 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1074 future::ready(derived_address.into_unchecked() == address)
1075 })
1076 .next()
1077 .await
1078 else {
1079 bail!("Address not found in the list of derived keys");
1080 };
1081
1082 Ok(tweak_idx.0)
1083 }
1084 pub async fn find_tweak_idx_by_operation_id(
1085 &self,
1086 operation_id: OperationId,
1087 ) -> anyhow::Result<TweakIdx> {
1088 Ok(self
1089 .client_ctx
1090 .module_db()
1091 .clone()
1092 .begin_transaction_nc()
1093 .await
1094 .find_by_prefix(&PegInTweakIndexPrefix)
1095 .await
1096 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1097 .next()
1098 .await
1099 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1100 .0
1101 .0)
1102 }
1103
1104 pub async fn get_pegin_tweak_idx(
1105 &self,
1106 tweak_idx: TweakIdx,
1107 ) -> anyhow::Result<PegInTweakIndexData> {
1108 self.client_ctx
1109 .module_db()
1110 .clone()
1111 .begin_transaction_nc()
1112 .await
1113 .get_value(&PegInTweakIndexKey(tweak_idx))
1114 .await
1115 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1116 }
1117
1118 pub async fn get_claimed_pegins(
1119 &self,
1120 dbtx: &mut DatabaseTransaction<'_>,
1121 tweak_idx: TweakIdx,
1122 ) -> Vec<(
1123 bitcoin::OutPoint,
1124 TransactionId,
1125 Vec<fedimint_core::OutPoint>,
1126 )> {
1127 let outpoints = dbtx
1128 .get_value(&PegInTweakIndexKey(tweak_idx))
1129 .await
1130 .map(|v| v.claimed)
1131 .unwrap_or_default();
1132
1133 let mut res = vec![];
1134
1135 for outpoint in outpoints {
1136 let claimed_peg_in_data = dbtx
1137 .get_value(&ClaimedPegInKey {
1138 peg_in_index: tweak_idx,
1139 btc_out_point: outpoint,
1140 })
1141 .await
1142 .expect("Must have a corresponding claim record");
1143 res.push((
1144 outpoint,
1145 claimed_peg_in_data.claim_txid,
1146 claimed_peg_in_data.change,
1147 ));
1148 }
1149
1150 res
1151 }
1152
1153 pub async fn recheck_pegin_address_by_op_id(
1155 &self,
1156 operation_id: OperationId,
1157 ) -> anyhow::Result<()> {
1158 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1159
1160 self.recheck_pegin_address(tweak_idx).await
1161 }
1162
1163 pub async fn recheck_pegin_address_by_address(
1165 &self,
1166 address: bitcoin::Address<NetworkUnchecked>,
1167 ) -> anyhow::Result<()> {
1168 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1169 .await
1170 }
1171
1172 pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1174 self.db
1175 .autocommit(
1176 |dbtx, _| {
1177 Box::pin(async {
1178 let db_key = PegInTweakIndexKey(tweak_idx);
1179 let db_val = dbtx
1180 .get_value(&db_key)
1181 .await
1182 .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1183
1184 dbtx.insert_entry(
1185 &db_key,
1186 &PegInTweakIndexData {
1187 next_check_time: Some(fedimint_core::time::now()),
1188 ..db_val
1189 },
1190 )
1191 .await;
1192
1193 let sender = self.pegin_monitor_wakeup_sender.clone();
1194 dbtx.on_commit(move || {
1195 sender.send_replace(());
1196 });
1197
1198 Ok::<_, anyhow::Error>(())
1199 })
1200 },
1201 Some(100),
1202 )
1203 .await?;
1204
1205 Ok(())
1206 }
1207
1208 pub async fn await_num_deposits_by_operation_id(
1210 &self,
1211 operation_id: OperationId,
1212 num_deposits: usize,
1213 ) -> anyhow::Result<()> {
1214 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1215 self.await_num_deposits(tweak_idx, num_deposits).await
1216 }
1217
1218 pub async fn await_num_deposits_by_address(
1219 &self,
1220 address: bitcoin::Address<NetworkUnchecked>,
1221 num_deposits: usize,
1222 ) -> anyhow::Result<()> {
1223 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1224 .await
1225 }
1226
1227 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1228 pub async fn await_num_deposits(
1229 &self,
1230 tweak_idx: TweakIdx,
1231 num_deposits: usize,
1232 ) -> anyhow::Result<()> {
1233 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1234
1235 let mut receiver = self.pegin_claimed_receiver.clone();
1236 let mut backoff = backoff_util::aggressive_backoff();
1237
1238 loop {
1239 let pegins = self
1240 .get_claimed_pegins(
1241 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1242 tweak_idx,
1243 )
1244 .await;
1245
1246 if pegins.len() < num_deposits {
1247 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1248 self.recheck_pegin_address(tweak_idx).await?;
1249 runtime::sleep(backoff.next().unwrap_or_default()).await;
1250 receiver.changed().await?;
1251 continue;
1252 }
1253
1254 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1255
1256 for (_outpoint, transaction_id, change) in pegins {
1257 if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1258 debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1259 continue;
1260 }
1261
1262 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1263 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1264
1265 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1266 bail!("{}", e);
1267 }
1268
1269 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1270 self.client_ctx
1271 .await_primary_module_outputs(operation_id, change)
1272 .await
1273 .expect("Cannot fail if tx was accepted and federation is honest");
1274 }
1275
1276 return Ok(());
1277 }
1278 }
1279
1280 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1285 &self,
1286 address: &bitcoin::Address,
1287 amount: bitcoin::Amount,
1288 fee: PegOutFees,
1289 extra_meta: M,
1290 ) -> anyhow::Result<OperationId> {
1291 {
1292 let operation_id = OperationId(thread_rng().r#gen());
1293
1294 let withdraw_output =
1295 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1296 let tx_builder = TransactionBuilder::new()
1297 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1298
1299 let extra_meta =
1300 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1301 self.client_ctx
1302 .finalize_and_submit_transaction(
1303 operation_id,
1304 WalletCommonInit::KIND.as_str(),
1305 {
1306 let address = address.clone();
1307 move |change_range: OutPointRange| WalletOperationMeta {
1308 variant: WalletOperationMetaVariant::Withdraw {
1309 address: address.clone().into_unchecked(),
1310 amount,
1311 fee,
1312 change: change_range.into_iter().collect(),
1313 },
1314 extra_meta: extra_meta.clone(),
1315 }
1316 },
1317 tx_builder,
1318 )
1319 .await?;
1320
1321 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1322
1323 self.client_ctx
1324 .log_event(
1325 &mut dbtx,
1326 SendPaymentEvent {
1327 operation_id,
1328 amount: amount + fee.amount(),
1329 fee: fee.amount(),
1330 },
1331 )
1332 .await;
1333
1334 dbtx.commit_tx().await;
1335
1336 Ok(operation_id)
1337 }
1338 }
1339
1340 #[deprecated(
1345 since = "0.4.0",
1346 note = "RBF withdrawals are rejected by the federation"
1347 )]
1348 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1349 &self,
1350 rbf: Rbf,
1351 extra_meta: M,
1352 ) -> anyhow::Result<OperationId> {
1353 let operation_id = OperationId(thread_rng().r#gen());
1354
1355 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1356 let tx_builder = TransactionBuilder::new()
1357 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1358
1359 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1360 self.client_ctx
1361 .finalize_and_submit_transaction(
1362 operation_id,
1363 WalletCommonInit::KIND.as_str(),
1364 move |change_range: OutPointRange| WalletOperationMeta {
1365 variant: WalletOperationMetaVariant::RbfWithdraw {
1366 rbf: rbf.clone(),
1367 change: change_range.into_iter().collect(),
1368 },
1369 extra_meta: extra_meta.clone(),
1370 },
1371 tx_builder,
1372 )
1373 .await?;
1374
1375 Ok(operation_id)
1376 }
1377
1378 pub async fn subscribe_withdraw_updates(
1379 &self,
1380 operation_id: OperationId,
1381 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1382 let operation = self
1383 .client_ctx
1384 .get_operation(operation_id)
1385 .await
1386 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1387
1388 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1389 bail!("Operation is not a wallet operation");
1390 }
1391
1392 let operation_meta = operation.meta::<WalletOperationMeta>();
1393
1394 let (WalletOperationMetaVariant::Withdraw { change, .. }
1395 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1396 else {
1397 bail!("Operation is not a withdraw operation");
1398 };
1399
1400 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1401 let client_ctx = self.client_ctx.clone();
1402
1403 Ok(self
1404 .client_ctx
1405 .outcome_or_updates(operation, operation_id, move || {
1406 stream! {
1407 match next_withdraw_state(&mut operation_stream).await {
1408 Some(WithdrawStates::Created(_)) => {
1409 yield WithdrawState::Created;
1410 },
1411 Some(s) => {
1412 panic!("Unexpected state {s:?}")
1413 },
1414 None => return,
1415 }
1416
1417 let _ = client_ctx
1422 .await_primary_module_outputs(operation_id, change)
1423 .await;
1424
1425
1426 match next_withdraw_state(&mut operation_stream).await {
1427 Some(WithdrawStates::Aborted(inner)) => {
1428 yield WithdrawState::Failed(inner.error);
1429 },
1430 Some(WithdrawStates::Success(inner)) => {
1431 yield WithdrawState::Succeeded(inner.txid);
1432 },
1433 Some(s) => {
1434 panic!("Unexpected state {s:?}")
1435 },
1436 None => {},
1437 }
1438 }
1439 }))
1440 }
1441
1442 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1443 self.admin_auth
1444 .clone()
1445 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1446 }
1447
1448 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1449 self.module_api
1450 .activate_consensus_version_voting(self.admin_auth()?)
1451 .await?;
1452
1453 Ok(())
1454 }
1455}
1456
1457async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1460 loop {
1461 let mut dbtx = db.begin_transaction().await;
1462
1463 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1464 break;
1465 }
1466
1467 module_api.wait_for_initialized_connections().await;
1468
1469 if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1470 && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1471 {
1472 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1473 dbtx.commit_tx().await;
1474 break;
1475 }
1476
1477 drop(dbtx);
1478
1479 if is_running_in_test_env() {
1480 sleep(Duration::from_secs(10)).await;
1482 } else {
1483 sleep(Duration::from_secs(3600)).await;
1484 }
1485 }
1486}
1487
1488async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1490 let index = dbtx
1491 .get_value(&NextPegInTweakIndexKey)
1492 .await
1493 .unwrap_or_default();
1494 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1495 .await;
1496 index
1497}
1498
1499#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1500pub enum WalletClientStates {
1501 Deposit(DepositStateMachine),
1502 Withdraw(WithdrawStateMachine),
1503}
1504
1505impl IntoDynInstance for WalletClientStates {
1506 type DynType = DynState;
1507
1508 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1509 DynState::from_typed(instance_id, self)
1510 }
1511}
1512
1513impl State for WalletClientStates {
1514 type ModuleContext = WalletClientContext;
1515
1516 fn transitions(
1517 &self,
1518 context: &Self::ModuleContext,
1519 global_context: &DynGlobalClientContext,
1520 ) -> Vec<StateTransition<Self>> {
1521 match self {
1522 WalletClientStates::Deposit(sm) => {
1523 sm_enum_variant_translation!(
1524 sm.transitions(context, global_context),
1525 WalletClientStates::Deposit
1526 )
1527 }
1528 WalletClientStates::Withdraw(sm) => {
1529 sm_enum_variant_translation!(
1530 sm.transitions(context, global_context),
1531 WalletClientStates::Withdraw
1532 )
1533 }
1534 }
1535 }
1536
1537 fn operation_id(&self) -> OperationId {
1538 match self {
1539 WalletClientStates::Deposit(sm) => sm.operation_id(),
1540 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1541 }
1542 }
1543}
1544
1545#[cfg(all(test, not(target_family = "wasm")))]
1546mod tests {
1547 use std::collections::BTreeSet;
1548 use std::sync::atomic::{AtomicBool, Ordering};
1549
1550 use super::*;
1551 use crate::backup::{
1552 RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
1553 };
1554
1555 #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")]
1557 async fn sanity_test_recover_inner() {
1558 {
1559 let last_checked = AtomicBool::new(false);
1560 let last_checked = &last_checked;
1561 assert_eq!(
1562 recover_scan_idxes_for_activity(
1563 TweakIdx(0),
1564 &BTreeSet::new(),
1565 |cur_idx| async move {
1566 Ok(match cur_idx {
1567 TweakIdx(9) => {
1568 last_checked.store(true, Ordering::SeqCst);
1569 vec![]
1570 }
1571 TweakIdx(10) => panic!("Shouldn't happen"),
1572 TweakIdx(11) => {
1573 vec![0usize] }
1575 _ => vec![],
1576 })
1577 }
1578 )
1579 .await
1580 .unwrap(),
1581 RecoverScanOutcome {
1582 last_used_idx: None,
1583 new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1584 tweak_idxes_with_pegins: BTreeSet::from([])
1585 }
1586 );
1587 assert!(last_checked.load(Ordering::SeqCst));
1588 }
1589
1590 {
1591 let last_checked = AtomicBool::new(false);
1592 let last_checked = &last_checked;
1593 assert_eq!(
1594 recover_scan_idxes_for_activity(
1595 TweakIdx(0),
1596 &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1597 |cur_idx| async move {
1598 Ok(match cur_idx {
1599 TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1600 TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1601 TweakIdx(11) => {
1602 last_checked.store(true, Ordering::SeqCst);
1603 vec![]
1604 }
1605 TweakIdx(12) => panic!("Shouldn't happen"),
1606 TweakIdx(13) => {
1607 vec![0usize] }
1609 _ => vec![],
1610 })
1611 }
1612 )
1613 .await
1614 .unwrap(),
1615 RecoverScanOutcome {
1616 last_used_idx: Some(TweakIdx(2)),
1617 new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1618 tweak_idxes_with_pegins: BTreeSet::from([])
1619 }
1620 );
1621 assert!(last_checked.load(Ordering::SeqCst));
1622 }
1623
1624 {
1625 let last_checked = AtomicBool::new(false);
1626 let last_checked = &last_checked;
1627 assert_eq!(
1628 recover_scan_idxes_for_activity(
1629 TweakIdx(10),
1630 &BTreeSet::new(),
1631 |cur_idx| async move {
1632 Ok(match cur_idx {
1633 TweakIdx(10) => vec![()],
1634 TweakIdx(19) => {
1635 last_checked.store(true, Ordering::SeqCst);
1636 vec![]
1637 }
1638 TweakIdx(20) => panic!("Shouldn't happen"),
1639 _ => vec![],
1640 })
1641 }
1642 )
1643 .await
1644 .unwrap(),
1645 RecoverScanOutcome {
1646 last_used_idx: Some(TweakIdx(10)),
1647 new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1648 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1649 }
1650 );
1651 assert!(last_checked.load(Ordering::SeqCst));
1652 }
1653
1654 assert_eq!(
1655 recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1656 Ok(match cur_idx {
1657 TweakIdx(6 | 15) => vec![()],
1658 _ => vec![],
1659 })
1660 })
1661 .await
1662 .unwrap(),
1663 RecoverScanOutcome {
1664 last_used_idx: Some(TweakIdx(15)),
1665 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1666 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1667 }
1668 );
1669 assert_eq!(
1670 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1671 Ok(match cur_idx {
1672 TweakIdx(8) => {
1673 vec![()] }
1675 TweakIdx(9) => {
1676 panic!("Shouldn't happen")
1677 }
1678 _ => vec![],
1679 })
1680 })
1681 .await
1682 .unwrap(),
1683 RecoverScanOutcome {
1684 last_used_idx: None,
1685 new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1686 tweak_idxes_with_pegins: BTreeSet::from([])
1687 }
1688 );
1689 assert_eq!(
1690 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1691 Ok(match cur_idx {
1692 TweakIdx(9) => panic!("Shouldn't happen"),
1693 TweakIdx(15) => vec![()],
1694 _ => vec![],
1695 })
1696 })
1697 .await
1698 .unwrap(),
1699 RecoverScanOutcome {
1700 last_used_idx: Some(TweakIdx(15)),
1701 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1702 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1703 }
1704 );
1705 }
1706}