1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use anyhow::anyhow;
5use bitcoin::ScriptBuf;
6use fedimint_api_client::api::DynModuleApi;
7use fedimint_bitcoind::DynBitcoindRpc;
8use fedimint_client_module::module::{ClientContext, OutPointRange};
9use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
10use fedimint_core::core::OperationId;
11use fedimint_core::db::{
12 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
13};
14use fedimint_core::envs::is_running_in_test_env;
15use fedimint_core::module::Amounts;
16use fedimint_core::task::sleep;
17use fedimint_core::txoproof::TxOutProof;
18use fedimint_core::util::FmtCompactAnyhow as _;
19use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
20use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
21use fedimint_wallet_common::WalletInput;
22use fedimint_wallet_common::txoproof::PegInProof;
23use futures::StreamExt as _;
24use secp256k1::Keypair;
25use tokio::sync::watch;
26use tracing::{debug, instrument, trace, warn};
27
28use crate::api::WalletFederationApi as _;
29use crate::client_db::{
30 ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
31 PegInTweakIndexPrefix, TweakIdx,
32};
33use crate::events::{DepositConfirmed, ReceivePaymentEvent};
34use crate::{WalletClientModule, WalletClientModuleData};
35
36#[derive(Debug, Clone)]
39struct NextActions {
40 now: SystemTime,
42 due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
44 next: Option<SystemTime>,
46}
47
48impl NextActions {
49 pub fn new() -> Self {
50 Self {
51 now: time::now(),
52 due: vec![],
53 next: None,
54 }
55 }
56}
57
58impl NextActions {
59 async fn from_db_state(db: &Database) -> Self {
61 db.begin_transaction_nc()
62 .await
63 .find_by_prefix(&PegInTweakIndexPrefix)
64 .await
65 .fold(NextActions::new(), |state, (key, val)| async {
66 state.fold(key, val)
67 })
68 .await
69 }
70
71 pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
73 if let Some(next_check_time) = val.next_check_time {
74 if next_check_time < self.now {
75 self.due.push((key, val));
76 }
77
78 self.next = match self.next {
79 Some(existing) => Some(existing.min(next_check_time)),
80 None => Some(next_check_time),
81 };
82 }
83 self
84 }
85}
86
87#[allow(clippy::too_many_lines)]
92pub(crate) async fn run_peg_in_monitor(
93 client_ctx: ClientContext<WalletClientModule>,
94 db: Database,
95 btc_rpc: DynBitcoindRpc,
96 module_api: DynModuleApi,
97 data: WalletClientModuleData,
98 pegin_claimed_sender: watch::Sender<()>,
99 mut wakeup_receiver: watch::Receiver<()>,
100) {
101 let min_sleep: Duration = if is_running_in_test_env() {
102 Duration::from_millis(100)
103 } else {
104 Duration::from_secs(30)
105 };
106
107 let mut next_wakeup_duration = Duration::ZERO;
112
113 loop {
114 debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleeping before next check");
115 let woke_via_signal = tokio::select! {
116 () = sleep(next_wakeup_duration) => {
117 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
118 false
119 },
120 res = wakeup_receiver.changed() => {
121 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
122 if res.is_err() {
123 debug!(target: LOG_CLIENT_MODULE_WALLET, "Terminating peg-in monitor");
124 return;
125 }
126 true
127 }
128 };
129
130 if let Err(err) = check_for_deposits(
131 &db,
132 &data,
133 &btc_rpc,
134 &module_api,
135 &client_ctx,
136 &pegin_claimed_sender,
137 )
138 .await
139 {
140 warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
141 next_wakeup_duration = min_sleep;
144 continue;
145 }
146
147 let now = time::now();
148 let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
149 now + Duration::from_hours(1));
151 let effective_min_sleep = if woke_via_signal {
157 Duration::ZERO
158 } else {
159 min_sleep
160 };
161 next_wakeup_duration = next_wakeup
162 .duration_since(now)
163 .unwrap_or_default()
164 .max(effective_min_sleep);
165 }
166}
167
168async fn check_for_deposits(
169 db: &Database,
170 data: &WalletClientModuleData,
171 btc_rpc: &DynBitcoindRpc,
172 module_api: &DynModuleApi,
173 client_ctx: &ClientContext<WalletClientModule>,
174 pengin_claimed_sender: &watch::Sender<()>,
175) -> Result<(), anyhow::Error> {
176 let due = NextActions::from_db_state(db).await.due;
177 trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
178 for (due_key, due_val) in due {
179 check_and_claim_idx_pegins(
180 data,
181 due_key,
182 btc_rpc,
183 module_api,
184 db,
185 client_ctx,
186 due_val,
187 pengin_claimed_sender,
188 )
189 .await?;
190 }
191
192 Ok(())
193}
194
195#[allow(clippy::too_many_arguments)]
196async fn check_and_claim_idx_pegins(
197 data: &WalletClientModuleData,
198 due_key: PegInTweakIndexKey,
199 btc_rpc: &DynBitcoindRpc,
200 module_api: &DynModuleApi,
201 db: &Database,
202 client_ctx: &ClientContext<WalletClientModule>,
203 due_val: PegInTweakIndexData,
204 pengin_claimed_sender: &watch::Sender<()>,
205) -> Result<(), anyhow::Error> {
206 let now = time::now();
207 match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
208 Ok(outcomes) => {
209 let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
210 .map(|duration| now + duration);
211 db
212 .autocommit(
213 |dbtx, _| {
214 Box::pin(async {
215 let current = dbtx
222 .get_value(&due_key)
223 .await
224 .expect("Peg-in entries are never deleted");
225
226 let merged_next_check_time = if current.next_check_time
230 == due_val.next_check_time
231 {
232 next_check_time
233 } else {
234 match (current.next_check_time, next_check_time) {
235 (Some(a), Some(b)) => Some(a.min(b)),
236 (a, None) => a,
237 (None, b) => b,
238 }
239 };
240
241 let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
242
243 let claimed_sender = pengin_claimed_sender.clone();
244 dbtx.on_commit(move || {
245 claimed_sender.send_replace(());
246 });
247
248 let peg_in_tweak_index_data = PegInTweakIndexData {
249 next_check_time: merged_next_check_time,
250 last_check_time: Some(now),
251 claimed: [current.claimed.clone(), claimed_now].concat(),
252 ..current
253 };
254 trace!(
255 target: LOG_CLIENT_MODULE_WALLET,
256 tweak_idx=%due_key.0,
257 due_in_secs=?merged_next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
258 data=?peg_in_tweak_index_data,
259 "Updating"
260 );
261 dbtx
262 .insert_entry(&due_key, &peg_in_tweak_index_data)
263 .await;
264
265 Ok::<_, anyhow::Error>(())
266 })
267 },
268 None,
269 )
270 .await?;
271 }
272 Err(err) => {
273 debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
274 }
275 }
276 Ok(())
277}
278
279#[derive(Copy, Clone, Debug)]
283enum CheckOutcome {
284 Pending { num_blocks_needed: u64 },
286 Claimed { outpoint: bitcoin::OutPoint },
288
289 AlreadyClaimed,
292}
293
294impl CheckOutcome {
295 fn retry_delay(self) -> Option<Duration> {
299 match self {
300 CheckOutcome::Pending { num_blocks_needed } => {
302 if is_running_in_test_env() {
303 Some(Duration::from_millis(1))
305 } else {
306 Some(Duration::from_secs(60 * num_blocks_needed))
307 }
308 }
309 CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
314 }
315 }
316
317 fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
323 if outcomes.is_empty() {
327 if is_running_in_test_env() {
328 return Some(Duration::from_millis(100));
330 }
331 let now = time::now();
332 let age = now.duration_since(creation_time).unwrap_or_default();
333 return Some(age / 10);
334 }
335
336 let mut min = None;
338
339 for outcome in outcomes {
340 min = match (min, outcome.retry_delay()) {
341 (None, time) => time,
342 (Some(min), None) => Some(min),
343 (Some(min), Some(time)) => Some(cmp::min(min, time)),
344 };
345 }
346
347 min
348 }
349
350 fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
351 let mut res = vec![];
352 for outcome in outcomes {
353 if let CheckOutcome::Claimed { outpoint } = outcome {
354 res.push(*outpoint);
355 }
356 }
357
358 res
359 }
360}
361
362#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
367async fn check_idx_pegins(
368 data: &WalletClientModuleData,
369 tweak_idx: TweakIdx,
370 btc_rpc: &DynBitcoindRpc,
371 module_rpc: &DynModuleApi,
372 db: &Database,
373 client_ctx: &ClientContext<WalletClientModule>,
374) -> Result<Vec<CheckOutcome>, anyhow::Error> {
375 let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
376 let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
377 btc_rpc.watch_script_history(&script).await?;
378
379 let history = btc_rpc.get_script_history(&script).await?;
380
381 debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
382
383 let mut outcomes = vec![];
384
385 for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
386 let txid = transaction.compute_txid();
387 let outpoint = bitcoin::OutPoint {
388 txid,
389 vout: out_idx,
390 };
391
392 let claimed_peg_in_key = ClaimedPegInKey {
393 peg_in_index: tweak_idx,
394 btc_out_point: outpoint,
395 };
396
397 if db
398 .begin_transaction_nc()
399 .await
400 .get_value(&claimed_peg_in_key)
401 .await
402 .is_some()
403 {
404 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
405 outcomes.push(CheckOutcome::AlreadyClaimed);
406 continue;
407 }
408 let finality_delay = u64::from(data.cfg.finality_delay);
409
410 let tx_block_count =
411 if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
412 tx_block_height.saturating_add(1)
413 } else {
414 outcomes.push(CheckOutcome::Pending {
415 num_blocks_needed: finality_delay,
416 });
417 debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
418 continue;
419 };
420
421 let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
422
423 if 0 < num_blocks_needed {
424 outcomes.push(CheckOutcome::Pending { num_blocks_needed });
425 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
426 continue;
427 }
428
429 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
430
431 let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
432 let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
433
434 claim_peg_in(
435 client_ctx,
436 tweak_idx,
437 tweak_key,
438 &transaction,
439 operation_id,
440 outpoint,
441 tx_out_proof,
442 federation_knows_utxo,
443 )
444 .await?;
445 outcomes.push(CheckOutcome::Claimed { outpoint });
446 }
447 Ok(outcomes)
448}
449
450#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
451async fn claim_peg_in(
452 client_ctx: &ClientContext<WalletClientModule>,
453 tweak_idx: TweakIdx,
454 tweak_key: Keypair,
455 transaction: &bitcoin::Transaction,
456 operation_id: OperationId,
457 out_point: bitcoin::OutPoint,
458 tx_out_proof: TxOutProof,
459 federation_knows_utxo: bool,
460) -> anyhow::Result<()> {
461 async fn claim_peg_in_inner(
464 client_ctx: &ClientContext<WalletClientModule>,
465 dbtx: &mut DatabaseTransaction<'_>,
466 btc_transaction: &bitcoin::Transaction,
467 out_idx: u32,
468 tweak_key: Keypair,
469 txout_proof: TxOutProof,
470 operation_id: OperationId,
471 federation_knows_utxo: bool,
472 ) -> Option<OutPointRange> {
473 let pegin_proof = PegInProof::new(
474 txout_proof,
475 btc_transaction.clone(),
476 out_idx,
477 tweak_key.public_key(),
478 )
479 .expect("TODO: handle API returning faulty proofs");
480
481 let amount = pegin_proof.tx_output().value.into();
482 let wallet_input = if federation_knows_utxo {
483 WalletInput::new_v1(&pegin_proof)
484 } else {
485 WalletInput::new_v0(pegin_proof)
486 };
487
488 let client_input = ClientInput::<WalletInput> {
489 input: wallet_input,
490 keys: vec![tweak_key],
491 amounts: Amounts::new_bitcoin(amount),
492 };
493
494 if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
495 warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
496 return None;
497 }
498
499 let txid = btc_transaction.compute_txid();
500
501 client_ctx
502 .log_event(
503 dbtx,
504 DepositConfirmed {
505 txid,
506 out_idx,
507 amount,
508 },
509 )
510 .await;
511
512 client_ctx
513 .log_event(
514 dbtx,
515 ReceivePaymentEvent {
516 operation_id,
517 amount,
518 txid,
519 },
520 )
521 .await;
522
523 Some(
524 client_ctx
525 .claim_inputs(
526 dbtx,
527 ClientInputBundle::new_no_sm(vec![client_input]),
528 operation_id,
529 )
530 .await
531 .expect("Cannot claim input, additional funding needed"),
532 )
533 }
534
535 let tx_out_proof = &tx_out_proof;
536
537 debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
538
539 client_ctx
540 .module_db()
541 .autocommit(
542 |dbtx, _| {
543 Box::pin(async {
544 let maybe_change_range = claim_peg_in_inner(
545 client_ctx,
546 dbtx,
547 transaction,
548 out_point.vout,
549 tweak_key,
550 tx_out_proof.clone(),
551 operation_id,
552 federation_knows_utxo,
553 )
554 .await;
555
556 let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
557 ClaimedPegInData {
558 claim_txid: change_range.txid(),
559 change: change_range.into_iter().collect(),
560 }
561 } else {
562 ClaimedPegInData {
563 claim_txid: TransactionId::from_byte_array([0; 32]),
564 change: vec![],
565 }
566 };
567
568 dbtx.insert_entry(
569 &ClaimedPegInKey {
570 peg_in_index: tweak_idx,
571 btc_out_point: out_point,
572 },
573 &claimed_pegin_data,
574 )
575 .await;
576
577 Ok(())
578 })
579 },
580 Some(100),
581 )
582 .await
583 .map_err(|e| match e {
584 AutocommitError::CommitFailed {
585 last_error,
586 attempts,
587 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
588 AutocommitError::ClosureError { error, .. } => error,
589 })?;
590
591 Ok(())
592}
593
594pub(crate) fn filter_onchain_deposit_outputs<'a>(
595 tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
596 out_script: &'a ScriptBuf,
597) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
598 tx_iter.flat_map(move |tx| {
599 tx.output
600 .clone()
601 .into_iter()
602 .enumerate()
603 .filter_map(move |(out_idx, tx_out)| {
604 if &tx_out.script_pubkey == out_script {
605 Some((tx.clone(), out_idx as u32))
606 } else {
607 None
608 }
609 })
610 })
611}