1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{DynBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
42use fedimint_client_module::oplog::UpdateStreamOrOutcome;
43use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
44use fedimint_client_module::transaction::{
45 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
46};
47use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
48use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
49use fedimint_core::db::{
50 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
51};
52use fedimint_core::encoding::{Decodable, Encodable};
53use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
54use fedimint_core::module::{
55 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
56 ModuleInit, MultiApiVersion,
57};
58use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
59use fedimint_core::util::backoff_util::background_backoff;
60use fedimint_core::util::{BoxStream, backoff_util, retry};
61use fedimint_core::{
62 BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
63 runtime, secp256k1,
64};
65use fedimint_derive_secret::{ChildId, DerivableSecret};
66use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
67use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
68use fedimint_wallet_common::tweakable::Tweakable;
69pub use fedimint_wallet_common::*;
70use futures::{Stream, StreamExt};
71use rand::{Rng, thread_rng};
72use secp256k1::Keypair;
73use serde::{Deserialize, Serialize};
74use strum::IntoEnumIterator;
75use tokio::sync::watch;
76use tracing::{debug, instrument};
77
78use crate::api::WalletFederationApi;
79use crate::backup::WalletRecovery;
80use crate::client_db::{
81 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
82 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, SupportsSafeDepositPrefix,
83};
84use crate::deposit::DepositStateMachine;
85use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
86
87const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
88
89#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
90pub struct BitcoinTransactionData {
91 pub btc_transaction: bitcoin::Transaction,
94 pub out_idx: u32,
96}
97
98#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
99pub enum DepositStateV1 {
100 WaitingForTransaction,
101 WaitingForConfirmation(BitcoinTransactionData),
102 Confirmed(BitcoinTransactionData),
103 Claimed(BitcoinTransactionData),
104 Failed(String),
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
108pub enum DepositStateV2 {
109 WaitingForTransaction,
110 WaitingForConfirmation {
111 #[serde(with = "bitcoin::amount::serde::as_sat")]
112 btc_deposited: bitcoin::Amount,
113 btc_out_point: bitcoin::OutPoint,
114 },
115 Confirmed {
116 #[serde(with = "bitcoin::amount::serde::as_sat")]
117 btc_deposited: bitcoin::Amount,
118 btc_out_point: bitcoin::OutPoint,
119 },
120 Claimed {
121 #[serde(with = "bitcoin::amount::serde::as_sat")]
122 btc_deposited: bitcoin::Amount,
123 btc_out_point: bitcoin::OutPoint,
124 },
125 Failed(String),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
129pub enum WithdrawState {
130 Created,
131 Succeeded(bitcoin::Txid),
132 Failed(String),
133 }
137
138async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
139where
140 S: Stream<Item = WalletClientStates> + Unpin,
141{
142 loop {
143 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
144 return Some(ds.state);
145 }
146 tokio::task::yield_now().await;
147 }
148}
149
150#[derive(Debug, Clone, Default)]
151pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
153
154impl WalletClientInit {
155 pub fn new(rpc: DynBitcoindRpc) -> Self {
156 Self(Some(rpc))
157 }
158}
159
160impl ModuleInit for WalletClientInit {
161 type Common = WalletCommonInit;
162
163 async fn dump_database(
164 &self,
165 dbtx: &mut DatabaseTransaction<'_>,
166 prefix_names: Vec<String>,
167 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
168 let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
169 BTreeMap::new();
170 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
171 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
172 });
173
174 for table in filtered_prefixes {
175 match table {
176 DbKeyPrefix::NextPegInTweakIndex => {
177 if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
178 wallet_client_items
179 .insert("NextPegInTweakIndex".to_string(), Box::new(index));
180 }
181 }
182 DbKeyPrefix::PegInTweakIndex => {
183 push_db_pair_items!(
184 dbtx,
185 PegInTweakIndexPrefix,
186 PegInTweakIndexKey,
187 PegInTweakIndexData,
188 wallet_client_items,
189 "Peg-In Tweak Index"
190 );
191 }
192 DbKeyPrefix::ClaimedPegIn => {
193 push_db_pair_items!(
194 dbtx,
195 ClaimedPegInPrefix,
196 ClaimedPegInKey,
197 ClaimedPegInData,
198 wallet_client_items,
199 "Claimed Peg-In"
200 );
201 }
202 DbKeyPrefix::RecoveryFinalized => {
203 if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
204 wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
205 }
206 }
207 DbKeyPrefix::SupportsSafeDeposit => {
208 push_db_pair_items!(
209 dbtx,
210 SupportsSafeDepositPrefix,
211 SupportsSafeDepositKey,
212 (),
213 wallet_client_items,
214 "Supports Safe Deposit"
215 );
216 }
217 DbKeyPrefix::RecoveryState
218 | DbKeyPrefix::ExternalReservedStart
219 | DbKeyPrefix::CoreInternalReservedStart
220 | DbKeyPrefix::CoreInternalReservedEnd => {}
221 }
222 }
223
224 Box::new(wallet_client_items.into_iter())
225 }
226}
227
228#[apply(async_trait_maybe_send!)]
229impl ClientModuleInit for WalletClientInit {
230 type Module = WalletClientModule;
231
232 fn supported_api_versions(&self) -> MultiApiVersion {
233 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
234 .expect("no version conflicts")
235 }
236
237 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238 let data = WalletClientModuleData {
239 cfg: args.cfg().clone(),
240 module_root_secret: args.module_root_secret().clone(),
241 };
242
243 let db = args.db().clone();
244
245 let btc_rpc = self.0.clone().unwrap_or(create_esplora_rpc(
246 &WalletClientModule::get_rpc_config(args.cfg()).url,
247 )?);
248
249 let module_api = args.module_api().clone();
250
251 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
252 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
253
254 Ok(WalletClientModule {
255 db,
256 data,
257 module_api,
258 notifier: args.notifier().clone(),
259 rpc: btc_rpc,
260 client_ctx: args.context(),
261 pegin_monitor_wakeup_sender,
262 pegin_monitor_wakeup_receiver,
263 pegin_claimed_receiver,
264 pegin_claimed_sender,
265 task_group: args.task_group().clone(),
266 admin_auth: args.admin_auth().cloned(),
267 })
268 }
269
270 async fn recover(
278 &self,
279 args: &ClientModuleRecoverArgs<Self>,
280 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
281 ) -> anyhow::Result<()> {
282 args.recover_from_history::<WalletRecovery>(self, snapshot)
283 .await
284 }
285
286 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
287 Some(
288 DbKeyPrefix::iter()
289 .map(|p| p as u8)
290 .chain(
291 DbKeyPrefix::ExternalReservedStart as u8
292 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
293 )
294 .collect(),
295 )
296 }
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct WalletOperationMeta {
301 pub variant: WalletOperationMetaVariant,
302 pub extra_meta: serde_json::Value,
303}
304
305#[derive(Debug, Clone, Serialize, Deserialize)]
306#[serde(rename_all = "snake_case")]
307pub enum WalletOperationMetaVariant {
308 Deposit {
309 address: Address<NetworkUnchecked>,
310 #[serde(default)]
315 tweak_idx: Option<TweakIdx>,
316 #[serde(default, skip_serializing_if = "Option::is_none")]
317 expires_at: Option<SystemTime>,
318 },
319 Withdraw {
320 address: Address<NetworkUnchecked>,
321 #[serde(with = "bitcoin::amount::serde::as_sat")]
322 amount: bitcoin::Amount,
323 fee: PegOutFees,
324 change: Vec<OutPoint>,
325 },
326
327 RbfWithdraw {
328 rbf: Rbf,
329 change: Vec<OutPoint>,
330 },
331}
332
333#[derive(Debug, Clone)]
335pub struct WalletClientModuleData {
336 cfg: WalletClientConfig,
337 module_root_secret: DerivableSecret,
338}
339
340impl WalletClientModuleData {
341 fn derive_deposit_address(
342 &self,
343 idx: TweakIdx,
344 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
345 let idx = ChildId(idx.0);
346
347 let secret_tweak_key = self
348 .module_root_secret
349 .child_key(WALLET_TWEAK_CHILD_ID)
350 .child_key(idx)
351 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
352
353 let public_tweak_key = secret_tweak_key.public_key();
354
355 let address = self
356 .cfg
357 .peg_in_descriptor
358 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
359 .address(self.cfg.network.0)
360 .unwrap();
361
362 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
364
365 (secret_tweak_key, public_tweak_key, address, operation_id)
366 }
367
368 fn derive_peg_in_script(
369 &self,
370 idx: TweakIdx,
371 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
372 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
373
374 (
375 self.cfg
376 .peg_in_descriptor
377 .tweak(&secret_tweak_key.public_key(), SECP256K1)
378 .script_pubkey(),
379 address,
380 secret_tweak_key,
381 operation_id,
382 )
383 }
384}
385
386#[derive(Debug)]
387pub struct WalletClientModule {
388 data: WalletClientModuleData,
389 db: Database,
390 module_api: DynModuleApi,
391 notifier: ModuleNotifier<WalletClientStates>,
392 rpc: DynBitcoindRpc,
393 client_ctx: ClientContext<Self>,
394 pegin_monitor_wakeup_sender: watch::Sender<()>,
396 pegin_monitor_wakeup_receiver: watch::Receiver<()>,
397 pegin_claimed_sender: watch::Sender<()>,
399 pegin_claimed_receiver: watch::Receiver<()>,
400 task_group: TaskGroup,
401 admin_auth: Option<ApiAuth>,
402}
403
404#[apply(async_trait_maybe_send!)]
405impl ClientModule for WalletClientModule {
406 type Init = WalletClientInit;
407 type Common = WalletModuleTypes;
408 type Backup = WalletModuleBackup;
409 type ModuleStateMachineContext = WalletClientContext;
410 type States = WalletClientStates;
411
412 fn context(&self) -> Self::ModuleStateMachineContext {
413 WalletClientContext {
414 rpc: self.rpc.clone(),
415 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
416 wallet_decoder: self.decoder(),
417 secp: Secp256k1::default(),
418 client_ctx: self.client_ctx.clone(),
419 }
420 }
421
422 async fn start(&self) {
423 self.task_group.spawn_cancellable("peg-in monitor", {
424 let client_ctx = self.client_ctx.clone();
425 let db = self.db.clone();
426 let btc_rpc = self.rpc.clone();
427 let module_api = self.module_api.clone();
428 let data = self.data.clone();
429 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
430 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
431 pegin_monitor::run_peg_in_monitor(
432 client_ctx,
433 db,
434 btc_rpc,
435 module_api,
436 data,
437 pegin_claimed_sender,
438 pegin_monitor_wakeup_receiver,
439 )
440 });
441
442 self.task_group
443 .spawn_cancellable("supports-safe-deposit-version", {
444 let db = self.db.clone();
445 let module_api = self.module_api.clone();
446
447 poll_supports_safe_deposit_version(db, module_api)
448 });
449 }
450
451 fn supports_backup(&self) -> bool {
452 true
453 }
454
455 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
456 let session_count = self.client_ctx.global_api().session_count().await?;
458
459 let mut dbtx = self.db.begin_transaction_nc().await;
460 let next_pegin_tweak_idx = dbtx
461 .get_value(&NextPegInTweakIndexKey)
462 .await
463 .unwrap_or_default();
464 let claimed = dbtx
465 .find_by_prefix(&PegInTweakIndexPrefix)
466 .await
467 .filter_map(|(k, v)| async move {
468 if v.claimed.is_empty() {
469 None
470 } else {
471 Some(k.0)
472 }
473 })
474 .collect()
475 .await;
476 Ok(backup::WalletModuleBackup::new_v1(
477 session_count,
478 next_pegin_tweak_idx,
479 claimed,
480 ))
481 }
482
483 fn input_fee(
484 &self,
485 _amount: &Amounts,
486 _input: &<Self::Common as ModuleCommon>::Input,
487 ) -> Option<Amounts> {
488 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
489 }
490
491 fn output_fee(
492 &self,
493 _amount: &Amounts,
494 _output: &<Self::Common as ModuleCommon>::Output,
495 ) -> Option<Amounts> {
496 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
497 }
498
499 async fn handle_rpc(
500 &self,
501 method: String,
502 request: serde_json::Value,
503 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
504 Box::pin(try_stream! {
505 match method.as_str() {
506 "get_wallet_summary" => {
507 let _req: WalletSummaryRequest = serde_json::from_value(request)?;
508 let wallet_summary = self.get_wallet_summary()
509 .await
510 .expect("Failed to fetch wallet summary");
511 let result = serde_json::to_value(&wallet_summary)
512 .expect("Serialization error");
513 yield result;
514 }
515 "get_block_count_local" => {
516 let block_count = self.get_block_count_local().await
517 .expect("Failed to fetch block count");
518 yield serde_json::to_value(block_count)?;
519 }
520 "peg_in" => {
521 let req: PegInRequest = serde_json::from_value(request)?;
522 let response = self.peg_in(req)
523 .await
524 .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
525 let result = serde_json::to_value(&response)?;
526 yield result;
527 },
528 "peg_out" => {
529 let req: PegOutRequest = serde_json::from_value(request)?;
530 let response = self.peg_out(req)
531 .await
532 .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
533 let result = serde_json::to_value(&response)?;
534 yield result;
535 },
536 "subscribe_deposit" => {
537 let req: SubscribeDepositRequest = serde_json::from_value(request)?;
538 for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
539 yield serde_json::to_value(state)?;
540 }
541 },
542 "subscribe_withdraw" => {
543 let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
544 for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
545 yield serde_json::to_value(state)?;
546 }
547 }
548 _ => {
549 Err(anyhow::format_err!("Unknown method: {}", method))?;
550 }
551 }
552 })
553 }
554
555 #[cfg(feature = "cli")]
556 async fn handle_cli_command(
557 &self,
558 args: &[std::ffi::OsString],
559 ) -> anyhow::Result<serde_json::Value> {
560 cli::handle_cli_command(self, args).await
561 }
562}
563
564#[derive(Deserialize)]
565struct WalletSummaryRequest {}
566
567#[derive(Debug, Clone)]
568pub struct WalletClientContext {
569 rpc: DynBitcoindRpc,
570 wallet_descriptor: PegInDescriptor,
571 wallet_decoder: Decoder,
572 secp: Secp256k1<All>,
573 pub client_ctx: ClientContext<WalletClientModule>,
574}
575
576#[derive(Debug, Clone, Serialize, Deserialize)]
577pub struct PegInRequest {
578 pub extra_meta: serde_json::Value,
579}
580
581#[derive(Deserialize)]
582struct SubscribeDepositRequest {
583 operation_id: OperationId,
584}
585
586#[derive(Deserialize)]
587struct SubscribeWithdrawRequest {
588 operation_id: OperationId,
589}
590
591#[derive(Debug, Clone, Serialize, Deserialize)]
592pub struct PegInResponse {
593 pub deposit_address: Address<NetworkUnchecked>,
594 pub operation_id: OperationId,
595}
596
597#[derive(Debug, Clone, Serialize, Deserialize)]
598pub struct PegOutRequest {
599 pub amount_sat: u64,
600 pub destination_address: Address<NetworkUnchecked>,
601 pub extra_meta: serde_json::Value,
602}
603
604#[derive(Debug, Clone, Serialize, Deserialize)]
605pub struct PegOutResponse {
606 pub operation_id: OperationId,
607}
608
609impl Context for WalletClientContext {
610 const KIND: Option<ModuleKind> = Some(KIND);
611}
612
613impl WalletClientModule {
614 fn cfg(&self) -> &WalletClientConfig {
615 &self.data.cfg
616 }
617
618 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
619 match BitcoinRpcConfig::get_defaults_from_env_vars() {
620 Ok(rpc_config) => {
621 if rpc_config.kind == "bitcoind" {
624 cfg.default_bitcoin_rpc.clone()
625 } else {
626 rpc_config
627 }
628 }
629 _ => cfg.default_bitcoin_rpc.clone(),
630 }
631 }
632
633 pub fn get_network(&self) -> Network {
634 self.cfg().network.0
635 }
636
637 pub fn get_finality_delay(&self) -> u32 {
638 self.cfg().finality_delay
639 }
640
641 pub fn get_fee_consensus(&self) -> FeeConsensus {
642 self.cfg().fee_consensus
643 }
644
645 async fn allocate_deposit_address_inner(
646 &self,
647 dbtx: &mut DatabaseTransaction<'_>,
648 ) -> (OperationId, Address, TweakIdx) {
649 dbtx.ensure_isolated().expect("Must be isolated db");
650
651 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
652 let (_secret_tweak_key, _, address, operation_id) =
653 self.data.derive_deposit_address(tweak_idx);
654
655 let now = fedimint_core::time::now();
656
657 dbtx.insert_new_entry(
658 &PegInTweakIndexKey(tweak_idx),
659 &PegInTweakIndexData {
660 creation_time: now,
661 next_check_time: Some(now),
662 last_check_time: None,
663 operation_id,
664 claimed: vec![],
665 },
666 )
667 .await;
668
669 (operation_id, address, tweak_idx)
670 }
671
672 pub async fn get_withdraw_fees(
679 &self,
680 address: &bitcoin::Address,
681 amount: bitcoin::Amount,
682 ) -> anyhow::Result<PegOutFees> {
683 self.module_api
684 .fetch_peg_out_fees(address, amount)
685 .await?
686 .context("Federation didn't return peg-out fees")
687 }
688
689 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
691 Ok(self.module_api.fetch_wallet_summary().await?)
692 }
693
694 pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
695 Ok(self.module_api.fetch_block_count_local().await?)
696 }
697
698 pub fn create_withdraw_output(
699 &self,
700 operation_id: OperationId,
701 address: bitcoin::Address,
702 amount: bitcoin::Amount,
703 fees: PegOutFees,
704 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
705 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
706
707 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
708
709 let sm_gen = move |out_point_range: OutPointRange| {
710 assert_eq!(out_point_range.count(), 1);
711 let out_idx = out_point_range.start_idx();
712 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
713 operation_id,
714 state: WithdrawStates::Created(CreatedWithdrawState {
715 fm_outpoint: OutPoint {
716 txid: out_point_range.txid(),
717 out_idx,
718 },
719 }),
720 })]
721 };
722
723 Ok(ClientOutputBundle::new(
724 vec![ClientOutput::<WalletOutput> {
725 output,
726 amounts: Amounts::new_bitcoin(amount),
727 }],
728 vec![ClientOutputSM::<WalletClientStates> {
729 state_machines: Arc::new(sm_gen),
730 }],
731 ))
732 }
733
734 pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
735 let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
736
737 Ok(PegInResponse {
738 deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
739 .as_unchecked()
740 .clone(),
741 operation_id,
742 })
743 }
744
745 pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
746 let amount = bitcoin::Amount::from_sat(req.amount_sat);
747 let destination = req
748 .destination_address
749 .require_network(self.get_network())?;
750
751 let fees = self.get_withdraw_fees(&destination, amount).await?;
752 let operation_id = self
753 .withdraw(&destination, amount, fees, req.extra_meta)
754 .await
755 .context("Failed to initiate withdraw")?;
756
757 Ok(PegOutResponse { operation_id })
758 }
759
760 pub fn create_rbf_withdraw_output(
761 &self,
762 operation_id: OperationId,
763 rbf: &Rbf,
764 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
765 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
766
767 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
768
769 let sm_gen = move |out_point_range: OutPointRange| {
770 assert_eq!(out_point_range.count(), 1);
771 let out_idx = out_point_range.start_idx();
772 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
773 operation_id,
774 state: WithdrawStates::Created(CreatedWithdrawState {
775 fm_outpoint: OutPoint {
776 txid: out_point_range.txid(),
777 out_idx,
778 },
779 }),
780 })]
781 };
782
783 Ok(ClientOutputBundle::new(
784 vec![ClientOutput::<WalletOutput> {
785 output,
786 amounts: Amounts::new_bitcoin(amount),
787 }],
788 vec![ClientOutputSM::<WalletClientStates> {
789 state_machines: Arc::new(sm_gen),
790 }],
791 ))
792 }
793
794 pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
795 Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
796 }
797
798 pub async fn supports_safe_deposit(&self) -> bool {
807 let mut dbtx = self.db.begin_transaction().await;
808
809 let already_verified_supports_safe_deposit =
810 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
811
812 already_verified_supports_safe_deposit || {
813 match self.module_api.module_consensus_version().await {
814 Ok(module_consensus_version) => {
815 let supported_version =
816 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
817
818 if supported_version {
819 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
820 dbtx.commit_tx().await;
821 }
822
823 supported_version
824 }
825 Err(_) => false,
826 }
827 }
828 }
829
830 pub async fn safe_allocate_deposit_address<M>(
838 &self,
839 extra_meta: M,
840 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
841 where
842 M: Serialize + MaybeSend + MaybeSync,
843 {
844 ensure!(
845 self.supports_safe_deposit().await,
846 "Wallet module consensus version doesn't support safe deposits",
847 );
848
849 self.allocate_deposit_address_expert_only(extra_meta).await
850 }
851
852 pub async fn allocate_deposit_address_expert_only<M>(
870 &self,
871 extra_meta: M,
872 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
873 where
874 M: Serialize + MaybeSend + MaybeSync,
875 {
876 let extra_meta_value =
877 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
878 let (operation_id, address, tweak_idx) = self
879 .db
880 .autocommit(
881 move |dbtx, _| {
882 let extra_meta_value_inner = extra_meta_value.clone();
883 Box::pin(async move {
884 let (operation_id, address, tweak_idx) = self
885 .allocate_deposit_address_inner(dbtx)
886 .await;
887
888 self.client_ctx.manual_operation_start_dbtx(
889 dbtx,
890 operation_id,
891 WalletCommonInit::KIND.as_str(),
892 WalletOperationMeta {
893 variant: WalletOperationMetaVariant::Deposit {
894 address: address.clone().into_unchecked(),
895 tweak_idx: Some(tweak_idx),
896 expires_at: None,
897 },
898 extra_meta: extra_meta_value_inner,
899 },
900 vec![]
901 ).await?;
902
903 debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
904
905 self.rpc.watch_script_history(&address.script_pubkey()).await?;
907
908 let sender = self.pegin_monitor_wakeup_sender.clone();
909 dbtx.on_commit(move || {
910 sender.send_replace(());
911 });
912
913 Ok((operation_id, address, tweak_idx))
914 })
915 },
916 Some(100),
917 )
918 .await
919 .map_err(|e| match e {
920 AutocommitError::CommitFailed {
921 last_error,
922 attempts,
923 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
924 AutocommitError::ClosureError { error, .. } => error,
925 })?;
926
927 Ok((operation_id, address, tweak_idx))
928 }
929
930 pub async fn subscribe_deposit(
936 &self,
937 operation_id: OperationId,
938 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
939 let operation = self
940 .client_ctx
941 .get_operation(operation_id)
942 .await
943 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
944
945 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
946 bail!("Operation is not a wallet operation");
947 }
948
949 let operation_meta = operation.meta::<WalletOperationMeta>();
950
951 let WalletOperationMetaVariant::Deposit {
952 address, tweak_idx, ..
953 } = operation_meta.variant
954 else {
955 bail!("Operation is not a deposit operation");
956 };
957
958 let address = address.require_network(self.cfg().network.0)?;
959
960 let Some(tweak_idx) = tweak_idx else {
962 let outcome_v1 = operation
966 .outcome::<DepositStateV1>()
967 .context("Old pending deposit, can't subscribe to updates")?;
968
969 let outcome_v2 = match outcome_v1 {
970 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
971 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
972 btc_out_point: bitcoin::OutPoint {
973 txid: tx_info.btc_transaction.compute_txid(),
974 vout: tx_info.out_idx,
975 },
976 },
977 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
978 _ => bail!("Non-final outcome in operation log"),
979 };
980
981 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
982 };
983
984 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
985 let stream_rpc = self.rpc.clone();
986 let stream_client_ctx = self.client_ctx.clone();
987 let stream_script_pub_key = address.script_pubkey();
988 move || {
989
990 stream! {
991 yield DepositStateV2::WaitingForTransaction;
992
993 retry(
994 "subscribe script history",
995 background_backoff(),
996 || stream_rpc.watch_script_history(&stream_script_pub_key)
997 ).await.expect("Will never give up");
998 let (btc_out_point, btc_deposited) = retry(
999 "fetch history",
1000 background_backoff(),
1001 || async {
1002 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1003 history.first().and_then(|tx| {
1004 let (out_idx, amount) = tx.output
1005 .iter()
1006 .enumerate()
1007 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1008 let txid = tx.compute_txid();
1009
1010 Some((
1011 bitcoin::OutPoint {
1012 txid,
1013 vout: out_idx as u32,
1014 },
1015 amount
1016 ))
1017 }).context("No deposit transaction found")
1018 }
1019 ).await.expect("Will never give up");
1020
1021 yield DepositStateV2::WaitingForConfirmation {
1022 btc_deposited,
1023 btc_out_point
1024 };
1025
1026 let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1027 peg_in_index: tweak_idx,
1028 btc_out_point,
1029 }).await;
1030
1031 yield DepositStateV2::Confirmed {
1032 btc_deposited,
1033 btc_out_point
1034 };
1035
1036 match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1037 Ok(()) => yield DepositStateV2::Claimed {
1038 btc_deposited,
1039 btc_out_point
1040 },
1041 Err(e) => yield DepositStateV2::Failed(e.to_string())
1042 }
1043 }
1044 }}))
1045 }
1046
1047 pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1048 self.client_ctx
1049 .module_db()
1050 .clone()
1051 .begin_transaction_nc()
1052 .await
1053 .find_by_prefix(&PegInTweakIndexPrefix)
1054 .await
1055 .map(|(key, data)| (key.0, data))
1056 .collect()
1057 .await
1058 }
1059
1060 pub async fn find_tweak_idx_by_address(
1061 &self,
1062 address: bitcoin::Address<NetworkUnchecked>,
1063 ) -> anyhow::Result<TweakIdx> {
1064 let data = self.data.clone();
1065 let Some((tweak_idx, _)) = self
1066 .db
1067 .begin_transaction_nc()
1068 .await
1069 .find_by_prefix(&PegInTweakIndexPrefix)
1070 .await
1071 .filter(|(k, _)| {
1072 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1073 future::ready(derived_address.into_unchecked() == address)
1074 })
1075 .next()
1076 .await
1077 else {
1078 bail!("Address not found in the list of derived keys");
1079 };
1080
1081 Ok(tweak_idx.0)
1082 }
1083 pub async fn find_tweak_idx_by_operation_id(
1084 &self,
1085 operation_id: OperationId,
1086 ) -> anyhow::Result<TweakIdx> {
1087 Ok(self
1088 .client_ctx
1089 .module_db()
1090 .clone()
1091 .begin_transaction_nc()
1092 .await
1093 .find_by_prefix(&PegInTweakIndexPrefix)
1094 .await
1095 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1096 .next()
1097 .await
1098 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1099 .0
1100 .0)
1101 }
1102
1103 pub async fn get_pegin_tweak_idx(
1104 &self,
1105 tweak_idx: TweakIdx,
1106 ) -> anyhow::Result<PegInTweakIndexData> {
1107 self.client_ctx
1108 .module_db()
1109 .clone()
1110 .begin_transaction_nc()
1111 .await
1112 .get_value(&PegInTweakIndexKey(tweak_idx))
1113 .await
1114 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1115 }
1116
1117 pub async fn get_claimed_pegins(
1118 &self,
1119 dbtx: &mut DatabaseTransaction<'_>,
1120 tweak_idx: TweakIdx,
1121 ) -> Vec<(
1122 bitcoin::OutPoint,
1123 TransactionId,
1124 Vec<fedimint_core::OutPoint>,
1125 )> {
1126 let outpoints = dbtx
1127 .get_value(&PegInTweakIndexKey(tweak_idx))
1128 .await
1129 .map(|v| v.claimed)
1130 .unwrap_or_default();
1131
1132 let mut res = vec![];
1133
1134 for outpoint in outpoints {
1135 let claimed_peg_in_data = dbtx
1136 .get_value(&ClaimedPegInKey {
1137 peg_in_index: tweak_idx,
1138 btc_out_point: outpoint,
1139 })
1140 .await
1141 .expect("Must have a corresponding claim record");
1142 res.push((
1143 outpoint,
1144 claimed_peg_in_data.claim_txid,
1145 claimed_peg_in_data.change,
1146 ));
1147 }
1148
1149 res
1150 }
1151
1152 pub async fn recheck_pegin_address_by_op_id(
1154 &self,
1155 operation_id: OperationId,
1156 ) -> anyhow::Result<()> {
1157 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1158
1159 self.recheck_pegin_address(tweak_idx).await
1160 }
1161
1162 pub async fn recheck_pegin_address_by_address(
1164 &self,
1165 address: bitcoin::Address<NetworkUnchecked>,
1166 ) -> anyhow::Result<()> {
1167 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1168 .await
1169 }
1170
1171 pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1173 self.db
1174 .autocommit(
1175 |dbtx, _| {
1176 Box::pin(async {
1177 let db_key = PegInTweakIndexKey(tweak_idx);
1178 let db_val = dbtx
1179 .get_value(&db_key)
1180 .await
1181 .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1182
1183 dbtx.insert_entry(
1184 &db_key,
1185 &PegInTweakIndexData {
1186 next_check_time: Some(fedimint_core::time::now()),
1187 ..db_val
1188 },
1189 )
1190 .await;
1191
1192 let sender = self.pegin_monitor_wakeup_sender.clone();
1193 dbtx.on_commit(move || {
1194 sender.send_replace(());
1195 });
1196
1197 Ok::<_, anyhow::Error>(())
1198 })
1199 },
1200 Some(100),
1201 )
1202 .await?;
1203
1204 Ok(())
1205 }
1206
1207 pub async fn await_num_deposits_by_operation_id(
1209 &self,
1210 operation_id: OperationId,
1211 num_deposits: usize,
1212 ) -> anyhow::Result<()> {
1213 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1214 self.await_num_deposits(tweak_idx, num_deposits).await
1215 }
1216
1217 pub async fn await_num_deposits_by_address(
1218 &self,
1219 address: bitcoin::Address<NetworkUnchecked>,
1220 num_deposits: usize,
1221 ) -> anyhow::Result<()> {
1222 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1223 .await
1224 }
1225
1226 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1227 pub async fn await_num_deposits(
1228 &self,
1229 tweak_idx: TweakIdx,
1230 num_deposits: usize,
1231 ) -> anyhow::Result<()> {
1232 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1233
1234 let mut receiver = self.pegin_claimed_receiver.clone();
1235 let mut backoff = backoff_util::aggressive_backoff();
1236
1237 loop {
1238 let pegins = self
1239 .get_claimed_pegins(
1240 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1241 tweak_idx,
1242 )
1243 .await;
1244
1245 if pegins.len() < num_deposits {
1246 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1247 self.recheck_pegin_address(tweak_idx).await?;
1248 runtime::sleep(backoff.next().unwrap_or_default()).await;
1249 receiver.changed().await?;
1250 continue;
1251 }
1252
1253 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1254
1255 for (_outpoint, transaction_id, change) in pegins {
1256 if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1257 debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1258 continue;
1259 }
1260
1261 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1262 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1263
1264 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1265 bail!("{}", e);
1266 }
1267
1268 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1269 self.client_ctx
1270 .await_primary_module_outputs(operation_id, change)
1271 .await
1272 .expect("Cannot fail if tx was accepted and federation is honest");
1273 }
1274
1275 return Ok(());
1276 }
1277 }
1278
1279 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1284 &self,
1285 address: &bitcoin::Address,
1286 amount: bitcoin::Amount,
1287 fee: PegOutFees,
1288 extra_meta: M,
1289 ) -> anyhow::Result<OperationId> {
1290 {
1291 let operation_id = OperationId(thread_rng().r#gen());
1292
1293 let withdraw_output =
1294 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1295 let tx_builder = TransactionBuilder::new()
1296 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1297
1298 let extra_meta =
1299 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1300 self.client_ctx
1301 .finalize_and_submit_transaction(
1302 operation_id,
1303 WalletCommonInit::KIND.as_str(),
1304 {
1305 let address = address.clone();
1306 move |change_range: OutPointRange| WalletOperationMeta {
1307 variant: WalletOperationMetaVariant::Withdraw {
1308 address: address.clone().into_unchecked(),
1309 amount,
1310 fee,
1311 change: change_range.into_iter().collect(),
1312 },
1313 extra_meta: extra_meta.clone(),
1314 }
1315 },
1316 tx_builder,
1317 )
1318 .await?;
1319
1320 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1321
1322 self.client_ctx
1323 .log_event(
1324 &mut dbtx,
1325 SendPaymentEvent {
1326 operation_id,
1327 amount: amount + fee.amount(),
1328 fee: fee.amount(),
1329 },
1330 )
1331 .await;
1332
1333 dbtx.commit_tx().await;
1334
1335 Ok(operation_id)
1336 }
1337 }
1338
1339 #[deprecated(
1344 since = "0.4.0",
1345 note = "RBF withdrawals are rejected by the federation"
1346 )]
1347 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1348 &self,
1349 rbf: Rbf,
1350 extra_meta: M,
1351 ) -> anyhow::Result<OperationId> {
1352 let operation_id = OperationId(thread_rng().r#gen());
1353
1354 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1355 let tx_builder = TransactionBuilder::new()
1356 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1357
1358 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1359 self.client_ctx
1360 .finalize_and_submit_transaction(
1361 operation_id,
1362 WalletCommonInit::KIND.as_str(),
1363 move |change_range: OutPointRange| WalletOperationMeta {
1364 variant: WalletOperationMetaVariant::RbfWithdraw {
1365 rbf: rbf.clone(),
1366 change: change_range.into_iter().collect(),
1367 },
1368 extra_meta: extra_meta.clone(),
1369 },
1370 tx_builder,
1371 )
1372 .await?;
1373
1374 Ok(operation_id)
1375 }
1376
1377 pub async fn subscribe_withdraw_updates(
1378 &self,
1379 operation_id: OperationId,
1380 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1381 let operation = self
1382 .client_ctx
1383 .get_operation(operation_id)
1384 .await
1385 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1386
1387 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1388 bail!("Operation is not a wallet operation");
1389 }
1390
1391 let operation_meta = operation.meta::<WalletOperationMeta>();
1392
1393 let (WalletOperationMetaVariant::Withdraw { change, .. }
1394 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1395 else {
1396 bail!("Operation is not a withdraw operation");
1397 };
1398
1399 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1400 let client_ctx = self.client_ctx.clone();
1401
1402 Ok(self
1403 .client_ctx
1404 .outcome_or_updates(operation, operation_id, move || {
1405 stream! {
1406 match next_withdraw_state(&mut operation_stream).await {
1407 Some(WithdrawStates::Created(_)) => {
1408 yield WithdrawState::Created;
1409 },
1410 Some(s) => {
1411 panic!("Unexpected state {s:?}")
1412 },
1413 None => return,
1414 }
1415
1416 let _ = client_ctx
1421 .await_primary_module_outputs(operation_id, change)
1422 .await;
1423
1424
1425 match next_withdraw_state(&mut operation_stream).await {
1426 Some(WithdrawStates::Aborted(inner)) => {
1427 yield WithdrawState::Failed(inner.error);
1428 },
1429 Some(WithdrawStates::Success(inner)) => {
1430 yield WithdrawState::Succeeded(inner.txid);
1431 },
1432 Some(s) => {
1433 panic!("Unexpected state {s:?}")
1434 },
1435 None => {},
1436 }
1437 }
1438 }))
1439 }
1440
1441 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1442 self.admin_auth
1443 .clone()
1444 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1445 }
1446
1447 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1448 self.module_api
1449 .activate_consensus_version_voting(self.admin_auth()?)
1450 .await?;
1451
1452 Ok(())
1453 }
1454}
1455
1456async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1459 loop {
1460 let mut dbtx = db.begin_transaction().await;
1461
1462 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1463 break;
1464 }
1465
1466 if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1467 && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1468 {
1469 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1470 dbtx.commit_tx().await;
1471 break;
1472 }
1473
1474 drop(dbtx);
1475
1476 if is_running_in_test_env() {
1477 sleep(Duration::from_secs(10)).await;
1479 } else {
1480 sleep(Duration::from_secs(3600)).await;
1481 }
1482 }
1483}
1484
1485async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1487 let index = dbtx
1488 .get_value(&NextPegInTweakIndexKey)
1489 .await
1490 .unwrap_or_default();
1491 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1492 .await;
1493 index
1494}
1495
1496#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1497pub enum WalletClientStates {
1498 Deposit(DepositStateMachine),
1499 Withdraw(WithdrawStateMachine),
1500}
1501
1502impl IntoDynInstance for WalletClientStates {
1503 type DynType = DynState;
1504
1505 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1506 DynState::from_typed(instance_id, self)
1507 }
1508}
1509
1510impl State for WalletClientStates {
1511 type ModuleContext = WalletClientContext;
1512
1513 fn transitions(
1514 &self,
1515 context: &Self::ModuleContext,
1516 global_context: &DynGlobalClientContext,
1517 ) -> Vec<StateTransition<Self>> {
1518 match self {
1519 WalletClientStates::Deposit(sm) => {
1520 sm_enum_variant_translation!(
1521 sm.transitions(context, global_context),
1522 WalletClientStates::Deposit
1523 )
1524 }
1525 WalletClientStates::Withdraw(sm) => {
1526 sm_enum_variant_translation!(
1527 sm.transitions(context, global_context),
1528 WalletClientStates::Withdraw
1529 )
1530 }
1531 }
1532 }
1533
1534 fn operation_id(&self) -> OperationId {
1535 match self {
1536 WalletClientStates::Deposit(sm) => sm.operation_id(),
1537 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1538 }
1539 }
1540}
1541
1542#[cfg(all(test, not(target_family = "wasm")))]
1543mod tests {
1544 use std::collections::BTreeSet;
1545 use std::sync::atomic::{AtomicBool, Ordering};
1546
1547 use super::*;
1548 use crate::backup::{
1549 RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
1550 };
1551
1552 #[allow(clippy::too_many_lines)] #[tokio::test(flavor = "multi_thread")]
1554 async fn sanity_test_recover_inner() {
1555 {
1556 let last_checked = AtomicBool::new(false);
1557 let last_checked = &last_checked;
1558 assert_eq!(
1559 recover_scan_idxes_for_activity(
1560 TweakIdx(0),
1561 &BTreeSet::new(),
1562 |cur_idx| async move {
1563 Ok(match cur_idx {
1564 TweakIdx(9) => {
1565 last_checked.store(true, Ordering::SeqCst);
1566 vec![]
1567 }
1568 TweakIdx(10) => panic!("Shouldn't happen"),
1569 TweakIdx(11) => {
1570 vec![0usize] }
1572 _ => vec![],
1573 })
1574 }
1575 )
1576 .await
1577 .unwrap(),
1578 RecoverScanOutcome {
1579 last_used_idx: None,
1580 new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1581 tweak_idxes_with_pegins: BTreeSet::from([])
1582 }
1583 );
1584 assert!(last_checked.load(Ordering::SeqCst));
1585 }
1586
1587 {
1588 let last_checked = AtomicBool::new(false);
1589 let last_checked = &last_checked;
1590 assert_eq!(
1591 recover_scan_idxes_for_activity(
1592 TweakIdx(0),
1593 &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
1594 |cur_idx| async move {
1595 Ok(match cur_idx {
1596 TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
1597 TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
1598 TweakIdx(11) => {
1599 last_checked.store(true, Ordering::SeqCst);
1600 vec![]
1601 }
1602 TweakIdx(12) => panic!("Shouldn't happen"),
1603 TweakIdx(13) => {
1604 vec![0usize] }
1606 _ => vec![],
1607 })
1608 }
1609 )
1610 .await
1611 .unwrap(),
1612 RecoverScanOutcome {
1613 last_used_idx: Some(TweakIdx(2)),
1614 new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1615 tweak_idxes_with_pegins: BTreeSet::from([])
1616 }
1617 );
1618 assert!(last_checked.load(Ordering::SeqCst));
1619 }
1620
1621 {
1622 let last_checked = AtomicBool::new(false);
1623 let last_checked = &last_checked;
1624 assert_eq!(
1625 recover_scan_idxes_for_activity(
1626 TweakIdx(10),
1627 &BTreeSet::new(),
1628 |cur_idx| async move {
1629 Ok(match cur_idx {
1630 TweakIdx(10) => vec![()],
1631 TweakIdx(19) => {
1632 last_checked.store(true, Ordering::SeqCst);
1633 vec![]
1634 }
1635 TweakIdx(20) => panic!("Shouldn't happen"),
1636 _ => vec![],
1637 })
1638 }
1639 )
1640 .await
1641 .unwrap(),
1642 RecoverScanOutcome {
1643 last_used_idx: Some(TweakIdx(10)),
1644 new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1645 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
1646 }
1647 );
1648 assert!(last_checked.load(Ordering::SeqCst));
1649 }
1650
1651 assert_eq!(
1652 recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
1653 Ok(match cur_idx {
1654 TweakIdx(6 | 15) => vec![()],
1655 _ => vec![],
1656 })
1657 })
1658 .await
1659 .unwrap(),
1660 RecoverScanOutcome {
1661 last_used_idx: Some(TweakIdx(15)),
1662 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1663 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
1664 }
1665 );
1666 assert_eq!(
1667 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1668 Ok(match cur_idx {
1669 TweakIdx(8) => {
1670 vec![()] }
1672 TweakIdx(9) => {
1673 panic!("Shouldn't happen")
1674 }
1675 _ => vec![],
1676 })
1677 })
1678 .await
1679 .unwrap(),
1680 RecoverScanOutcome {
1681 last_used_idx: None,
1682 new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1683 tweak_idxes_with_pegins: BTreeSet::from([])
1684 }
1685 );
1686 assert_eq!(
1687 recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
1688 Ok(match cur_idx {
1689 TweakIdx(9) => panic!("Shouldn't happen"),
1690 TweakIdx(15) => vec![()],
1691 _ => vec![],
1692 })
1693 })
1694 .await
1695 .unwrap(),
1696 RecoverScanOutcome {
1697 last_used_idx: Some(TweakIdx(15)),
1698 new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
1699 tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
1700 }
1701 );
1702 }
1703}