1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8pub mod api;
9#[cfg(feature = "cli")]
10mod cli;
11
12mod backup;
13
14pub mod client_db;
15mod deposit;
18pub mod events;
19use events::SendPaymentEvent;
20mod pegin_monitor;
22mod withdraw;
23
24use std::collections::{BTreeMap, BTreeSet};
25use std::future;
26use std::sync::Arc;
27use std::time::{Duration, SystemTime};
28
29use anyhow::{Context as AnyhowContext, anyhow, bail, ensure};
30use async_stream::{stream, try_stream};
31use backup::WalletModuleBackup;
32use bitcoin::address::NetworkUnchecked;
33use bitcoin::secp256k1::{All, SECP256K1, Secp256k1};
34use bitcoin::{Address, Network, ScriptBuf};
35use client_db::{DbKeyPrefix, PegInTweakIndexKey, SupportsSafeDepositKey, TweakIdx};
36use fedimint_api_client::api::{DynModuleApi, FederationResult};
37use fedimint_bitcoind::{BitcoindTracked, DynBitcoindRpc, IBitcoindRpc, create_esplora_rpc};
38use fedimint_client_module::module::init::{
39 ClientModuleInit, ClientModuleInitArgs, ClientModuleRecoverArgs,
40};
41use fedimint_client_module::module::recovery::RecoveryProgress;
42use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
43use fedimint_client_module::oplog::UpdateStreamOrOutcome;
44use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
45use fedimint_client_module::transaction::{
46 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
47};
48use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
49use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
50use fedimint_core::db::{
51 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::{Decodable, Encodable};
54use fedimint_core::envs::{BitcoinRpcConfig, is_running_in_test_env};
55use fedimint_core::module::{
56 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion,
57 ModuleInit, MultiApiVersion,
58};
59use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup, sleep};
60use fedimint_core::util::backoff_util::background_backoff;
61use fedimint_core::util::{BoxStream, backoff_util, retry};
62use fedimint_core::{
63 BitcoinHash, OutPoint, TransactionId, apply, async_trait_maybe_send, push_db_pair_items,
64 runtime, secp256k1,
65};
66use fedimint_derive_secret::{ChildId, DerivableSecret};
67use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
68pub use fedimint_wallet_common as common;
69use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig};
70use fedimint_wallet_common::tweakable::Tweakable;
71pub use fedimint_wallet_common::*;
72use futures::{Stream, StreamExt};
73use rand::{Rng, thread_rng};
74use secp256k1::Keypair;
75use serde::{Deserialize, Serialize};
76use strum::IntoEnumIterator;
77use tokio::sync::watch;
78use tracing::{debug, instrument};
79
80use crate::api::WalletFederationApi;
81use crate::backup::{FEDERATION_RECOVER_MAX_GAP, RecoveryStateV2, WalletRecovery};
82use crate::client_db::{
83 ClaimedPegInData, ClaimedPegInKey, ClaimedPegInPrefix, NextPegInTweakIndexKey,
84 PegInPoolCursorKey, PegInTweakIndexData, PegInTweakIndexPrefix, RecoveryFinalizedKey,
85 RecoveryStateKey, SupportsSafeDepositPrefix,
86};
87use crate::deposit::DepositStateMachine;
88use crate::withdraw::{CreatedWithdrawState, WithdrawStateMachine, WithdrawStates};
89
/// Child id of the module root secret under which every peg-in tweak key is
/// derived (the per-address key is a further child indexed by the tweak
/// index, see `WalletClientModuleData::derive_deposit_address`).
const WALLET_TWEAK_CHILD_ID: ChildId = ChildId(0);
91
/// A Bitcoin transaction paired with the index of one of its outputs.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct BitcoinTransactionData {
    /// The full Bitcoin transaction.
    pub btc_transaction: bitcoin::Transaction,
    /// Index of an output inside `btc_transaction`; presumably the output
    /// paying the deposit address — TODO confirm against the code that
    /// constructs this type.
    pub out_idx: u32,
}
100
/// Deposit progress as reported by the first version of the deposit
/// subscription API; every post-detection state carries the whole Bitcoin
/// transaction.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV1 {
    /// No transaction associated with the deposit has been observed yet.
    WaitingForTransaction,
    /// A transaction is known but not yet confirmed.
    WaitingForConfirmation(BitcoinTransactionData),
    /// The transaction is confirmed.
    Confirmed(BitcoinTransactionData),
    /// The deposit has been claimed.
    Claimed(BitcoinTransactionData),
    /// The deposit failed; the string describes the reason.
    Failed(String),
}
109
/// Deposit progress as reported by the second version of the deposit
/// subscription API. Instead of the whole transaction, each state carries the
/// deposited amount and the Bitcoin outpoint.
///
/// Amounts are (de)serialized as an integer number of satoshis.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum DepositStateV2 {
    /// No transaction associated with the deposit has been observed yet.
    WaitingForTransaction,
    /// A deposit output is known but not yet confirmed.
    WaitingForConfirmation {
        /// Amount of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// The deposit output is confirmed.
    Confirmed {
        /// Amount of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// The deposit output has been claimed.
    Claimed {
        /// Amount of the deposit output.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        btc_deposited: bitcoin::Amount,
        /// Bitcoin outpoint of the deposit output.
        btc_out_point: bitcoin::OutPoint,
    },
    /// The deposit failed; the string describes the reason.
    Failed(String),
}
130
/// Everything a caller needs to know about an allocated deposit address.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DepositAddressInfo {
    /// Operation id of the deposit; derived from the public tweak key of the
    /// address.
    pub operation_id: OperationId,
    /// The Bitcoin address to deposit to.
    pub address: Address,
    /// Tweak index the address was derived from.
    pub tweak_idx: TweakIdx,
}
138
/// Result of `WalletClientModule::allocate_deposit_address_pooled_stateless`.
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MaybeNewAddress {
    /// A new address was allocated.
    NewAddress(DepositAddressInfo),
    /// The number of trailing unused addresses already reached the requested
    /// maximum gap, so no new address was allocated; carries those unused
    /// addresses instead.
    TooManyUnusedAddresses(Vec<DepositAddressInfo>),
}
154
/// Tells the caller of `WalletClientModule::allocate_deposit_address_pooled`
/// whether the returned address is new or recycled from the pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AllocateDepositOutcome {
    /// The address was newly allocated.
    Fresh,
    /// An existing, still unused pooled address was handed out again.
    Reused {
        /// Presumably the tweak index of the reused address — TODO confirm
        /// where this variant is constructed.
        original_tweak_idx: TweakIdx,
    },
}
169
/// Externally visible state of a withdrawal (peg-out) operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WithdrawState {
    /// The withdrawal has been created.
    Created,
    /// The withdrawal succeeded; carries a Bitcoin transaction id
    /// (presumably that of the payout transaction — TODO confirm).
    Succeeded(bitcoin::Txid),
    /// The withdrawal failed; the string describes the reason.
    Failed(String),
}
179
180async fn next_withdraw_state<S>(stream: &mut S) -> Option<WithdrawStates>
181where
182 S: Stream<Item = WalletClientStates> + Unpin,
183{
184 loop {
185 if let WalletClientStates::Withdraw(ds) = stream.next().await? {
186 return Some(ds.state);
187 }
188 tokio::task::yield_now().await;
189 }
190}
191
/// Initializer of the wallet client module.
///
/// The wrapped optional Bitcoin RPC is used by `init` as a fallback when the
/// init arguments do not supply one; if it is `None` too, an Esplora client is
/// created from the RPC configuration.
#[derive(Debug, Clone, Default)]
pub struct WalletClientInit(pub Option<DynBitcoindRpc>);
195
/// Number of recovery items requested from the federation per
/// `fetch_recovery_slice` call during slice-based recovery.
const SLICE_SIZE: u64 = 1000;
197
198impl WalletClientInit {
199 pub fn new(rpc: DynBitcoindRpc) -> Self {
200 Self(Some(rpc))
201 }
202
203 async fn recover_from_slices(
204 &self,
205 args: &ClientModuleRecoverArgs<Self>,
206 ) -> anyhow::Result<()> {
207 let data = WalletClientModuleData {
208 cfg: args.cfg().clone(),
209 module_root_secret: args.module_root_secret().clone(),
210 };
211
212 let total_items = args.module_api().fetch_recovery_count().await?;
213
214 let mut state = RecoveryStateV2::new();
215
216 state.refill_pending_pool_up_to(&data, TweakIdx(FEDERATION_RECOVER_MAX_GAP));
217
218 for start in (0..total_items).step_by(SLICE_SIZE as usize) {
219 let end = std::cmp::min(start + SLICE_SIZE, total_items);
220
221 let items = args.module_api().fetch_recovery_slice(start, end).await?;
222
223 for item in &items {
224 match item {
225 RecoveryItem::Input { outpoint, script } => {
226 state.handle_item(*outpoint, script, &data);
227 }
228 }
229 }
230
231 args.update_recovery_progress(RecoveryProgress {
232 complete: end.try_into().unwrap_or(u32::MAX),
233 total: total_items.try_into().unwrap_or(u32::MAX),
234 });
235 }
236
237 let mut dbtx = args.db().begin_transaction().await;
238
239 for tweak_idx in 0..state.new_start_idx().0 {
240 let operation_id = data.derive_peg_in_script(TweakIdx(tweak_idx)).3;
241
242 let claimed = state
243 .claimed_outpoints
244 .get(&TweakIdx(tweak_idx))
245 .cloned()
246 .unwrap_or_default();
247
248 dbtx.insert_new_entry(
249 &PegInTweakIndexKey(TweakIdx(tweak_idx)),
250 &PegInTweakIndexData {
251 operation_id,
252 creation_time: fedimint_core::time::now(),
253 last_check_time: None,
254 next_check_time: Some(fedimint_core::time::now()),
255 claimed,
256 },
257 )
258 .await;
259 }
260
261 dbtx.insert_new_entry(&NextPegInTweakIndexKey, &state.new_start_idx())
262 .await;
263
264 dbtx.commit_tx().await;
265
266 Ok(())
267 }
268}
269
impl ModuleInit for WalletClientInit {
    type Common = WalletCommonInit;

    /// Dumps the wallet client's database tables for debugging.
    ///
    /// `prefix_names` selects the tables to dump by lowercase prefix name; an
    /// empty list selects all of them. The result maps a human-readable table
    /// name to its serializable content.
    async fn dump_database(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        prefix_names: Vec<String>,
    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
        let mut wallet_client_items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
            BTreeMap::new();
        // An empty filter means "everything"; otherwise match on the
        // lowercased prefix name.
        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
        });

        for table in filtered_prefixes {
            match table {
                // Single-value keys are read directly ...
                DbKeyPrefix::NextPegInTweakIndex => {
                    if let Some(index) = dbtx.get_value(&NextPegInTweakIndexKey).await {
                        wallet_client_items
                            .insert("NextPegInTweakIndex".to_string(), Box::new(index));
                    }
                }
                // ... while prefixed tables are collected with the
                // `push_db_pair_items!` helper macro.
                DbKeyPrefix::PegInTweakIndex => {
                    push_db_pair_items!(
                        dbtx,
                        PegInTweakIndexPrefix,
                        PegInTweakIndexKey,
                        PegInTweakIndexData,
                        wallet_client_items,
                        "Peg-In Tweak Index"
                    );
                }
                DbKeyPrefix::ClaimedPegIn => {
                    push_db_pair_items!(
                        dbtx,
                        ClaimedPegInPrefix,
                        ClaimedPegInKey,
                        ClaimedPegInData,
                        wallet_client_items,
                        "Claimed Peg-In"
                    );
                }
                DbKeyPrefix::RecoveryFinalized => {
                    if let Some(val) = dbtx.get_value(&RecoveryFinalizedKey).await {
                        wallet_client_items.insert("RecoveryFinalized".to_string(), Box::new(val));
                    }
                }
                DbKeyPrefix::SupportsSafeDeposit => {
                    push_db_pair_items!(
                        dbtx,
                        SupportsSafeDepositPrefix,
                        SupportsSafeDepositKey,
                        (),
                        wallet_client_items,
                        "Supports Safe Deposit"
                    );
                }
                DbKeyPrefix::PegInPoolCursor => {
                    if let Some(cursor) = dbtx.get_value(&PegInPoolCursorKey).await {
                        wallet_client_items.insert("PegInPoolCursor".to_string(), Box::new(cursor));
                    }
                }
                // Nothing is dumped for the recovery state and the reserved
                // range markers.
                DbKeyPrefix::RecoveryState
                | DbKeyPrefix::ExternalReservedStart
                | DbKeyPrefix::CoreInternalReservedStart
                | DbKeyPrefix::CoreInternalReservedEnd => {}
            }
        }

        Box::new(wallet_client_items.into_iter())
    }
}
342
343#[apply(async_trait_maybe_send!)]
344impl ClientModuleInit for WalletClientInit {
345 type Module = WalletClientModule;
346
347 fn supported_api_versions(&self) -> MultiApiVersion {
348 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
349 .expect("no version conflicts")
350 }
351
352 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
353 let data = WalletClientModuleData {
354 cfg: args.cfg().clone(),
355 module_root_secret: args.module_root_secret().clone(),
356 };
357
358 let db = args.db().clone();
359
360 let rpc_config = WalletClientModule::get_rpc_config(args.cfg());
361
362 let btc_rpc = if let Some(user_rpc) = args.user_bitcoind_rpc() {
369 user_rpc.clone()
370 } else if let Some(factory) = args.user_bitcoind_rpc_no_chain_id() {
371 if let Some(rpc) = factory(rpc_config.url.clone()).await {
372 rpc
373 } else {
374 self.0
375 .clone()
376 .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
377 }
378 } else {
379 self.0
380 .clone()
381 .unwrap_or(create_esplora_rpc(&rpc_config.url)?)
382 };
383 let btc_rpc = BitcoindTracked::new(btc_rpc, "wallet-client").into_dyn();
384
385 let module_api = args.module_api().clone();
386
387 let (pegin_claimed_sender, pegin_claimed_receiver) = watch::channel(());
388 let (pegin_monitor_wakeup_sender, pegin_monitor_wakeup_receiver) = watch::channel(());
389
390 Ok(WalletClientModule {
391 db,
392 data,
393 module_api,
394 notifier: args.notifier().clone(),
395 rpc: btc_rpc,
396 client_ctx: args.context(),
397 pegin_monitor_wakeup_sender,
398 pegin_monitor_wakeup_receiver,
399 pegin_claimed_receiver,
400 pegin_claimed_sender,
401 task_group: args.task_group().clone(),
402 client_span: args.client_span().clone(),
403 admin_auth: args.admin_auth().cloned(),
404 })
405 }
406
407 async fn recover(
412 &self,
413 args: &ClientModuleRecoverArgs<Self>,
414 snapshot: Option<&<Self::Module as ClientModule>::Backup>,
415 ) -> anyhow::Result<()> {
416 if args
419 .db()
420 .begin_transaction_nc()
421 .await
422 .get_value(&RecoveryStateKey)
423 .await
424 .is_some()
425 {
426 return args
427 .recover_from_history::<WalletRecovery>(self, snapshot)
428 .await;
429 }
430
431 if args.module_api().fetch_recovery_count().await.is_ok() {
433 self.recover_from_slices(args).await
434 } else {
435 args.recover_from_history::<WalletRecovery>(self, snapshot)
436 .await
437 }
438 }
439
440 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
441 Some(
442 DbKeyPrefix::iter()
443 .map(|p| p as u8)
444 .chain(
445 DbKeyPrefix::ExternalReservedStart as u8
446 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
447 )
448 .collect(),
449 )
450 }
451}
452
/// Metadata stored alongside a wallet operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalletOperationMeta {
    /// Operation-kind specific data.
    pub variant: WalletOperationMetaVariant,
    /// Arbitrary JSON supplied by the caller that started the operation.
    pub extra_meta: serde_json::Value,
}
458
/// Kind-specific part of [`WalletOperationMeta`]; variant names are
/// serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WalletOperationMetaVariant {
    /// A deposit (peg-in) operation.
    Deposit {
        /// The deposit address.
        address: Address<NetworkUnchecked>,
        /// Tweak index of the address; deserializes to `None` when the field
        /// is absent.
        #[serde(default)]
        tweak_idx: Option<TweakIdx>,
        /// Optional expiry time; omitted from the serialized form when `None`
        /// and `None` when absent on deserialization.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        expires_at: Option<SystemTime>,
    },
    /// A withdrawal (peg-out) operation.
    Withdraw {
        /// Destination address.
        address: Address<NetworkUnchecked>,
        /// Withdrawn amount, serialized as satoshis.
        #[serde(with = "bitcoin::amount::serde::as_sat")]
        amount: bitcoin::Amount,
        /// Peg-out fees of the withdrawal.
        fee: PegOutFees,
        /// Presumably the federation outpoints of the transaction's change —
        /// TODO confirm where this is populated.
        change: Vec<OutPoint>,
    },

    /// A replace-by-fee bump of an earlier withdrawal.
    RbfWithdraw {
        /// The RBF request.
        rbf: Rbf,
        /// Presumably the federation outpoints of the transaction's change —
        /// TODO confirm where this is populated.
        change: Vec<OutPoint>,
    },
}
486
/// The immutable data needed to derive deposit addresses: the module's client
/// config and its root secret.
#[derive(Debug, Clone)]
pub struct WalletClientModuleData {
    /// Wallet module client configuration (peg-in descriptor, network, ...).
    cfg: WalletClientConfig,
    /// Root secret of this module instance; tweak keys are derived from it.
    module_root_secret: DerivableSecret,
}
493
494impl WalletClientModuleData {
495 fn derive_deposit_address(
496 &self,
497 idx: TweakIdx,
498 ) -> (Keypair, secp256k1::PublicKey, Address, OperationId) {
499 let idx = ChildId(idx.0);
500
501 let secret_tweak_key = self
502 .module_root_secret
503 .child_key(WALLET_TWEAK_CHILD_ID)
504 .child_key(idx)
505 .to_secp_key(fedimint_core::secp256k1::SECP256K1);
506
507 let public_tweak_key = secret_tweak_key.public_key();
508
509 let address = self
510 .cfg
511 .peg_in_descriptor
512 .tweak(&public_tweak_key, bitcoin::secp256k1::SECP256K1)
513 .address(self.cfg.network.0)
514 .unwrap();
515
516 let operation_id = OperationId(public_tweak_key.x_only_public_key().0.serialize());
518
519 (secret_tweak_key, public_tweak_key, address, operation_id)
520 }
521
522 fn derive_peg_in_script(
523 &self,
524 idx: TweakIdx,
525 ) -> (ScriptBuf, bitcoin::Address, Keypair, OperationId) {
526 let (secret_tweak_key, _, address, operation_id) = self.derive_deposit_address(idx);
527
528 (
529 self.cfg
530 .peg_in_descriptor
531 .tweak(&secret_tweak_key.public_key(), SECP256K1)
532 .script_pubkey(),
533 address,
534 secret_tweak_key,
535 operation_id,
536 )
537 }
538}
539
/// The wallet client module: allocates deposit addresses, starts withdrawals
/// and runs the background peg-in monitor.
#[derive(Debug)]
pub struct WalletClientModule {
    /// Config and root secret used for address derivation.
    data: WalletClientModuleData,
    /// The module's database.
    db: Database,
    /// API of the wallet module on the federation side.
    module_api: DynModuleApi,
    /// Notifier for this module's state machine updates.
    notifier: ModuleNotifier<WalletClientStates>,
    /// Bitcoin RPC used to watch deposit scripts.
    rpc: DynBitcoindRpc,
    /// Handle to the client for starting and managing operations.
    client_ctx: ClientContext<Self>,
    /// Signalled (after commit) whenever a deposit address is allocated, to
    /// wake up the peg-in monitor.
    pegin_monitor_wakeup_sender: watch::Sender<()>,
    /// Receiving end handed to the peg-in monitor task.
    pegin_monitor_wakeup_receiver: watch::Receiver<()>,
    /// Sending end handed to the peg-in monitor task; presumably signalled
    /// when a peg-in is claimed — TODO confirm in `pegin_monitor`.
    pegin_claimed_sender: watch::Sender<()>,
    /// Receiving end of the "peg-in claimed" channel.
    pegin_claimed_receiver: watch::Receiver<()>,
    /// Task group the background tasks are spawned on.
    task_group: TaskGroup,
    /// Tracing span the background tasks run in.
    client_span: tracing::Span,
    /// Admin credentials, if the client has any.
    admin_auth: Option<ApiAuth>,
}
558
559#[apply(async_trait_maybe_send!)]
560impl ClientModule for WalletClientModule {
561 type Init = WalletClientInit;
562 type Common = WalletModuleTypes;
563 type Backup = WalletModuleBackup;
564 type ModuleStateMachineContext = WalletClientContext;
565 type States = WalletClientStates;
566
567 fn context(&self) -> Self::ModuleStateMachineContext {
568 WalletClientContext {
569 rpc: self.rpc.clone(),
570 wallet_descriptor: self.cfg().peg_in_descriptor.clone(),
571 wallet_decoder: self.decoder(),
572 secp: Secp256k1::default(),
573 client_ctx: self.client_ctx.clone(),
574 }
575 }
576
577 async fn start(&self) {
578 self.task_group
579 .spawn_cancellable_with_span(self.client_span.clone(), "peg-in monitor", {
580 let client_ctx = self.client_ctx.clone();
581 let db = self.db.clone();
582 let btc_rpc = self.rpc.clone();
583 let module_api = self.module_api.clone();
584 let data = self.data.clone();
585 let pegin_claimed_sender = self.pegin_claimed_sender.clone();
586 let pegin_monitor_wakeup_receiver = self.pegin_monitor_wakeup_receiver.clone();
587 pegin_monitor::run_peg_in_monitor(
588 client_ctx,
589 db,
590 btc_rpc,
591 module_api,
592 data,
593 pegin_claimed_sender,
594 pegin_monitor_wakeup_receiver,
595 )
596 });
597
598 self.task_group.spawn_cancellable_with_span(
599 self.client_span.clone(),
600 "supports-safe-deposit-version",
601 {
602 let db = self.db.clone();
603 let module_api = self.module_api.clone();
604
605 poll_supports_safe_deposit_version(db, module_api)
606 },
607 );
608 }
609
610 fn supports_backup(&self) -> bool {
611 true
612 }
613
614 async fn backup(&self) -> anyhow::Result<backup::WalletModuleBackup> {
615 let session_count = self.client_ctx.global_api().session_count().await?;
617
618 let mut dbtx = self.db.begin_transaction_nc().await;
619 let next_pegin_tweak_idx = dbtx
620 .get_value(&NextPegInTweakIndexKey)
621 .await
622 .unwrap_or_default();
623 let claimed = dbtx
624 .find_by_prefix(&PegInTweakIndexPrefix)
625 .await
626 .filter_map(|(k, v)| async move {
627 if v.claimed.is_empty() {
628 None
629 } else {
630 Some(k.0)
631 }
632 })
633 .collect()
634 .await;
635 Ok(backup::WalletModuleBackup::new_v1(
636 session_count,
637 next_pegin_tweak_idx,
638 claimed,
639 ))
640 }
641
642 fn input_fee(
643 &self,
644 _amount: &Amounts,
645 _input: &<Self::Common as ModuleCommon>::Input,
646 ) -> Option<Amounts> {
647 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_in_abs))
648 }
649
650 fn output_fee(
651 &self,
652 _amount: &Amounts,
653 _output: &<Self::Common as ModuleCommon>::Output,
654 ) -> Option<Amounts> {
655 Some(Amounts::new_bitcoin(self.cfg().fee_consensus.peg_out_abs))
656 }
657
658 async fn handle_rpc(
659 &self,
660 method: String,
661 request: serde_json::Value,
662 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
663 Box::pin(try_stream! {
664 match method.as_str() {
665 "get_wallet_summary" => {
666 let _req: WalletSummaryRequest = serde_json::from_value(request)?;
667 let wallet_summary = self.get_wallet_summary()
668 .await
669 .expect("Failed to fetch wallet summary");
670 let result = serde_json::to_value(&wallet_summary)
671 .expect("Serialization error");
672 yield result;
673 }
674 "get_block_count_local" => {
675 let block_count = self.get_block_count_local().await
676 .expect("Failed to fetch block count");
677 yield serde_json::to_value(block_count)?;
678 }
679 "peg_in" => {
680 let req: PegInRequest = serde_json::from_value(request)?;
681 let response = self.peg_in(req)
682 .await
683 .map_err(|e| anyhow::anyhow!("peg_in failed: {e}"))?;
684 let result = serde_json::to_value(&response)?;
685 yield result;
686 },
687 "peg_out" => {
688 let req: PegOutRequest = serde_json::from_value(request)?;
689 let response = self.peg_out(req)
690 .await
691 .map_err(|e| anyhow::anyhow!("peg_out failed: {e}"))?;
692 let result = serde_json::to_value(&response)?;
693 yield result;
694 },
695 "subscribe_deposit" => {
696 let req: SubscribeDepositRequest = serde_json::from_value(request)?;
697 for await state in self.subscribe_deposit(req.operation_id).await?.into_stream() {
698 yield serde_json::to_value(state)?;
699 }
700 },
701 "subscribe_withdraw" => {
702 let req: SubscribeWithdrawRequest = serde_json::from_value(request)?;
703 for await state in self.subscribe_withdraw_updates(req.operation_id).await?.into_stream(){
704 yield serde_json::to_value(state)?;
705 }
706 }
707 _ => {
708 Err(anyhow::format_err!("Unknown method: {method}"))?;
709 }
710 }
711 })
712 }
713
714 #[cfg(feature = "cli")]
715 async fn handle_cli_command(
716 &self,
717 args: &[std::ffi::OsString],
718 ) -> anyhow::Result<serde_json::Value> {
719 cli::handle_cli_command(self, args).await
720 }
721}
722
/// Request body of the `get_wallet_summary` RPC method; carries no fields and
/// is only deserialized to validate the request.
#[derive(Deserialize)]
struct WalletSummaryRequest {}
725
/// Context shared with the wallet module's state machines; built by
/// `WalletClientModule::context`.
#[derive(Debug, Clone)]
pub struct WalletClientContext {
    /// Bitcoin RPC of the module.
    rpc: DynBitcoindRpc,
    /// The federation's peg-in descriptor from the client config.
    wallet_descriptor: PegInDescriptor,
    /// Decoder of the wallet module's types.
    wallet_decoder: Decoder,
    /// Secp256k1 context.
    secp: Secp256k1<All>,
    /// Handle to the client for this module.
    pub client_ctx: ClientContext<WalletClientModule>,
}
734
/// Request of the `peg_in` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInRequest {
    /// Arbitrary JSON stored with the deposit operation.
    pub extra_meta: serde_json::Value,
}
739
/// Request of the `subscribe_deposit` RPC method.
#[derive(Deserialize)]
struct SubscribeDepositRequest {
    /// Operation id of the deposit to subscribe to.
    operation_id: OperationId,
}
744
/// Request of the `subscribe_withdraw` RPC method.
#[derive(Deserialize)]
struct SubscribeWithdrawRequest {
    /// Operation id of the withdrawal to subscribe to.
    operation_id: OperationId,
}
749
/// Response of the `peg_in` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegInResponse {
    /// Address the user should deposit to.
    pub deposit_address: Address<NetworkUnchecked>,
    /// Operation id of the deposit operation.
    pub operation_id: OperationId,
}
755
/// Request of the `peg_out` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutRequest {
    /// Amount to withdraw, in satoshis.
    pub amount_sat: u64,
    /// Destination address; checked against the federation's network before
    /// use.
    pub destination_address: Address<NetworkUnchecked>,
    /// Arbitrary JSON stored with the withdraw operation.
    pub extra_meta: serde_json::Value,
}
762
/// Response of the `peg_out` RPC method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PegOutResponse {
    /// Operation id of the started withdrawal.
    pub operation_id: OperationId,
}
767
/// Marks [`WalletClientContext`] as the state machine context of the wallet
/// module kind.
impl Context for WalletClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
771
772impl WalletClientModule {
    /// Returns the wallet module's client configuration.
    fn cfg(&self) -> &WalletClientConfig {
        &self.data.cfg
    }
776
777 fn get_rpc_config(cfg: &WalletClientConfig) -> BitcoinRpcConfig {
778 match BitcoinRpcConfig::get_defaults_from_env_vars() {
779 Ok(rpc_config) => {
780 if rpc_config.kind == "bitcoind" {
783 cfg.default_bitcoin_rpc.clone()
784 } else {
785 rpc_config
786 }
787 }
788 _ => cfg.default_bitcoin_rpc.clone(),
789 }
790 }
791
    /// Returns the Bitcoin network the federation's wallet operates on.
    pub fn get_network(&self) -> Network {
        self.cfg().network.0
    }
795
    /// Returns the `finality_delay` value from the client config.
    pub fn get_finality_delay(&self) -> u32 {
        self.cfg().finality_delay
    }
799
    /// Returns the consensus fee configuration (peg-in / peg-out fees).
    pub fn get_fee_consensus(&self) -> FeeConsensus {
        self.cfg().fee_consensus
    }
803
804 async fn allocate_deposit_address_inner(
805 &self,
806 dbtx: &mut DatabaseTransaction<'_>,
807 ) -> DepositAddressInfo {
808 dbtx.ensure_isolated().expect("Must be isolated db");
809
810 let tweak_idx = get_next_peg_in_tweak_child_id(dbtx).await;
811 let (_secret_tweak_key, _, address, operation_id) =
812 self.data.derive_deposit_address(tweak_idx);
813
814 let now = fedimint_core::time::now();
815
816 dbtx.insert_new_entry(
817 &PegInTweakIndexKey(tweak_idx),
818 &PegInTweakIndexData {
819 creation_time: now,
820 next_check_time: Some(now),
821 last_check_time: None,
822 operation_id,
823 claimed: vec![],
824 },
825 )
826 .await;
827
828 DepositAddressInfo {
829 operation_id,
830 address,
831 tweak_idx,
832 }
833 }
834
835 pub async fn get_withdraw_fees(
842 &self,
843 address: &bitcoin::Address,
844 amount: bitcoin::Amount,
845 ) -> anyhow::Result<PegOutFees> {
846 self.module_api
847 .fetch_peg_out_fees(address, amount)
848 .await?
849 .context("Federation didn't return peg-out fees")
850 }
851
    /// Fetches the wallet summary from the federation.
    pub async fn get_wallet_summary(&self) -> anyhow::Result<WalletSummary> {
        Ok(self.module_api.fetch_wallet_summary().await?)
    }
856
    /// Fetches the "local" block count via the module API's
    /// `fetch_block_count_local` call.
    pub async fn get_block_count_local(&self) -> anyhow::Result<u32> {
        Ok(self.module_api.fetch_block_count_local().await?)
    }
860
861 pub fn create_withdraw_output(
862 &self,
863 operation_id: OperationId,
864 address: bitcoin::Address,
865 amount: bitcoin::Amount,
866 fees: PegOutFees,
867 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
868 let output = WalletOutput::new_v0_peg_out(address, amount, fees);
869
870 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
871
872 let sm_gen = move |out_point_range: OutPointRange| {
873 assert_eq!(out_point_range.count(), 1);
874 let out_idx = out_point_range.start_idx();
875 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
876 operation_id,
877 state: WithdrawStates::Created(CreatedWithdrawState {
878 fm_outpoint: OutPoint {
879 txid: out_point_range.txid(),
880 out_idx,
881 },
882 }),
883 })]
884 };
885
886 Ok(ClientOutputBundle::new(
887 vec![ClientOutput::<WalletOutput> {
888 output,
889 amounts: Amounts::new_bitcoin(amount),
890 }],
891 vec![ClientOutputSM::<WalletClientStates> {
892 state_machines: Arc::new(sm_gen),
893 }],
894 ))
895 }
896
897 pub async fn peg_in(&self, req: PegInRequest) -> anyhow::Result<PegInResponse> {
898 let deposit_address = self.safe_allocate_deposit_address(req.extra_meta).await?;
899
900 Ok(PegInResponse {
901 deposit_address: Address::from_script(
902 &deposit_address.address.script_pubkey(),
903 self.get_network(),
904 )?
905 .as_unchecked()
906 .clone(),
907 operation_id: deposit_address.operation_id,
908 })
909 }
910
911 pub async fn peg_out(&self, req: PegOutRequest) -> anyhow::Result<PegOutResponse> {
912 let amount = bitcoin::Amount::from_sat(req.amount_sat);
913 let destination = req
914 .destination_address
915 .require_network(self.get_network())?;
916
917 let fees = self.get_withdraw_fees(&destination, amount).await?;
918 let operation_id = self
919 .withdraw(&destination, amount, fees, req.extra_meta)
920 .await
921 .context("Failed to initiate withdraw")?;
922
923 Ok(PegOutResponse { operation_id })
924 }
925
926 pub fn create_rbf_withdraw_output(
927 &self,
928 operation_id: OperationId,
929 rbf: &Rbf,
930 ) -> anyhow::Result<ClientOutputBundle<WalletOutput, WalletClientStates>> {
931 let output = WalletOutput::new_v0_rbf(rbf.fees, rbf.txid);
932
933 let amount = output.maybe_v0_ref().expect("v0 output").amount().into();
934
935 let sm_gen = move |out_point_range: OutPointRange| {
936 assert_eq!(out_point_range.count(), 1);
937 let out_idx = out_point_range.start_idx();
938 vec![WalletClientStates::Withdraw(WithdrawStateMachine {
939 operation_id,
940 state: WithdrawStates::Created(CreatedWithdrawState {
941 fm_outpoint: OutPoint {
942 txid: out_point_range.txid(),
943 out_idx,
944 },
945 }),
946 })]
947 };
948
949 Ok(ClientOutputBundle::new(
950 vec![ClientOutput::<WalletOutput> {
951 output,
952 amounts: Amounts::new_bitcoin(amount),
953 }],
954 vec![ClientOutputSM::<WalletClientStates> {
955 state_machines: Arc::new(sm_gen),
956 }],
957 ))
958 }
959
    /// Returns `true` if the federation's wallet module consensus version is
    /// at least 2.2.
    ///
    /// NOTE(review): the name implies that from this version on Bitcoin
    /// transactions are not size-limited — confirm against the server module.
    pub async fn btc_tx_has_no_size_limit(&self) -> FederationResult<bool> {
        Ok(self.module_api.module_consensus_version().await? >= ModuleConsensusVersion::new(2, 2))
    }
963
964 pub async fn supports_safe_deposit(&self) -> bool {
973 let mut dbtx = self.db.begin_transaction().await;
974
975 let already_verified_supports_safe_deposit =
976 dbtx.get_value(&SupportsSafeDepositKey).await.is_some();
977
978 already_verified_supports_safe_deposit || {
979 match self.module_api.module_consensus_version().await {
980 Ok(module_consensus_version) => {
981 let supported_version =
982 SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version;
983
984 if supported_version {
985 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
986 dbtx.commit_tx().await;
987 }
988
989 supported_version
990 }
991 Err(_) => false,
992 }
993 }
994 }
995
    /// Allocates a deposit address, but only if the federation supports safe
    /// deposits (see `supports_safe_deposit`).
    ///
    /// `extra_meta` is stored with the deposit operation.
    ///
    /// # Errors
    ///
    /// Fails if safe deposits are not supported or the allocation itself
    /// fails.
    pub async fn safe_allocate_deposit_address<M>(
        &self,
        extra_meta: M,
    ) -> anyhow::Result<DepositAddressInfo>
    where
        M: Serialize + MaybeSend + MaybeSync,
    {
        ensure!(
            self.supports_safe_deposit().await,
            "Wallet module consensus version doesn't support safe deposits",
        );

        self.allocate_deposit_address_expert_only(extra_meta).await
    }
1017
1018 pub async fn allocate_deposit_address_expert_only<M>(
1036 &self,
1037 extra_meta: M,
1038 ) -> anyhow::Result<DepositAddressInfo>
1039 where
1040 M: Serialize + MaybeSend + MaybeSync,
1041 {
1042 let extra_meta_value =
1043 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1044 let deposit_address = self
1045 .db
1046 .autocommit(
1047 move |dbtx, _| {
1048 let extra_meta_value_inner = extra_meta_value.clone();
1049 Box::pin(async move {
1050 let deposit_address = self.allocate_deposit_address_inner(dbtx).await;
1051
1052 self.client_ctx
1053 .manual_operation_start_dbtx(
1054 dbtx,
1055 deposit_address.operation_id,
1056 WalletCommonInit::KIND.as_str(),
1057 WalletOperationMeta {
1058 variant: WalletOperationMetaVariant::Deposit {
1059 address: deposit_address.address.clone().into_unchecked(),
1060 tweak_idx: Some(deposit_address.tweak_idx),
1061 expires_at: None,
1062 },
1063 extra_meta: extra_meta_value_inner,
1064 },
1065 vec![],
1066 )
1067 .await?;
1068
1069 debug!(
1070 target: LOG_CLIENT_MODULE_WALLET,
1071 tweak_idx = %deposit_address.tweak_idx,
1072 address = %deposit_address.address,
1073 "Derived a new deposit address"
1074 );
1075
1076 self.rpc
1078 .watch_script_history(&deposit_address.address.script_pubkey())
1079 .await?;
1080
1081 let sender = self.pegin_monitor_wakeup_sender.clone();
1082 dbtx.on_commit(move || {
1083 sender.send_replace(());
1084 });
1085
1086 Ok(deposit_address)
1087 })
1088 },
1089 Some(100),
1090 )
1091 .await
1092 .map_err(|e| match e {
1093 AutocommitError::CommitFailed {
1094 last_error,
1095 attempts,
1096 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1097 AutocommitError::ClosureError { error, .. } => error,
1098 })?;
1099
1100 Ok(deposit_address)
1101 }
1102
    /// Allocates a new deposit address unless the pool of trailing unused
    /// addresses already holds at least `max_gap_size` entries.
    ///
    /// Returns [`MaybeNewAddress::NewAddress`] when an address was allocated
    /// (always the case while the pool is empty, even for `max_gap_size ==
    /// 0`), and [`MaybeNewAddress::TooManyUnusedAddresses`] with the current
    /// unused addresses otherwise. The operation is started with `null` as
    /// extra metadata. Everything runs inside one auto-committed database
    /// transaction, retried up to 100 times.
    ///
    /// # Errors
    ///
    /// Fails if the operation cannot be started, the script cannot be
    /// watched or the transaction cannot be committed.
    pub async fn allocate_deposit_address_pooled_stateless(
        &self,
        max_gap_size: usize,
    ) -> anyhow::Result<MaybeNewAddress> {
        let max_gap_size_u64 = u64::try_from(max_gap_size).unwrap_or(u64::MAX);
        let extra_meta_value = serde_json::Value::Null;
        let result = self
            .db
            .autocommit(
                move |dbtx, _| {
                    // The closure may run several times; clone per attempt.
                    let extra_meta_value_inner = extra_meta_value.clone();
                    Box::pin(async move {
                        let unused = self.unused_pooled_deposit_addresses(dbtx).await;

                        // Pool is full: hand back what is already there. The
                        // emptiness check guarantees progress when
                        // `max_gap_size` is zero.
                        if max_gap_size_u64 <= unused.len() as u64 && !unused.is_empty() {
                            let addresses = unused
                                .into_iter()
                                .map(|(tweak_idx, data)| {
                                    let (_script, address, _key, operation_id) =
                                        self.data.derive_peg_in_script(tweak_idx);

                                    // The stored operation id must match the
                                    // one re-derived from the tweak index.
                                    debug_assert_eq!(operation_id, data.operation_id);

                                    DepositAddressInfo {
                                        operation_id,
                                        address,
                                        tweak_idx,
                                    }
                                })
                                .collect();

                            return Ok::<_, anyhow::Error>(
                                MaybeNewAddress::TooManyUnusedAddresses(addresses),
                            );
                        }

                        let deposit_address = self.allocate_deposit_address_inner(dbtx).await;

                        self.client_ctx
                            .manual_operation_start_dbtx(
                                dbtx,
                                deposit_address.operation_id,
                                WalletCommonInit::KIND.as_str(),
                                WalletOperationMeta {
                                    variant: WalletOperationMetaVariant::Deposit {
                                        address: deposit_address.address.clone().into_unchecked(),
                                        tweak_idx: Some(deposit_address.tweak_idx),
                                        expires_at: None,
                                    },
                                    extra_meta: extra_meta_value_inner,
                                },
                                vec![],
                            )
                            .await?;

                        debug!(
                            target: LOG_CLIENT_MODULE_WALLET,
                            tweak_idx = %deposit_address.tweak_idx,
                            address = %deposit_address.address,
                            "Derived a new pooled deposit address"
                        );

                        self.rpc
                            .watch_script_history(&deposit_address.address.script_pubkey())
                            .await?;

                        // Wake the peg-in monitor once the address is
                        // committed.
                        let sender = self.pegin_monitor_wakeup_sender.clone();
                        dbtx.on_commit(move || {
                            sender.send_replace(());
                        });

                        Ok(MaybeNewAddress::NewAddress(deposit_address))
                    })
                },
                Some(100),
            )
            .await
            .map_err(|e| match e {
                AutocommitError::CommitFailed {
                    last_error,
                    attempts,
                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
                AutocommitError::ClosureError { error, .. } => error,
            })?;

        Ok(result)
    }
1227
1228 async fn unused_pooled_deposit_addresses(
1229 &self,
1230 dbtx: &mut DatabaseTransaction<'_>,
1231 ) -> Vec<(TweakIdx, PegInTweakIndexData)> {
1232 let mut unused: Vec<(TweakIdx, PegInTweakIndexData)> = dbtx
1237 .find_by_prefix_sorted_descending(&PegInTweakIndexPrefix)
1238 .await
1239 .take_while(|(_, d)| std::future::ready(d.claimed.is_empty()))
1240 .map(|(k, v)| (k.0, v))
1241 .collect()
1242 .await;
1243
1244 unused.sort_by_key(|(t, d)| (d.creation_time, *t));
1247 unused
1248 }
1249
1250 #[allow(clippy::too_many_lines)]
1273 pub async fn allocate_deposit_address_pooled(
1274 &self,
1275 max_gap_size: usize,
1276 ) -> anyhow::Result<(DepositAddressInfo, AllocateDepositOutcome)> {
1277 let stateless = self
1278 .allocate_deposit_address_pooled_stateless(max_gap_size)
1279 .await?;
1280
1281 let reused_addresses = match stateless {
1282 MaybeNewAddress::NewAddress(deposit_address) => {
1283 return Ok((deposit_address, AllocateDepositOutcome::Fresh));
1284 }
1285 MaybeNewAddress::TooManyUnusedAddresses(addresses) => addresses,
1286 };
1287
1288 let result = self
1289 .db
1290 .autocommit(
1291 move |dbtx, _| {
1292 let reused_addresses = reused_addresses.clone();
1293 Box::pin(async move {
1294 let cursor = dbtx
1295 .get_value(&PegInPoolCursorKey)
1296 .await
1297 .unwrap_or(TweakIdx(0));
1298
1299 let pick_pos = reused_addresses
1300 .iter()
1301 .position(|a| cursor <= a.tweak_idx)
1302 .unwrap_or(0);
1303 let reused_address = reused_addresses[pick_pos].clone();
1304
1305 let existing_tweak_idx = reused_address.tweak_idx;
1306 let existing = dbtx
1307 .get_value(&PegInTweakIndexKey(reused_address.tweak_idx))
1308 .await
1309 .with_context(|| {
1310 format!(
1311 "Pooled address disappeared while reusing {}",
1312 reused_address.tweak_idx
1313 )
1314 })?;
1315
1316 ensure!(
1317 existing.claimed.is_empty(),
1318 "Pooled address was used while reusing {}",
1319 reused_address.tweak_idx
1320 );
1321
1322 dbtx.insert_entry(&PegInPoolCursorKey, &reused_address.tweak_idx.next())
1323 .await;
1324
1325 let now = fedimint_core::time::now();
1333 dbtx.insert_entry(
1334 &PegInTweakIndexKey(reused_address.tweak_idx),
1335 &PegInTweakIndexData {
1336 creation_time: now,
1337 last_check_time: None,
1338 next_check_time: Some(now),
1339 operation_id: existing.operation_id,
1340 claimed: existing.claimed,
1341 },
1342 )
1343 .await;
1344
1345 let sender = self.pegin_monitor_wakeup_sender.clone();
1346 dbtx.on_commit(move || {
1347 sender.send_replace(());
1348 });
1349
1350 Ok::<_, anyhow::Error>((
1351 reused_address,
1352 AllocateDepositOutcome::Reused {
1353 original_tweak_idx: existing_tweak_idx,
1354 },
1355 ))
1356 })
1357 },
1358 Some(100),
1359 )
1360 .await
1361 .map_err(|e| match e {
1362 AutocommitError::CommitFailed {
1363 last_error,
1364 attempts,
1365 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
1366 AutocommitError::ClosureError { error, .. } => error,
1367 })?;
1368
1369 Ok(result)
1370 }
1371
1372 pub async fn subscribe_deposit(
1378 &self,
1379 operation_id: OperationId,
1380 ) -> anyhow::Result<UpdateStreamOrOutcome<DepositStateV2>> {
1381 let operation = self
1382 .client_ctx
1383 .get_operation(operation_id)
1384 .await
1385 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1386
1387 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1388 bail!("Operation is not a wallet operation");
1389 }
1390
1391 let operation_meta = operation.meta::<WalletOperationMeta>();
1392
1393 let WalletOperationMetaVariant::Deposit {
1394 address, tweak_idx, ..
1395 } = operation_meta.variant
1396 else {
1397 bail!("Operation is not a deposit operation");
1398 };
1399
1400 let address = address.require_network(self.cfg().network.0)?;
1401
1402 let Some(tweak_idx) = tweak_idx else {
1404 let outcome_v1 = operation
1408 .outcome::<DepositStateV1>()
1409 .context("Old pending deposit, can't subscribe to updates")?;
1410
1411 let outcome_v2 = match outcome_v1 {
1412 DepositStateV1::Claimed(tx_info) => DepositStateV2::Claimed {
1413 btc_deposited: tx_info.btc_transaction.output[tx_info.out_idx as usize].value,
1414 btc_out_point: bitcoin::OutPoint {
1415 txid: tx_info.btc_transaction.compute_txid(),
1416 vout: tx_info.out_idx,
1417 },
1418 },
1419 DepositStateV1::Failed(error) => DepositStateV2::Failed(error),
1420 _ => bail!("Non-final outcome in operation log"),
1421 };
1422
1423 return Ok(UpdateStreamOrOutcome::Outcome(outcome_v2));
1424 };
1425
1426 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, {
1427 let stream_rpc = self.rpc.clone();
1428 let stream_client_ctx = self.client_ctx.clone();
1429 let stream_script_pub_key = address.script_pubkey();
1430 move || {
1431
1432 stream! {
1433 yield DepositStateV2::WaitingForTransaction;
1434
1435 retry(
1436 "subscribe script history",
1437 background_backoff(),
1438 || stream_rpc.watch_script_history(&stream_script_pub_key)
1439 ).await.expect("Will never give up");
1440 let (btc_out_point, btc_deposited) = retry(
1441 "fetch history",
1442 background_backoff(),
1443 || async {
1444 let history = stream_rpc.get_script_history(&stream_script_pub_key).await?;
1445 history.first().and_then(|tx| {
1446 let (out_idx, amount) = tx.output
1447 .iter()
1448 .enumerate()
1449 .find_map(|(idx, output)| (output.script_pubkey == stream_script_pub_key).then_some((idx, output.value)))?;
1450 let txid = tx.compute_txid();
1451
1452 Some((
1453 bitcoin::OutPoint {
1454 txid,
1455 vout: out_idx as u32,
1456 },
1457 amount
1458 ))
1459 }).context("No deposit transaction found")
1460 }
1461 ).await.expect("Will never give up");
1462
1463 yield DepositStateV2::WaitingForConfirmation {
1464 btc_deposited,
1465 btc_out_point
1466 };
1467
1468 let claim_data = stream_client_ctx.module_db().wait_key_exists(&ClaimedPegInKey {
1469 peg_in_index: tweak_idx,
1470 btc_out_point,
1471 }).await;
1472
1473 yield DepositStateV2::Confirmed {
1474 btc_deposited,
1475 btc_out_point
1476 };
1477
1478 match stream_client_ctx.await_primary_module_outputs(operation_id, claim_data.change).await {
1479 Ok(()) => yield DepositStateV2::Claimed {
1480 btc_deposited,
1481 btc_out_point
1482 },
1483 Err(e) => yield DepositStateV2::Failed(e.to_string())
1484 }
1485 }
1486 }}))
1487 }
1488
1489 pub async fn list_peg_in_tweak_idxes(&self) -> BTreeMap<TweakIdx, PegInTweakIndexData> {
1490 self.client_ctx
1491 .module_db()
1492 .clone()
1493 .begin_transaction_nc()
1494 .await
1495 .find_by_prefix(&PegInTweakIndexPrefix)
1496 .await
1497 .map(|(key, data)| (key.0, data))
1498 .collect()
1499 .await
1500 }
1501
1502 pub async fn find_tweak_idx_by_address(
1503 &self,
1504 address: bitcoin::Address<NetworkUnchecked>,
1505 ) -> anyhow::Result<TweakIdx> {
1506 let data = self.data.clone();
1507 let Some((tweak_idx, _)) = self
1508 .db
1509 .begin_transaction_nc()
1510 .await
1511 .find_by_prefix(&PegInTweakIndexPrefix)
1512 .await
1513 .filter(|(k, _)| {
1514 let (_, derived_address, _tweak_key, _) = data.derive_peg_in_script(k.0);
1515 future::ready(derived_address.into_unchecked() == address)
1516 })
1517 .next()
1518 .await
1519 else {
1520 bail!("Address not found in the list of derived keys");
1521 };
1522
1523 Ok(tweak_idx.0)
1524 }
1525 pub async fn find_tweak_idx_by_operation_id(
1526 &self,
1527 operation_id: OperationId,
1528 ) -> anyhow::Result<TweakIdx> {
1529 Ok(self
1530 .client_ctx
1531 .module_db()
1532 .clone()
1533 .begin_transaction_nc()
1534 .await
1535 .find_by_prefix(&PegInTweakIndexPrefix)
1536 .await
1537 .filter(|(_k, v)| future::ready(v.operation_id == operation_id))
1538 .next()
1539 .await
1540 .ok_or_else(|| anyhow::format_err!("OperationId not found"))?
1541 .0
1542 .0)
1543 }
1544
1545 pub async fn get_pegin_tweak_idx(
1546 &self,
1547 tweak_idx: TweakIdx,
1548 ) -> anyhow::Result<PegInTweakIndexData> {
1549 self.client_ctx
1550 .module_db()
1551 .clone()
1552 .begin_transaction_nc()
1553 .await
1554 .get_value(&PegInTweakIndexKey(tweak_idx))
1555 .await
1556 .ok_or_else(|| anyhow::format_err!("TweakIdx not found"))
1557 }
1558
1559 pub async fn get_claimed_pegins(
1560 &self,
1561 dbtx: &mut DatabaseTransaction<'_>,
1562 tweak_idx: TweakIdx,
1563 ) -> Vec<(
1564 bitcoin::OutPoint,
1565 TransactionId,
1566 Vec<fedimint_core::OutPoint>,
1567 )> {
1568 let outpoints = dbtx
1569 .get_value(&PegInTweakIndexKey(tweak_idx))
1570 .await
1571 .map(|v| v.claimed)
1572 .unwrap_or_default();
1573
1574 let mut res = vec![];
1575
1576 for outpoint in outpoints {
1577 let claimed_peg_in_data = dbtx
1578 .get_value(&ClaimedPegInKey {
1579 peg_in_index: tweak_idx,
1580 btc_out_point: outpoint,
1581 })
1582 .await
1583 .expect("Must have a corresponding claim record");
1584 res.push((
1585 outpoint,
1586 claimed_peg_in_data.claim_txid,
1587 claimed_peg_in_data.change,
1588 ));
1589 }
1590
1591 res
1592 }
1593
1594 pub async fn recheck_pegin_address_by_op_id(
1596 &self,
1597 operation_id: OperationId,
1598 ) -> anyhow::Result<()> {
1599 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1600
1601 self.recheck_pegin_address(tweak_idx).await
1602 }
1603
1604 pub async fn recheck_pegin_address_by_address(
1606 &self,
1607 address: bitcoin::Address<NetworkUnchecked>,
1608 ) -> anyhow::Result<()> {
1609 self.recheck_pegin_address(self.find_tweak_idx_by_address(address).await?)
1610 .await
1611 }
1612
1613 pub async fn recheck_pegin_address(&self, tweak_idx: TweakIdx) -> anyhow::Result<()> {
1615 self.db
1616 .autocommit(
1617 |dbtx, _| {
1618 Box::pin(async {
1619 let db_key = PegInTweakIndexKey(tweak_idx);
1620 let db_val = dbtx
1621 .get_value(&db_key)
1622 .await
1623 .ok_or_else(|| anyhow::format_err!("DBKey not found"))?;
1624
1625 dbtx.insert_entry(
1626 &db_key,
1627 &PegInTweakIndexData {
1628 next_check_time: Some(fedimint_core::time::now()),
1629 ..db_val
1630 },
1631 )
1632 .await;
1633
1634 let sender = self.pegin_monitor_wakeup_sender.clone();
1635 dbtx.on_commit(move || {
1636 sender.send_replace(());
1637 });
1638
1639 Ok::<_, anyhow::Error>(())
1640 })
1641 },
1642 Some(100),
1643 )
1644 .await?;
1645
1646 Ok(())
1647 }
1648
1649 pub async fn await_num_deposits_by_operation_id(
1651 &self,
1652 operation_id: OperationId,
1653 num_deposits: usize,
1654 ) -> anyhow::Result<()> {
1655 let tweak_idx = self.find_tweak_idx_by_operation_id(operation_id).await?;
1656 self.await_num_deposits(tweak_idx, num_deposits).await
1657 }
1658
1659 pub async fn await_num_deposits_by_address(
1660 &self,
1661 address: bitcoin::Address<NetworkUnchecked>,
1662 num_deposits: usize,
1663 ) -> anyhow::Result<()> {
1664 self.await_num_deposits(self.find_tweak_idx_by_address(address).await?, num_deposits)
1665 .await
1666 }
1667
1668 #[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx=?tweak_idx, num_deposists=num_deposits))]
1669 pub async fn await_num_deposits(
1670 &self,
1671 tweak_idx: TweakIdx,
1672 num_deposits: usize,
1673 ) -> anyhow::Result<()> {
1674 let operation_id = self.get_pegin_tweak_idx(tweak_idx).await?.operation_id;
1675
1676 let mut receiver = self.pegin_claimed_receiver.clone();
1677 let mut backoff = backoff_util::aggressive_backoff();
1678
1679 loop {
1680 let pegins = self
1681 .get_claimed_pegins(
1682 &mut self.client_ctx.module_db().begin_transaction_nc().await,
1683 tweak_idx,
1684 )
1685 .await;
1686
1687 if pegins.len() < num_deposits {
1688 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Not enough deposits");
1689 self.recheck_pegin_address(tweak_idx).await?;
1690 runtime::sleep(backoff.next().unwrap_or_default()).await;
1691 receiver.changed().await?;
1692 continue;
1693 }
1694
1695 debug!(target: LOG_CLIENT_MODULE_WALLET, has=pegins.len(), "Enough deposits detected");
1696
1697 for (_outpoint, transaction_id, change) in pegins {
1698 if transaction_id == TransactionId::from_byte_array([0; 32]) && change.is_empty() {
1699 debug!(target: LOG_CLIENT_MODULE_WALLET, "Deposited amount was too low, skipping");
1700 continue;
1701 }
1702
1703 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring deposists claimed");
1704 let tx_subscriber = self.client_ctx.transaction_updates(operation_id).await;
1705
1706 if let Err(e) = tx_subscriber.await_tx_accepted(transaction_id).await {
1707 bail!("{e}");
1708 }
1709
1710 debug!(target: LOG_CLIENT_MODULE_WALLET, out_points=?change, "Ensuring outputs claimed");
1711 self.client_ctx
1712 .await_primary_module_outputs(operation_id, change)
1713 .await
1714 .expect("Cannot fail if tx was accepted and federation is honest");
1715 }
1716
1717 return Ok(());
1718 }
1719 }
1720
1721 pub async fn withdraw<M: Serialize + MaybeSend + MaybeSync>(
1726 &self,
1727 address: &bitcoin::Address,
1728 amount: bitcoin::Amount,
1729 fee: PegOutFees,
1730 extra_meta: M,
1731 ) -> anyhow::Result<OperationId> {
1732 {
1733 let operation_id = OperationId(thread_rng().r#gen());
1734
1735 let withdraw_output =
1736 self.create_withdraw_output(operation_id, address.clone(), amount, fee)?;
1737 let tx_builder = TransactionBuilder::new()
1738 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1739
1740 let extra_meta =
1741 serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1742 self.client_ctx
1743 .finalize_and_submit_transaction(
1744 operation_id,
1745 WalletCommonInit::KIND.as_str(),
1746 {
1747 let address = address.clone();
1748 move |change_range: OutPointRange| WalletOperationMeta {
1749 variant: WalletOperationMetaVariant::Withdraw {
1750 address: address.clone().into_unchecked(),
1751 amount,
1752 fee,
1753 change: change_range.into_iter().collect(),
1754 },
1755 extra_meta: extra_meta.clone(),
1756 }
1757 },
1758 tx_builder,
1759 )
1760 .await?;
1761
1762 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1763
1764 self.client_ctx
1765 .log_event(
1766 &mut dbtx,
1767 SendPaymentEvent {
1768 operation_id,
1769 amount: amount + fee.amount(),
1770 fee: fee.amount(),
1771 },
1772 )
1773 .await;
1774
1775 dbtx.commit_tx().await;
1776
1777 Ok(operation_id)
1778 }
1779 }
1780
1781 #[deprecated(
1786 since = "0.4.0",
1787 note = "RBF withdrawals are rejected by the federation"
1788 )]
1789 pub async fn rbf_withdraw<M: Serialize + MaybeSync + MaybeSend>(
1790 &self,
1791 rbf: Rbf,
1792 extra_meta: M,
1793 ) -> anyhow::Result<OperationId> {
1794 let operation_id = OperationId(thread_rng().r#gen());
1795
1796 let withdraw_output = self.create_rbf_withdraw_output(operation_id, &rbf)?;
1797 let tx_builder = TransactionBuilder::new()
1798 .with_outputs(self.client_ctx.make_client_outputs(withdraw_output));
1799
1800 let extra_meta = serde_json::to_value(extra_meta).expect("Failed to serialize extra meta");
1801 self.client_ctx
1802 .finalize_and_submit_transaction(
1803 operation_id,
1804 WalletCommonInit::KIND.as_str(),
1805 move |change_range: OutPointRange| WalletOperationMeta {
1806 variant: WalletOperationMetaVariant::RbfWithdraw {
1807 rbf: rbf.clone(),
1808 change: change_range.into_iter().collect(),
1809 },
1810 extra_meta: extra_meta.clone(),
1811 },
1812 tx_builder,
1813 )
1814 .await?;
1815
1816 Ok(operation_id)
1817 }
1818
1819 pub async fn subscribe_withdraw_updates(
1820 &self,
1821 operation_id: OperationId,
1822 ) -> anyhow::Result<UpdateStreamOrOutcome<WithdrawState>> {
1823 let operation = self
1824 .client_ctx
1825 .get_operation(operation_id)
1826 .await
1827 .with_context(|| anyhow!("Operation not found: {}", operation_id.fmt_short()))?;
1828
1829 if operation.operation_module_kind() != WalletCommonInit::KIND.as_str() {
1830 bail!("Operation is not a wallet operation");
1831 }
1832
1833 let operation_meta = operation.meta::<WalletOperationMeta>();
1834
1835 let (WalletOperationMetaVariant::Withdraw { change, .. }
1836 | WalletOperationMetaVariant::RbfWithdraw { change, .. }) = operation_meta.variant
1837 else {
1838 bail!("Operation is not a withdraw operation");
1839 };
1840
1841 let mut operation_stream = self.notifier.subscribe(operation_id).await;
1842 let client_ctx = self.client_ctx.clone();
1843
1844 Ok(self
1845 .client_ctx
1846 .outcome_or_updates(operation, operation_id, move || {
1847 stream! {
1848 match next_withdraw_state(&mut operation_stream).await {
1849 Some(WithdrawStates::Created(_)) => {
1850 yield WithdrawState::Created;
1851 },
1852 Some(s) => {
1853 panic!("Unexpected state {s:?}")
1854 },
1855 None => return,
1856 }
1857
1858 let _ = client_ctx
1863 .await_primary_module_outputs(operation_id, change)
1864 .await;
1865
1866
1867 match next_withdraw_state(&mut operation_stream).await {
1868 Some(WithdrawStates::Aborted(inner)) => {
1869 yield WithdrawState::Failed(inner.error);
1870 },
1871 Some(WithdrawStates::Success(inner)) => {
1872 yield WithdrawState::Succeeded(inner.txid);
1873 },
1874 Some(s) => {
1875 panic!("Unexpected state {s:?}")
1876 },
1877 None => {},
1878 }
1879 }
1880 }))
1881 }
1882
1883 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
1884 self.admin_auth
1885 .clone()
1886 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
1887 }
1888
1889 pub async fn activate_consensus_version_voting(&self) -> anyhow::Result<()> {
1890 self.module_api
1891 .activate_consensus_version_voting(self.admin_auth()?)
1892 .await?;
1893
1894 Ok(())
1895 }
1896}
1897
1898async fn poll_supports_safe_deposit_version(db: Database, module_api: DynModuleApi) {
1901 loop {
1902 let mut dbtx = db.begin_transaction().await;
1903
1904 if dbtx.get_value(&SupportsSafeDepositKey).await.is_some() {
1905 break;
1906 }
1907
1908 module_api.wait_for_initialized_connections().await;
1909
1910 if let Ok(module_consensus_version) = module_api.module_consensus_version().await
1911 && SAFE_DEPOSIT_MODULE_CONSENSUS_VERSION <= module_consensus_version
1912 {
1913 dbtx.insert_new_entry(&SupportsSafeDepositKey, &()).await;
1914 dbtx.commit_tx().await;
1915 break;
1916 }
1917
1918 drop(dbtx);
1919
1920 if is_running_in_test_env() {
1921 sleep(Duration::from_secs(10)).await;
1923 } else {
1924 sleep(Duration::from_hours(1)).await;
1925 }
1926 }
1927}
1928
1929async fn get_next_peg_in_tweak_child_id(dbtx: &mut DatabaseTransaction<'_>) -> TweakIdx {
1931 let index = dbtx
1932 .get_value(&NextPegInTweakIndexKey)
1933 .await
1934 .unwrap_or_default();
1935 dbtx.insert_entry(&NextPegInTweakIndexKey, &(index.next()))
1936 .await;
1937 index
1938}
1939
1940#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1941pub enum WalletClientStates {
1942 Deposit(DepositStateMachine),
1943 Withdraw(WithdrawStateMachine),
1944}
1945
1946impl IntoDynInstance for WalletClientStates {
1947 type DynType = DynState;
1948
1949 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1950 DynState::from_typed(instance_id, self)
1951 }
1952}
1953
1954impl State for WalletClientStates {
1955 type ModuleContext = WalletClientContext;
1956
1957 fn transitions(
1958 &self,
1959 context: &Self::ModuleContext,
1960 global_context: &DynGlobalClientContext,
1961 ) -> Vec<StateTransition<Self>> {
1962 match self {
1963 WalletClientStates::Deposit(sm) => {
1964 sm_enum_variant_translation!(
1965 sm.transitions(context, global_context),
1966 WalletClientStates::Deposit
1967 )
1968 }
1969 WalletClientStates::Withdraw(sm) => {
1970 sm_enum_variant_translation!(
1971 sm.transitions(context, global_context),
1972 WalletClientStates::Withdraw
1973 )
1974 }
1975 }
1976 }
1977
1978 fn operation_id(&self) -> OperationId {
1979 match self {
1980 WalletClientStates::Deposit(sm) => sm.operation_id(),
1981 WalletClientStates::Withdraw(sm) => sm.operation_id(),
1982 }
1983 }
1984}
1985
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::*;
    use crate::backup::{
        RECOVER_NUM_IDX_ADD_TO_LAST_USED, RecoverScanOutcome, recover_scan_idxes_for_activity,
    };

    /// Runs `recover_scan_idxes_for_activity` against hand-written activity
    /// callbacks and checks the reported outcome. Callbacks panic for indices
    /// the scan is expected never to ask about.
    #[allow(clippy::too_many_lines)]
    #[tokio::test(flavor = "multi_thread")]
    async fn sanity_test_recover_inner() {
        // Start at 0, nothing used and no activity anywhere in the scanned range.
        {
            let reached_last = AtomicBool::new(false);
            let reached_last = &reached_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(0),
                &BTreeSet::new(),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(9) => {
                            reached_last.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(10) => panic!("Shouldn't happen"),
                        // Presumably only here to pin the element type of the `Vec`.
                        TweakIdx(11) => {
                            vec![0usize]
                        }
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            let expected = RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([]),
            };
            assert_eq!(outcome, expected);
            assert!(reached_last.load(Ordering::SeqCst));
        }

        // Start at 0 with indices 1 and 2 already known as used; those must not
        // be queried again.
        {
            let reached_last = AtomicBool::new(false);
            let reached_last = &reached_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(0),
                &BTreeSet::from([TweakIdx(1), TweakIdx(2)]),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(1) => panic!("Shouldn't happen: already used (1)"),
                        TweakIdx(2) => panic!("Shouldn't happen: already used (2)"),
                        TweakIdx(11) => {
                            reached_last.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(12) => panic!("Shouldn't happen"),
                        // Presumably only here to pin the element type of the `Vec`.
                        TweakIdx(13) => {
                            vec![0usize]
                        }
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            let expected = RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(2)),
                new_start_idx: TweakIdx(2 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([]),
            };
            assert_eq!(outcome, expected);
            assert!(reached_last.load(Ordering::SeqCst));
        }

        // Start at 10 with activity on the very first scanned index.
        {
            let reached_last = AtomicBool::new(false);
            let reached_last = &reached_last;

            let outcome = recover_scan_idxes_for_activity(
                TweakIdx(10),
                &BTreeSet::new(),
                |cur_idx| async move {
                    Ok(match cur_idx {
                        TweakIdx(10) => vec![()],
                        TweakIdx(19) => {
                            reached_last.store(true, Ordering::SeqCst);
                            vec![]
                        }
                        TweakIdx(20) => panic!("Shouldn't happen"),
                        _ => vec![],
                    })
                },
            )
            .await
            .unwrap();

            let expected = RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(10)),
                new_start_idx: TweakIdx(10 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(10)]),
            };
            assert_eq!(outcome, expected);
            assert!(reached_last.load(Ordering::SeqCst));
        }

        // Start at 0 with activity on indices 6 and 15.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(0), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(6 | 15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(6), TweakIdx(15)]),
            }
        );

        // Start at 10: indices below the start must not be queried.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    // Presumably only here to pin the element type of the `Vec`.
                    TweakIdx(8) => {
                        vec![()]
                    }
                    TweakIdx(9) => {
                        panic!("Shouldn't happen")
                    }
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: None,
                new_start_idx: TweakIdx(9 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([]),
            }
        );

        // Start at 10 with activity on index 15.
        let outcome =
            recover_scan_idxes_for_activity(TweakIdx(10), &BTreeSet::new(), |cur_idx| async move {
                Ok(match cur_idx {
                    TweakIdx(9) => panic!("Shouldn't happen"),
                    TweakIdx(15) => vec![()],
                    _ => vec![],
                })
            })
            .await
            .unwrap();
        assert_eq!(
            outcome,
            RecoverScanOutcome {
                last_used_idx: Some(TweakIdx(15)),
                new_start_idx: TweakIdx(15 + RECOVER_NUM_IDX_ADD_TO_LAST_USED),
                tweak_idxes_with_pegins: BTreeSet::from([TweakIdx(15)]),
            }
        );
    }
}