1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{BitcoindTracked, DynBitcoindRpc, IBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::recovery::RecoveryProgress;
42use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
43use fedimint_client_module::oplog::UpdateStreamOrOutcome;
44use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client_module::transaction::{
46 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
55use fedimint_core::module::{
56 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
57 ModuleInit, MultiApiVersion,
58};
59use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{BoxStream, backoff_util, retry};
62use fedimint_core::{
63 BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
64 runtime, secp256k1,
65};
66use fedimint_derive_secret::{ChildId, DerivableSecret};
67use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
68pub use fedimint_wallet_common as common;
69use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
70use fedimint_wallet_common::tweakable::Tweakable;
71pub use fedimint_wallet_common::*;
72use futures::{Stream, StreamExt};
73use rand::{Rng, thread_rng};
74use secp256k1::Keypair;
75use serde::{Deserialize, Serialize};
76use strum::IntoEnumIterator;
77use tokio::sync::watch;
78use tracing::{debug, instrument};
79
80use crate::api::WalletFederationApi;
81use crate::backup::{FEDERATION_RECOVER_MAX_GAP, RecoveryStateV2, WalletRecovery};
82use crate::client_db::{
83 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
84 PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey, RecoveryStateKey,
85 SupportsSafeDepositPrefix,
86};
87use crate::deposit::DepositStateMachine;
88use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
89
/// Child id applied to the module root secret before the per-address index
/// when deriving peg-in tweak keys (see
/// `WalletClientModuleData::derive_deposit_address`).
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
91
/// A bitcoin transaction together with the index of the output of interest.
///
/// Carried by the legacy [`DepositStateV1`] states.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The full bitcoin transaction.
    pub btc_transaction: bitcoin::Transaction,
    /// Index into `btc_transaction.output`; `subscribe_deposit` reads the
    /// deposited value from that output and uses the index as `vout`.
    pub out_idx: u32,
}
100
/// Legacy deposit state format.
///
/// `subscribe_deposit` only decodes it for operations whose metadata has no
/// tweak index, and converts the final variants (`Claimed`, `Failed`) into
/// [`DepositStateV2`]; any other variant is rejected there as non-final.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    WaitingForTransaction,
    WaitingForConfirmation(BitcoinTransactionData),
    Confirmed(BitcoinTransactionData),
    Claimed(BitcoinTransactionData),
    /// Carries an error message.
    Failed(String),
}
109
/// Deposit state updates as yielded by `subscribe_deposit`.
///
/// In every variant `btc_deposited` is (de)serialized as an integer number of
/// satoshis and `btc_out_point` identifies the bitcoin output paying to the
/// deposit script.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// First update of the stream; no matching transaction looked up yet.
    WaitingForTransaction,
    /// A transaction paying to the deposit script was found in the script
    /// history.
    WaitingForConfirmation {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// A claim record for this out point appeared in the module database.
    Confirmed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// Awaiting the primary module outputs of the claim succeeded.
    Claimed {
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        btc_out_point: bitcoin::OutPoint,
    },
    /// Awaiting the primary module outputs failed; carries the error text.
    Failed(String),
}
130
/// State updates of a withdraw (peg-out) operation.
// NOTE(review): the producer of these states is outside this part of the
// file; variant meanings below are read off the payload types only.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    Created,
    /// Carries a bitcoin transaction id.
    Succeeded(bitcoin::Txid),
    /// Carries an error message.
    Failed(String),
}
140
141async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
142where
143 S: Stream<Item = WalletClientStates> + Unpin,
144{
145 loop {
146 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
147 return Some(ds.state);
148 }
149 tokio::task::yield_now().await;
150 }
151}
152
/// Initializer of the wallet client module.
///
/// The optional field is a caller-supplied bitcoin RPC client. `init` uses it
/// only when the init args provide no RPC of their own, and falls back to an
/// esplora client when this is `None` as well.
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
156
/// Maximum number of recovery items requested per `fetch_recovery_slice` call
/// in `WalletClientInit::recover_from_slices`.
const SLICE_SIZE: u64 = 1000;
158
159impl WalletClientInit {
160 pub fn new(rpc: DynBitcoindRpc) -> Self {
161 Self(Some(rpc))
162 }
163
164 async fn recover_from_slices(
165 &self,
166 args: &ClientModuleRecoverArgs<Self>,
167 ) -> anyhow::Result<()> {
168 let data = WalletClientModuleData {
169 cfg: args.cfg().clone(),
170 module_root_secret: args.module_root_secret().clone(),
171 };
172
173 let total_items = args.module_api().fetch_recovery_count().await?;
174
175 let mut state = RecoveryStateV2::new();
176
177 state.refill_pending_pool_up_to(&data, TweakIdx(FEDERATION_RECOVER_MAX_GAP));
178
179 for start in (0..total_items).step_by(SLICE_SIZE as usize) {
180 let end = std::cmp::min(start + SLICE_SIZE, total_items);
181
182 let items = args.module_api().fetch_recovery_slice(start, end).await?;
183
184 for item in &items {
185 match item {
186 RecoveryItem::Input { outpoint, script } => {
187 state.handle_item(*outpoint, script, &data);
188 }
189 }
190 }
191
192 args.update_recovery_progress(RecoveryProgress {
193 complete: end.try_into().unwrap_or(u32::MAX),
194 total: total_items.try_into().unwrap_or(u32::MAX),
195 });
196 }
197
198 let mut dbtx = args.db().begin_transaction().await;
199
200 for tweak_idx in 0..state.new_start_idx().0 {
201 let operation_id = data.derive_peg_in_script(TweakIdx(tweak_idx)).3;
202
203 let claimed = state
204 .claimed_outpoints
205 .get(&TweakIdx(tweak_idx))
206 .cloned()
207 .unwrap_or_default();
208
209 dbtx.insert_new_entry(
210 &PegInTweakIndexKey(TweakIdx(tweak_idx)),
211 &PegInTweakIndexData {
212 operation_id,
213 creation_time: fedimint_core::time::now(),
214 last_check_time: None,
215 next_check_time: Some(fedimint_core::time::now()),
216 claimed,
217 },
218 )
219 .await;
220 }
221
222 dbtx.insert_new_entry(&NextPegInTweakIndexKey, &state.new_start_idx())
223 .await;
224
225 dbtx.commit_tx().await;
226
227 Ok(())
228 }
229}
230
231impl ModuleInit for WalletClientInit {
232 type Common = WalletCommonInit;
233
234 async fn dump_database(
235 &self,
236 dbtx: &mut DatabaseTransaction<'_>,
237 prefix_names: Vec<String>,
238 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
239 let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
240 BTreeMap::new();
241 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
242 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
243 });
244
245 for table in filtered_prefixes {
246 match table {
247 DbKeyPrefix::NextPegInTweakIndex => {
248 if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
249 wallet_client_items
250 .insert("NextPegInTweakIndex".to_string(), Box::new(index));
251 }
252 }
253 DbKeyPrefix::PegInTweakIndex => {
254 push_db_pair_items!(
255 dbtx,
256 PegInTweakIndexPrefix,
257 PegInTweakIndexKey,
258 PegInTweakIndexData,
259 wallet_client_items,
260 "Peg-In Tweak Index"
261 );
262 }
263 DbKeyPrefix::ClaimedPegIn => {
264 push_db_pair_items!(
265 dbtx,
266 ClaimedPegInPrefix,
267 ClaimedPegInKey,
268 ClaimedPegInData,
269 wallet_client_items,
270 "Claimed Peg-In"
271 );
272 }
273 DbKeyPrefix::RecoveryFinalized => {
274 if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
275 wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
276 }
277 }
278 DbKeyPrefix::SupportsSafeDeposit => {
279 push_db_pair_items!(
280 dbtx,
281 SupportsSafeDepositPrefix,
282 SupportsSafeDepositKey,
283 (),
284 wallet_client_items,
285 "Supports Safe Deposit"
286 );
287 }
288 DbKeyPrefix::RecoveryState
289 | DbKeyPrefix::ExternalReservedStart
290 | DbKeyPrefix::CoreInternalReservedStart
291 | DbKeyPrefix::CoreInternalReservedEnd => {}
292 }
293 }
294
295 Box::new(wallet_client_items.into_iter())
296 }
297}
298
299#[apply(async_trait_maybe_send!)]
300impl ClientModuleInit for WalletClientInit {
301 type Module = WalletClientModule;
302
303 fn supported_api_versions(&self) -> MultiApiVersion {
304 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
305 .expect("no version conflicts")
306 }
307
308 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
309 let data = WalletClientModuleData {
310 cfg: args.cfg().clone(),
311 module_root_secret: args.module_root_secret().clone(),
312 };
313
314 let db = args.db().clone();
315
316 let rpc_config = WalletClientModule::get_rpc_config(args.cfg());
317
318 let btc_rpc = if let Some(user_rpc) = args.user_bitcoind_rpc() {
325 user_rpc.clone()
326 } else if let Some(factory) = args.user_bitcoind_rpc_no_chain_id() {
327 if let Some(rpc) = factory(rpc_config.url.clone()).await {
328 rpc
329 } else {
330 self.0
331 .clone()
332 .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
333 }
334 } else {
335 self.0
336 .clone()
337 .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
338 };
339 let btc_rpc = BitcoindTracked::new(btc_rpc, "wallet-client").into_dyn();
340
341 let module_api = args.module_api().clone();
342
343 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
344 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
345
346 Ok(WalletClientModule {
347 db,
348 data,
349 module_api,
350 notifier: args.notifier().clone(),
351 rpc: btc_rpc,
352 client_ctx: args.context(),
353 pegin_monitor_wakeup_sender,
354 pegin_monitor_wakeup_receiver,
355 pegin_claimed_receiver,
356 pegin_claimed_sender,
357 task_group: args.task_group().clone(),
358 admin_auth: args.admin_auth().cloned(),
359 })
360 }
361
362 async fn recover(
367 &self,
368 args: &ClientModuleRecoverArgs<Self>,
369 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
370 ) -> anyhow::Result<()> {
371 if args
374 .db()
375 .begin_transaction_nc()
376 .await
377 .get_value(&RecoveryStateKey)
378 .await
379 .is_some()
380 {
381 return args
382 .recover_from_history::<WalletRecovery>(self, snapshot)
383 .await;
384 }
385
386 if args.module_api().fetch_recovery_count().await.is_ok() {
388 self.recover_from_slices(args).await
389 } else {
390 args.recover_from_history::<WalletRecovery>(self, snapshot)
391 .await
392 }
393 }
394
395 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
396 Some(
397 DbKeyPrefix::iter()
398 .map(|p| p as u8)
399 .chain(
400 DbKeyPrefix::ExternalReservedStart as u8
401 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
402 )
403 .collect(),
404 )
405 }
406}
407
/// Metadata stored in the operation log for wallet operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// Operation-kind specific data.
    pub variant: WalletOperationMetaVariant,
    /// Arbitrary caller-supplied JSON stored alongside the operation.
    pub extra_meta: serde_json::Value,
}
413
/// Kind-specific part of [`WalletOperationMeta`]; variant names are
/// serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    /// A deposit (peg-in) to a derived address.
    Deposit {
        /// The deposit address.
        address: Address<NetworkUnchecked>,
        /// Tweak index the address was derived from. Defaults to `None` when
        /// absent from the serialized form; `subscribe_deposit` treats such
        /// operations as legacy (`DepositStateV1`) deposits.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Omitted from the serialized form when `None`.
        // NOTE(review): nothing visible in this part of the file sets it to
        // `Some`; semantics to be confirmed.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    /// A withdrawal (peg-out).
    Withdraw {
        address: Address<NetworkUnchecked>,
        /// Serialized as an integer number of satoshis.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        fee: PegOutFees,
        change: Vec<OutPoint>,
    },

    /// A replace-by-fee bump of an earlier withdrawal.
    RbfWithdraw {
        rbf: Rbf,
        change: Vec<OutPoint>,
    },
}
441
/// Module config and root secret: everything needed to derive deposit
/// addresses.
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    cfg: WalletClientConfig,
    module_root_secret: DerivableSecret,
}
448
449impl WalletClientModuleData {
450 fn derive_deposit_address(
451 &self,
452 idx: TweakIdx,
453 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
454 let idx = ChildId(idx.0);
455
456 let secret_tweak_key = self
457 .module_root_secret
458 .child_key(WALLET_TWEAK_CHILD_ID)
459 .child_key(idx)
460 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
461
462 let public_tweak_key = secret_tweak_key.public_key();
463
464 let address = self
465 .cfg
466 .peg_in_descriptor
467 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
468 .address(self.cfg.network.0)
469 .unwrap();
470
471 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
473
474 (secret_tweak_key, public_tweak_key, address, operation_id)
475 }
476
477 fn derive_peg_in_script(
478 &self,
479 idx: TweakIdx,
480 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
481 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
482
483 (
484 self.cfg
485 .peg_in_descriptor
486 .tweak(&secret_tweak_key.public_key(), SECP256K1)
487 .script_pubkey(),
488 address,
489 secret_tweak_key,
490 operation_id,
491 )
492 }
493}
494
/// The wallet client module instance.
#[derive(Debug)]
pub struct WalletClientModule {
    data: WalletClientModuleData,
    db: Database,
    module_api: DynModuleApi,
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC client chosen in `WalletClientInit::init`.
    rpc: DynBitcoindRpc,
    client_ctx: ClientContext<Self>,
    /// Signalled when a newly allocated deposit address is committed, to wake
    /// the peg-in monitor.
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    /// Receiving end handed to the peg-in monitor task in `start`.
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Sending end handed to the peg-in monitor task in `start`.
    pegin_claimed_sender: watch::Sender<()>,
    /// Receiving end of the channel whose sender is `pegin_claimed_sender`.
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the background tasks are spawned on.
    task_group: TaskGroup,
    admin_auth: Option<ApiAuth>,
}
512
513#[apply(async_trait_maybe_send!)]
514impl ClientModule for WalletClientModule {
515 type Init = WalletClientInit;
516 type Common = WalletModuleTypes;
517 type Backup = WalletModuleBackup;
518 type ModuleStateMachineContext = WalletClientContext;
519 type States = WalletClientStates;
520
521 fn context(&self) -> Self::ModuleStateMachineContext {
522 WalletClientContext {
523 rpc: self.rpc.clone(),
524 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
525 wallet_decoder: self.decoder(),
526 secp: Secp256k1::default(),
527 client_ctx: self.client_ctx.clone(),
528 }
529 }
530
531 async fn start(&self) {
532 self.task_group.spawn_cancellable("peg-in monitor", {
533 let client_ctx = self.client_ctx.clone();
534 let db = self.db.clone();
535 let btc_rpc = self.rpc.clone();
536 let module_api = self.module_api.clone();
537 let data = self.data.clone();
538 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
539 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
540 pegin_monitor::run_peg_in_monitor(
541 client_ctx,
542 db,
543 btc_rpc,
544 module_api,
545 data,
546 pegin_claimed_sender,
547 pegin_monitor_wakeup_receiver,
548 )
549 });
550
551 self.task_group
552 .spawn_cancellable("supports-safe-deposit-version", {
553 let db = self.db.clone();
554 let module_api = self.module_api.clone();
555
556 poll_supports_safe_deposit_version(db, module_api)
557 });
558 }
559
560 fn supports_backup(&self) -> bool {
561 true
562 }
563
564 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
565 let session_count = self.client_ctx.global_api().session_count().await?;
567
568 let mut dbtx = self.db.begin_transaction_nc().await;
569 let next_pegin_tweak_idx = dbtx
570 .get_value(&NextPegInTweakIndexKey)
571 .await
572 .unwrap_or_default();
573 let claimed = dbtx
574 .find_by_prefix(&PegInTweakIndexPrefix)
575 .await
576 .filter_map(|(k, v)| async move {
577 if v.claimed.is_empty() {
578 None
579 } else {
580 Some(k.0)
581 }
582 })
583 .collect()
584 .await;
585 Ok(backup::WalletModuleBackup::new_v1(
586 session_count,
587 next_pegin_tweak_idx,
588 claimed,
589 ))
590 }
591
592 fn input_fee(
593 &self,
594 _amount: &Amounts,
595 _input: &<Self::Common as ModuleCommon>::Input,
596 ) -> Option<Amounts> {
597 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
598 }
599
600 fn output_fee(
601 &self,
602 _amount: &Amounts,
603 _output: &<Self::Common as ModuleCommon>::Output,
604 ) -> Option<Amounts> {
605 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
606 }
607
608 async fn handle_rpc(
609 &self,
610 method: String,
611 request: serde_json::Value,
612 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
613 Box::pin(try_stream! {
614 match method.as_str() {
615 "get_wallet_summary" => {
616 let _req: WalletSummaryRequest = serde_json::from_value(request)?;
617 let wallet_summary = self.get_wallet_summary()
618 .await
619 .expect("Failed to fetch wallet summary");
620 let result = serde_json::to_value(&wallet_summary)
621 .expect("Serialization error");
622 yield result;
623 }
624 "get_block_count_local" => {
625 let block_count = self.get_block_count_local().await
626 .expect("Failed to fetch block count");
627 yield serde_json::to_value(block_count)?;
628 }
629 "peg_in" => {
630 let req: PegInRequest = serde_json::from_value(request)?;
631 let response = self.peg_in(req)
632 .await
633 .map_err(|e| anyhow::anyhow!("peg_in failed: {}", e))?;
634 let result = serde_json::to_value(&response)?;
635 yield result;
636 },
637 "peg_out" => {
638 let req: PegOutRequest = serde_json::from_value(request)?;
639 let response = self.peg_out(req)
640 .await
641 .map_err(|e| anyhow::anyhow!("peg_out failed: {}", e))?;
642 let result = serde_json::to_value(&response)?;
643 yield result;
644 },
645 "subscribe_deposit" => {
646 let req: SubscribeDepositRequest = serde_json::from_value(request)?;
647 for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
648 yield serde_json::to_value(state)?;
649 }
650 },
651 "subscribe_withdraw" => {
652 let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
653 for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
654 yield serde_json::to_value(state)?;
655 }
656 }
657 _ => {
658 Err(anyhow::format_err!("Unknown method: {}", method))?;
659 }
660 }
661 })
662 }
663
664 #[cfg(feature = "cli")]
665 async fn handle_cli_command(
666 &self,
667 args: &[std::ffi::OsString],
668 ) -> anyhow::Result<serde_json::Value> {
669 cli::handle_cli_command(self, args).await
670 }
671}
672
/// Request body of the `get_wallet_summary` RPC; it has no fields.
#[derive(Deserialize)]
struct WalletSummaryRequest {}
675
/// Context handed to the wallet module's state machines; built by
/// `WalletClientModule::context`.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC client, cloned from the module.
    rpc: DynBitcoindRpc,
    /// The peg-in descriptor from the module config.
    wallet_descriptor: PegInDescriptor,
    /// The module's decoder.
    wallet_decoder: Decoder,
    secp: Secp256k1<All>,
    pub client_ctx: ClientContext<WalletClientModule>,
}
684
/// Request body of the `peg_in` RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInRequest {
    /// Arbitrary JSON stored with the deposit operation.
    pub extra_meta: serde_json::Value,
}
689
/// Request body of the `subscribe_deposit` RPC.
#[derive(Deserialize)]
struct SubscribeDepositRequest {
    /// The deposit operation to subscribe to.
    operation_id: OperationId,
}
694
/// Request body of the `subscribe_withdraw` RPC.
#[derive(Deserialize)]
struct SubscribeWithdrawRequest {
    /// The withdraw operation to subscribe to.
    operation_id: OperationId,
}
699
/// Response of the `peg_in` RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInResponse {
    /// The newly allocated deposit address.
    pub deposit_address: Address<NetworkUnchecked>,
    /// Id of the deposit operation created for that address.
    pub operation_id: OperationId,
}
705
/// Request body of the `peg_out` RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutRequest {
    /// Amount to withdraw, in satoshis.
    pub amount_sat: u64,
    /// Destination address; `peg_out` requires it to be valid for the
    /// module's network.
    pub destination_address: Address<NetworkUnchecked>,
    /// Arbitrary JSON passed on to the withdraw operation.
    pub extra_meta: serde_json::Value,
}
712
/// Response of the `peg_out` RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutResponse {
    /// Id of the withdraw operation that was started.
    pub operation_id: OperationId,
}
717
/// Associates the state machine context with the wallet module's `KIND`.
impl Context for WalletClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
721
722impl WalletClientModule {
723 fn cfg(&self) -> &WalletClientConfig {
724 &self.data.cfg
725 }
726
727 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
728 match BitcoinRpcConfig::get_defaults_from_env_vars() {
729 Ok(rpc_config) => {
730 if rpc_config.kind == "bitcoind" {
733 cfg.default_bitcoin_rpc.clone()
734 } else {
735 rpc_config
736 }
737 }
738 _ => cfg.default_bitcoin_rpc.clone(),
739 }
740 }
741
742 pub fn get_network(&self) -> Network {
743 self.cfg().network.0
744 }
745
746 pub fn get_finality_delay(&self) -> u32 {
747 self.cfg().finality_delay
748 }
749
750 pub fn get_fee_consensus(&self) -> FeeConsensus {
751 self.cfg().fee_consensus
752 }
753
754 async fn allocate_deposit_address_inner(
755 &self,
756 dbtx: &mut DatabaseTransaction<'_>,
757 ) -> (OperationId, Address, TweakIdx) {
758 dbtx.ensure_isolated().expect("Must be isolated db");
759
760 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
761 let (_secret_tweak_key, _, address, operation_id) =
762 self.data.derive_deposit_address(tweak_idx);
763
764 let now = fedimint_core::time::now();
765
766 dbtx.insert_new_entry(
767 &PegInTweakIndexKey(tweak_idx),
768 &PegInTweakIndexData {
769 creation_time: now,
770 next_check_time: Some(now),
771 last_check_time: None,
772 operation_id,
773 claimed: vec![],
774 },
775 )
776 .await;
777
778 (operation_id, address, tweak_idx)
779 }
780
781 pub async fn get_withdraw_fees(
788 &self,
789 address: &bitcoin::Address,
790 amount: bitcoin::Amount,
791 ) -> anyhow::Result<PegOutFees> {
792 self.module_api
793 .fetch_peg_out_fees(address, amount)
794 .await?
795 .context("Federation didn't return peg-out fees")
796 }
797
798 pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
800 Ok(self.module_api.fetch_wallet_summary().await?)
801 }
802
803 pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
804 Ok(self.module_api.fetch_block_count_local().await?)
805 }
806
807 pub fn create_withdraw_output(
808 &self,
809 operation_id: OperationId,
810 address: bitcoin::Address,
811 amount: bitcoin::Amount,
812 fees: PegOutFees,
813 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
814 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
815
816 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
817
818 let sm_gen = move |out_point_range: OutPointRange| {
819 assert_eq!(out_point_range.count(), 1);
820 let out_idx = out_point_range.start_idx();
821 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
822 operation_id,
823 state: WithdrawStates::Created(CreatedWithdrawState {
824 fm_outpoint: OutPoint {
825 txid: out_point_range.txid(),
826 out_idx,
827 },
828 }),
829 })]
830 };
831
832 Ok(ClientOutputBundle::new(
833 vec![ClientOutput::<WalletOutput> {
834 output,
835 amounts: Amounts::new_bitcoin(amount),
836 }],
837 vec![ClientOutputSM::<WalletClientStates> {
838 state_machines: Arc::new(sm_gen),
839 }],
840 ))
841 }
842
843 pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
844 let (operation_id, address, _) = self.safe_allocate_deposit_address(req.extra_meta).await?;
845
846 Ok(PegInResponse {
847 deposit_address: Address::from_script(&address.script_pubkey(), self.get_network())?
848 .as_unchecked()
849 .clone(),
850 operation_id,
851 })
852 }
853
854 pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
855 let amount = bitcoin::Amount::from_sat(req.amount_sat);
856 let destination = req
857 .destination_address
858 .require_network(self.get_network())?;
859
860 let fees = self.get_withdraw_fees(&destination, amount).await?;
861 let operation_id = self
862 .withdraw(&destination, amount, fees, req.extra_meta)
863 .await
864 .context("Failed to initiate withdraw")?;
865
866 Ok(PegOutResponse { operation_id })
867 }
868
869 pub fn create_rbf_withdraw_output(
870 &self,
871 operation_id: OperationId,
872 rbf: &Rbf,
873 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
874 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
875
876 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
877
878 let sm_gen = move |out_point_range: OutPointRange| {
879 assert_eq!(out_point_range.count(), 1);
880 let out_idx = out_point_range.start_idx();
881 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
882 operation_id,
883 state: WithdrawStates::Created(CreatedWithdrawState {
884 fm_outpoint: OutPoint {
885 txid: out_point_range.txid(),
886 out_idx,
887 },
888 }),
889 })]
890 };
891
892 Ok(ClientOutputBundle::new(
893 vec![ClientOutput::<WalletOutput> {
894 output,
895 amounts: Amounts::new_bitcoin(amount),
896 }],
897 vec![ClientOutputSM::<WalletClientStates> {
898 state_machines: Arc::new(sm_gen),
899 }],
900 ))
901 }
902
903 pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
904 Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
905 }
906
907 pub async fn supports_safe_deposit(&self) -> bool {
916 let mut dbtx = self.db.begin_transaction().await;
917
918 let already_verified_supports_safe_deposit =
919 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
920
921 already_verified_supports_safe_deposit || {
922 match self.module_api.module_consensus_version().await {
923 Ok(module_consensus_version) => {
924 let supported_version =
925 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
926
927 if supported_version {
928 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
929 dbtx.commit_tx().await;
930 }
931
932 supported_version
933 }
934 Err(_) => false,
935 }
936 }
937 }
938
939 pub async fn safe_allocate_deposit_address<M>(
947 &self,
948 extra_meta: M,
949 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
950 where
951 M: Serialize + MaybeSend + MaybeSync,
952 {
953 ensure!(
954 self.supports_safe_deposit().await,
955 "Wallet module consensus version doesn't support safe deposits",
956 );
957
958 self.allocate_deposit_address_expert_only(extra_meta).await
959 }
960
961 pub async fn allocate_deposit_address_expert_only<M>(
979 &self,
980 extra_meta: M,
981 ) -> anyhow::Result<(OperationId, Address, TweakIdx)>
982 where
983 M: Serialize + MaybeSend + MaybeSync,
984 {
985 let extra_meta_value =
986 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
987 let (operation_id, address, tweak_idx) = self
988 .db
989 .autocommit(
990 move |dbtx, _| {
991 let extra_meta_value_inner = extra_meta_value.clone();
992 Box::pin(async move {
993 let (operation_id, address, tweak_idx) = self
994 .allocate_deposit_address_inner(dbtx)
995 .await;
996
997 self.client_ctx.manual_operation_start_dbtx(
998 dbtx,
999 operation_id,
1000 WalletCommonInit::KIND.as_str(),
1001 WalletOperationMeta {
1002 variant: WalletOperationMetaVariant::Deposit {
1003 address: address.clone().into_unchecked(),
1004 tweak_idx: Some(tweak_idx),
1005 expires_at: None,
1006 },
1007 extra_meta: extra_meta_value_inner,
1008 },
1009 vec![]
1010 ).await?;
1011
1012 debug!(target: LOG_CLIENT_MODULE_WALLET, %tweak_idx, %address, "Derived a new deposit address");
1013
1014 self.rpc.watch_script_history(&address.script_pubkey()).await?;
1016
1017 let sender = self.pegin_monitor_wakeup_sender.clone();
1018 dbtx.on_commit(move || {
1019 sender.send_replace(());
1020 });
1021
1022 Ok((operation_id, address, tweak_idx))
1023 })
1024 },
1025 Some(100),
1026 )
1027 .await
1028 .map_err(|e| match e {
1029 AutocommitError::CommitFailed {
1030 last_error,
1031 attempts,
1032 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1033 AutocommitError::ClosureError { error, .. } => error,
1034 })?;
1035
1036 Ok((operation_id, address, tweak_idx))
1037 }
1038
1039 pub async fn subscribe_deposit(
1045 &self,
1046 operation_id: OperationId,
1047 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
1048 let operation = self
1049 .client_ctx
1050 .get_operation(operation_id)
1051 .await
1052 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1053
1054 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1055 bail!("Operation is not a wallet operation");
1056 }
1057
1058 let operation_meta = operation.meta::<WalletOperationMeta>();
1059
1060 let WalletOperationMetaVariant::Deposit {
1061 address, tweak_idx, ..
1062 } = operation_meta.variant
1063 else {
1064 bail!("Operation is not a deposit operation");
1065 };
1066
1067 let address = address.require_network(self.cfg().network.0)?;
1068
1069 let Some(tweak_idx) = tweak_idx else {
1071 let outcome_v1 = operation
1075 .outcome::<DepositStateV1>()
1076 .context("Old pending deposit, can't subscribe to updates")?;
1077
1078 let outcome_v2 = match outcome_v1 {
1079 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
1080 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
1081 btc_out_point: bitcoin::OutPoint {
1082 txid: tx_info.btc_transaction.compute_txid(),
1083 vout: tx_info.out_idx,
1084 },
1085 },
1086 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
1087 _ => bail!("Non-final outcome in operation log"),
1088 };
1089
1090 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
1091 };
1092
1093 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
1094 let stream_rpc = self.rpc.clone();
1095 let stream_client_ctx = self.client_ctx.clone();
1096 let stream_script_pub_key = address.script_pubkey();
1097 move || {
1098
1099 stream! {
1100 yield DepositStateV2::WaitingForTransaction;
1101
1102 retry(
1103 "subscribe script history",
1104 background_backoff(),
1105 || stream_rpc.watch_script_history(&stream_script_pub_key)
1106 ).await.expect("Will never give up");
1107 let (btc_out_point, btc_deposited) = retry(
1108 "fetch history",
1109 background_backoff(),
1110 || async {
1111 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1112 history.first().and_then(|tx| {
1113 let (out_idx, amount) = tx.output
1114 .iter()
1115 .enumerate()
1116 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1117 let txid = tx.compute_txid();
1118
1119 Some((
1120 bitcoin::OutPoint {
1121 txid,
1122 vout: out_idx as u32,
1123 },
1124 amount
1125 ))
1126 }).context("No deposit transaction found")
1127 }
1128 ).await.expect("Will never give up");
1129
1130 yield DepositStateV2::WaitingForConfirmation {
1131 btc_deposited,
1132 btc_out_point
1133 };
1134
1135 let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1136 peg_in_index: tweak_idx,
1137 btc_out_point,
1138 }).await;
1139
1140 yield DepositStateV2::Confirmed {
1141 btc_deposited,
1142 btc_out_point
1143 };
1144
1145 match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1146 Ok(()) => yield DepositStateV2::Claimed {
1147 btc_deposited,
1148 btc_out_point
1149 },
1150 Err(e) => yield DepositStateV2::Failed(e.to_string())
1151 }
1152 }
1153 }}))
1154 }
1155
1156 pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1157 self.client_ctx
1158 .module_db()
1159 .clone()
1160 .begin_transaction_nc()
1161 .await
1162 .find_by_prefix(&PegInTweakIndexPrefix)
1163 .await
1164 .map(|(key, data)| (key.0, data))
1165 .collect()
1166 .await
1167 }
1168
1169 pub async fn find_tweak_idx_by_address(
1170 &self,
1171 address: bitcoin::Address<NetworkUnchecked>,
1172 ) -> anyhow::Result<TweakIdx> {
1173 let data = self.data.clone();
1174 let Some((tweak_idx, _)) = self
1175 .db
1176 .begin_transaction_nc()
1177 .await
1178 .find_by_prefix(&PegInTweakIndexPrefix)
1179 .await
1180 .filter(|(k, _)| {
1181 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1182 future::ready(derived_address.into_unchecked() == address)
1183 })
1184 .next()
1185 .await
1186 else {
1187 bail!("Address not found in the list of derived keys");
1188 };
1189
1190 Ok(tweak_idx.0)
1191 }
1192 pub async fn find_tweak_idx_by_operation_id(
1193 &self,
1194 operation_id: OperationId,
1195 ) -> anyhow::Result<TweakIdx> {
1196 Ok(self
1197 .client_ctx
1198 .module_db()
1199 .clone()
1200 .begin_transaction_nc()
1201 .await
1202 .find_by_prefix(&PegInTweakIndexPrefix)
1203 .await
1204 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1205 .next()
1206 .await
1207 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1208 .0
1209 .0)
1210 }
1211
1212 pub async fn get_pegin_tweak_idx(
1213 &self,
1214 tweak_idx: TweakIdx,
1215 ) -> anyhow::Result<PegInTweakIndexData> {
1216 self.client_ctx
1217 .module_db()
1218 .clone()
1219 .begin_transaction_nc()
1220 .await
1221 .get_value(&PegInTweakIndexKey(tweak_idx))
1222 .await
1223 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1224 }
1225
1226 pub async fn get_claimed_pegins(
1227 &self,
1228 dbtx: &mut DatabaseTransaction<'_>,
1229 tweak_idx: TweakIdx,
1230 ) -> Vec<(
1231 bitcoin::OutPoint,
1232 TransactionId,
1233 Vec<fedimint_core::OutPoint>,
1234 )> {
1235 let outpoints = dbtx
1236 .get_value(&PegInTweakIndexKey(tweak_idx))
1237 .await
1238 .map(|v| v.claimed)
1239 .unwrap_or_default();
1240
1241 let mut res = vec![];
1242
1243 for outpoint in outpoints {
1244 let claimed_peg_in_data = dbtx
1245 .get_value(&ClaimedPegInKey {
1246 peg_in_index: tweak_idx,
1247 btc_out_point: outpoint,
1248 })
1249 .await
1250 .expect("Must have a corresponding claim record");
1251 res.push((
1252 outpoint,
1253 claimed_peg_in_data.claim_txid,
1254 claimed_peg_in_data.change,
1255 ));
1256 }
1257
1258 res
1259 }
1260
1261 pub async fn recheck_pegin_address_by_op_id(
1263 &self,
1264 operation_id: OperationId,
1265 ) -> anyhow::Result<()> {
1266 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1267
1268 self.recheck_pegin_address(tweak_idx).await
1269 }
1270
1271 pub async fn recheck_pegin_address_by_address(
1273 &self,
1274 address: bitcoin::Address<NetworkUnchecked>,
1275 ) -> anyhow::Result<()> {
1276 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1277 .await
1278 }
1279
1280 pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1282 self.db
1283 .autocommit(
1284 |dbtx, _| {
1285 Box::pin(async {
1286 let db_key = PegInTweakIndexKey(tweak_idx);
1287 let db_val = dbtx
1288 .get_value(&db_key)
1289 .await
1290 .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1291
1292 dbtx.insert_entry(
1293 &db_key,
1294 &PegInTweakIndexData {
1295 next_check_time: Some(fedimint_core::time::now()),
1296 ..db_val
1297 },
1298 )
1299 .await;
1300
1301 let sender = self.pegin_monitor_wakeup_sender.clone();
1302 dbtx.on_commit(move || {
1303 sender.send_replace(());
1304 });
1305
1306 Ok::<_, anyhow::Error>(())
1307 })
1308 },
1309 Some(100),
1310 )
1311 .await?;
1312
1313 Ok(())
1314 }
1315
1316 pub async fn await_num_deposits_by_operation_id(
1318 &self,
1319 operation_id: OperationId,
1320 num_deposits: usize,
1321 ) -> anyhow::Result<()> {
1322 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1323 self.await_num_deposits(tweak_idx, num_deposits).await
1324 }
1325
1326 pub async fn await_num_deposits_by_address(
1327 &self,
1328 address: bitcoin::Address<NetworkUnchecked>,
1329 num_deposits: usize,
1330 ) -> anyhow::Result<()> {
1331 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1332 .await
1333 }
1334
1335 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1336 pub async fn await_num_deposits(
1337 &self,
1338 tweak_idx: TweakIdx,
1339 num_deposits: usize,
1340 ) -> anyhow::Result<()> {
1341 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1342
1343 let mut receiver = self.pegin_claimed_receiver.clone();
1344 let mut backoff = backoff_util::aggressive_backoff();
1345
1346 loop {
1347 let pegins = self
1348 .get_claimed_pegins(
1349 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1350 tweak_idx,
1351 )
1352 .await;
1353
1354 if pegins.len() < num_deposits {
1355 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1356 self.recheck_pegin_address(tweak_idx).await?;
1357 runtime::sleep(backoff.next().unwrap_or_default()).await;
1358 receiver.changed().await?;
1359 continue;
1360 }
1361
1362 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1363
1364 for (_outpoint, transaction_id, change) in pegins {
1365 if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1366 debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1367 continue;
1368 }
1369
1370 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1371 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1372
1373 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1374 bail!("{}", e);
1375 }
1376
1377 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1378 self.client_ctx
1379 .await_primary_module_outputs(operation_id, change)
1380 .await
1381 .expect("Cannot fail if tx was accepted and federation is honest");
1382 }
1383
1384 return Ok(());
1385 }
1386 }
1387
1388 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1393 &self,
1394 address: &bitcoin::Address,
1395 amount: bitcoin::Amount,
1396 fee: PegOutFees,
1397 extra_meta: M,
1398 ) -> anyhow::Result<OperationId> {
1399 {
1400 let operation_id = OperationId(thread_rng().r#gen());
1401
1402 let withdraw_output =
1403 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1404 let tx_builder = TransactionBuilder::new()
1405 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1406
1407 let extra_meta =
1408 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1409 self.client_ctx
1410 .finalize_and_submit_transaction(
1411 operation_id,
1412 WalletCommonInit::KIND.as_str(),
1413 {
1414 let address = address.clone();
1415 move |change_range: OutPointRange| WalletOperationMeta {
1416 variant: WalletOperationMetaVariant::Withdraw {
1417 address: address.clone().into_unchecked(),
1418 amount,
1419 fee,
1420 change: change_range.into_iter().collect(),
1421 },
1422 extra_meta: extra_meta.clone(),
1423 }
1424 },
1425 tx_builder,
1426 )
1427 .await?;
1428
1429 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1430
1431 self.client_ctx
1432 .log_event(
1433 &mut dbtx,
1434 SendPaymentEvent {
1435 operation_id,
1436 amount: amount + fee.amount(),
1437 fee: fee.amount(),
1438 },
1439 )
1440 .await;
1441
1442 dbtx.commit_tx().await;
1443
1444 Ok(operation_id)
1445 }
1446 }
1447
1448 #[deprecated(
1453 since = "0.4.0",
1454 note = "RBF withdrawals are rejected by the federation"
1455 )]
1456 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1457 &self,
1458 rbf: Rbf,
1459 extra_meta: M,
1460 ) -> anyhow::Result<OperationId> {
1461 let operation_id = OperationId(thread_rng().r#gen());
1462
1463 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1464 let tx_builder = TransactionBuilder::new()
1465 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1466
1467 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1468 self.client_ctx
1469 .finalize_and_submit_transaction(
1470 operation_id,
1471 WalletCommonInit::KIND.as_str(),
1472 move |change_range: OutPointRange| WalletOperationMeta {
1473 variant: WalletOperationMetaVariant::RbfWithdraw {
1474 rbf: rbf.clone(),
1475 change: change_range.into_iter().collect(),
1476 },
1477 extra_meta: extra_meta.clone(),
1478 },
1479 tx_builder,
1480 )
1481 .await?;
1482
1483 Ok(operation_id)
1484 }
1485
1486 pub async fn subscribe_withdraw_updates(
1487 &self,
1488 operation_id: OperationId,
1489 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1490 let operation = self
1491 .client_ctx
1492 .get_operation(operation_id)
1493 .await
1494 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1495
1496 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1497 bail!("Operation is not a wallet operation");
1498 }
1499
1500 let operation_meta = operation.meta::<WalletOperationMeta>();
1501
1502 let (WalletOperationMetaVariant::Withdraw { change, .. }
1503 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1504 else {
1505 bail!("Operation is not a withdraw operation");
1506 };
1507
1508 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1509 let client_ctx = self.client_ctx.clone();
1510
1511 Ok(self
1512 .client_ctx
1513 .outcome_or_updates(operation, operation_id, move || {
1514 stream! {
1515 match next_withdraw_state(&mut operation_stream).await {
1516 Some(WithdrawStates::Created(_)) => {
1517 yield WithdrawState::Created;
1518 },
1519 Some(s) => {
1520 panic!("Unexpected state {s:?}")
1521 },
1522 None => return,
1523 }
1524
1525 let _ = client_ctx
1530 .await_primary_module_outputs(operation_id, change)
1531 .await;
1532
1533
1534 match next_withdraw_state(&mut operation_stream).await {
1535 Some(WithdrawStates::Aborted(inner)) => {
1536 yield WithdrawState::Failed(inner.error);
1537 },
1538 Some(WithdrawStates::Success(inner)) => {
1539 yield WithdrawState::Succeeded(inner.txid);
1540 },
1541 Some(s) => {
1542 panic!("Unexpected state {s:?}")
1543 },
1544 None => {},
1545 }
1546 }
1547 }))
1548 }
1549
1550 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1551 self.admin_auth
1552 .clone()
1553 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1554 }
1555
1556 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1557 self.module_api
1558 .activate_consensus_version_voting(self.admin_auth()?)
1559 .await?;
1560
1561 Ok(())
1562 }
1563}
1564
1565async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1568 loop {
1569 let mut dbtx = db.begin_transaction().await;
1570
1571 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1572 break;
1573 }
1574
1575 module_api.wait_for_initialized_connections().await;
1576
1577 if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1578 && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1579 {
1580 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1581 dbtx.commit_tx().await;
1582 break;
1583 }
1584
1585 drop(dbtx);
1586
1587 if is_running_in_test_env() {
1588 sleep(Duration::from_secs(10)).await;
1590 } else {
1591 sleep(Duration::from_secs(3600)).await;
1592 }
1593 }
1594}
1595
1596async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1598 let index = dbtx
1599 .get_value(&NextPegInTweakIndexKey)
1600 .await
1601 .unwrap_or_default();
1602 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1603 .await;
1604 index
1605}
1606
1607#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1608pub enum WalletClientStates {
1609 Deposit(DepositStateMachine),
1610 Withdraw(WithdrawStateMachine),
1611}
1612
1613impl IntoDynInstance for WalletClientStates {
1614 type DynType = DynState;
1615
1616 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1617 DynState::from_typed(instance_id, self)
1618 }
1619}
1620
1621impl State for WalletClientStates {
1622 type ModuleContext = WalletClientContext;
1623
1624 fn transitions(
1625 &self,
1626 context: &Self::ModuleContext,
1627 global_context: &DynGlobalClientContext,
1628 ) -> Vec<StateTransition<Self>> {
1629 match self {
1630 WalletClientStates::Deposit(sm) => {
1631 sm_enum_variant_translation!(
1632 sm.transitions(context, global_context),
1633 WalletClientStates::Deposit
1634 )
1635 }
1636 WalletClientStates::Withdraw(sm) => {
1637 sm_enum_variant_translation!(
1638 sm.transitions(context, global_context),
1639 WalletClientStates::Withdraw
1640 )
1641 }
1642 }
1643 }
1644
1645 fn operation_id(&self) -> OperationId {
1646 match self {
1647 WalletClientStates::Deposit(sm) => sm.operation_id(),
1648 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1649 }
1650 }
1651}
1652
// Unit tests for the recovery index scan; excluded on wasm targets.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
    };

    /// Exercises `recover_scan_idxes_for_activity` with several combinations
    /// of start index, already-used indices and per-index activity, and checks
    /// the resulting `RecoverScanOutcome`.
    ///
    /// The closure passed to the scan returns the activity found at an index;
    /// a non-empty vector means the index saw activity. Arms that `panic!`
    /// mark indices the scan is expected never to query.
    // One test covers all scenarios, so it is intentionally long.
    #[allow(clippy::too_many_lines)]
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Scenario: start at 0, nothing used before, no activity in range.
        // Index 9 must be queried and index 10 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(9) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(10) => panic!("Shouldn't happen"),
                            TweakIdx(11) => {
                                // Activity here must stay undiscovered: the
                                // scan may not get past index 10.
                                vec![0usize]
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: None,
                    new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario: indices 1 and 2 are already known as used. They must not
        // be queried; index 11 must be queried and index 12 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(0),
                    &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                            TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                            TweakIdx(11) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(12) => panic!("Shouldn't happen"),
                            TweakIdx(13) => {
                                // Activity here must stay undiscovered: the
                                // scan may not get past index 12.
                                vec![0usize]
                            }
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(2)),
                    new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario: start at 10 with activity at the start index itself.
        // Index 19 must be queried and index 20 must not.
        {
            let last_checked = AtomicBool::new(false);
            let last_checked = &last_checked;
            assert_eq!(
                recover_scan_idxes_for_activity(
                    TweakIdx(10),
                    &BTreeSet::new(),
                    |cur_idx| async move {
                        Ok(match cur_idx {
                            TweakIdx(10) => vec![()],
                            TweakIdx(19) => {
                                last_checked.store(true, Ordering::SeqCst);
                                vec![]
                            }
                            TweakIdx(20) => panic!("Shouldn't happen"),
                            _ => vec![],
                        })
                    }
                )
                .await
                .unwrap(),
                RecoverScanOutcome {
                    last_used_idx: Some(TweakIdx(10)),
                    new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                    tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)])
                }
            );
            assert!(last_checked.load(Ordering::SeqCst));
        }

        // Scenario: activity at indices 6 and 15; both must be reported and
        // the higher one becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)])
            }
        );
        // Scenario: start at 10 while the only activity sits below the start
        // index (8). Indices below the start must not be queried, so nothing
        // is reported.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(8) => {
                        vec![()]
                    }
                    TweakIdx(9) => {
                        panic!("Shouldn't happen")
                    }
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([])
            }
        );
        // Scenario: start at 10 with activity at 15; index 9 must not be
        // queried and 15 becomes the last used index.
        assert_eq!(
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap(),
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)])
            }
        );
    }
}