1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use anyhow::anyhow;
5use bitcoin::ScriptBuf;
6use fedimint_api_client::api::DynModuleApi;
7use fedimint_bitcoind::DynBitcoindRpc;
8use fedimint_client_module::module::{ClientContext, OutPointRange};
9use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
10use fedimint_core::core::OperationId;
11use fedimint_core::db::{
12 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
13};
14use fedimint_core::envs::is_running_in_test_env;
15use fedimint_core::module::Amounts;
16use fedimint_core::task::sleep;
17use fedimint_core::txoproof::TxOutProof;
18use fedimint_core::util::FmtCompactAnyhow as _;
19use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
20use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
21use fedimint_wallet_common::WalletInput;
22use fedimint_wallet_common::txoproof::PegInProof;
23use futures::StreamExt as _;
24use secp256k1::Keypair;
25use tokio::sync::watch;
26use tracing::{debug, instrument, trace, warn};
27
28use crate::api::WalletFederationApi as _;
29use crate::client_db::{
30 ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
31 PegInTweakIndexPrefix, TweakIdx,
32};
33use crate::events::{DepositConfirmed, ReceivePaymentEvent};
34use crate::{WalletClientModule, WalletClientModuleData};
35
36#[derive(Debug, Clone)]
39struct NextActions {
40 now: SystemTime,
42 due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
44 next: Option<SystemTime>,
46}
47
48impl NextActions {
49 pub fn new() -> Self {
50 Self {
51 now: time::now(),
52 due: vec![],
53 next: None,
54 }
55 }
56}
57
58impl NextActions {
59 async fn from_db_state(db: &Database) -> Self {
61 db.begin_transaction_nc()
62 .await
63 .find_by_prefix(&PegInTweakIndexPrefix)
64 .await
65 .fold(NextActions::new(), |state, (key, val)| async {
66 state.fold(key, val)
67 })
68 .await
69 }
70
71 pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
73 if let Some(next_check_time) = val.next_check_time {
74 if next_check_time < self.now {
75 self.due.push((key, val));
76 }
77
78 self.next = match self.next {
79 Some(existing) => Some(existing.min(next_check_time)),
80 None => Some(next_check_time),
81 };
82 }
83 self
84 }
85}
86
87#[allow(clippy::too_many_lines)]
92pub(crate) async fn run_peg_in_monitor(
93 client_ctx: ClientContext<WalletClientModule>,
94 db: Database,
95 btc_rpc: DynBitcoindRpc,
96 module_api: DynModuleApi,
97 data: WalletClientModuleData,
98 pegin_claimed_sender: watch::Sender<()>,
99 mut wakeup_receiver: watch::Receiver<()>,
100) {
101 let min_sleep: Duration = if is_running_in_test_env() {
102 Duration::from_millis(100)
103 } else {
104 Duration::from_secs(30)
105 };
106
107 loop {
108 if let Err(err) = check_for_deposits(
109 &db,
110 &data,
111 &btc_rpc,
112 &module_api,
113 &client_ctx,
114 &pegin_claimed_sender,
115 )
116 .await
117 {
118 warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
119 continue;
120 }
121
122 let now = time::now();
123 let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
124 now + Duration::from_secs(60 * 60));
126 let next_wakeup_duration = next_wakeup
127 .duration_since(now)
128 .unwrap_or_default()
129 .max(min_sleep);
130 debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
131 tokio::select! {
132 () = sleep(next_wakeup_duration) => {
133 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
134 },
135 res = wakeup_receiver.changed() => {
136 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
137 if res.is_err() {
138 debug!(target: LOG_CLIENT_MODULE_WALLET, "Terminating peg-in monitor");
139 return;
140 }
141 }
142 }
143 }
144}
145
146async fn check_for_deposits(
147 db: &Database,
148 data: &WalletClientModuleData,
149 btc_rpc: &DynBitcoindRpc,
150 module_api: &DynModuleApi,
151 client_ctx: &ClientContext<WalletClientModule>,
152 pengin_claimed_sender: &watch::Sender<()>,
153) -> Result<(), anyhow::Error> {
154 let due = NextActions::from_db_state(db).await.due;
155 trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
156 for (due_key, due_val) in due {
157 check_and_claim_idx_pegins(
158 data,
159 due_key,
160 btc_rpc,
161 module_api,
162 db,
163 client_ctx,
164 due_val,
165 pengin_claimed_sender,
166 )
167 .await?;
168 }
169
170 Ok(())
171}
172
173#[allow(clippy::too_many_arguments)]
174async fn check_and_claim_idx_pegins(
175 data: &WalletClientModuleData,
176 due_key: PegInTweakIndexKey,
177 btc_rpc: &DynBitcoindRpc,
178 module_api: &DynModuleApi,
179 db: &Database,
180 client_ctx: &ClientContext<WalletClientModule>,
181 due_val: PegInTweakIndexData,
182 pengin_claimed_sender: &watch::Sender<()>,
183) -> Result<(), anyhow::Error> {
184 let now = time::now();
185 match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
186 Ok(outcomes) => {
187 let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
188 .map(|duration| now + duration);
189 db
190 .autocommit(
191 |dbtx, _| {
192 Box::pin(async {
193 let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
194
195 let claimed_sender = pengin_claimed_sender.clone();
196 dbtx.on_commit(move || {
197 claimed_sender.send_replace(());
198 });
199
200 let peg_in_tweak_index_data = PegInTweakIndexData {
201 next_check_time,
202 last_check_time: Some(now),
203 claimed: [due_val.claimed.clone(), claimed_now].concat(),
204 ..due_val
205 };
206 trace!(
207 target: LOG_CLIENT_MODULE_WALLET,
208 tweak_idx=%due_key.0,
209 due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
210 data=?peg_in_tweak_index_data,
211 "Updating"
212 );
213 dbtx
214 .insert_entry(&due_key, &peg_in_tweak_index_data)
215 .await;
216
217 Ok::<_, anyhow::Error>(())
218 })
219 },
220 None,
221 )
222 .await?;
223 }
224 Err(err) => {
225 debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
226 }
227 }
228 Ok(())
229}
230
231#[derive(Copy, Clone, Debug)]
235enum CheckOutcome {
236 Pending { num_blocks_needed: u64 },
238 Claimed { outpoint: bitcoin::OutPoint },
240
241 AlreadyClaimed,
244}
245
246impl CheckOutcome {
247 fn retry_delay(self) -> Option<Duration> {
251 match self {
252 CheckOutcome::Pending { num_blocks_needed } => {
254 if is_running_in_test_env() {
255 Some(Duration::from_millis(1))
257 } else {
258 Some(Duration::from_secs(60 * num_blocks_needed))
259 }
260 }
261 CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
266 }
267 }
268
269 fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
275 if outcomes.is_empty() {
279 if is_running_in_test_env() {
280 return Some(Duration::from_millis(100));
282 }
283 let now = time::now();
284 let age = now.duration_since(creation_time).unwrap_or_default();
285 return Some(age / 10);
286 }
287
288 let mut min = None;
290
291 for outcome in outcomes {
292 min = match (min, outcome.retry_delay()) {
293 (None, time) => time,
294 (Some(min), None) => Some(min),
295 (Some(min), Some(time)) => Some(cmp::min(min, time)),
296 };
297 }
298
299 min
300 }
301
302 fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
303 let mut res = vec![];
304 for outcome in outcomes {
305 if let CheckOutcome::Claimed { outpoint } = outcome {
306 res.push(*outpoint);
307 }
308 }
309
310 res
311 }
312}
313
314#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
319async fn check_idx_pegins(
320 data: &WalletClientModuleData,
321 tweak_idx: TweakIdx,
322 btc_rpc: &DynBitcoindRpc,
323 module_rpc: &DynModuleApi,
324 db: &Database,
325 client_ctx: &ClientContext<WalletClientModule>,
326) -> Result<Vec<CheckOutcome>, anyhow::Error> {
327 let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
328 let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
329 btc_rpc.watch_script_history(&script).await?;
330
331 let history = btc_rpc.get_script_history(&script).await?;
332
333 debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
334
335 let mut outcomes = vec![];
336
337 for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
338 let txid = transaction.compute_txid();
339 let outpoint = bitcoin::OutPoint {
340 txid,
341 vout: out_idx,
342 };
343
344 let claimed_peg_in_key = ClaimedPegInKey {
345 peg_in_index: tweak_idx,
346 btc_out_point: outpoint,
347 };
348
349 if db
350 .begin_transaction_nc()
351 .await
352 .get_value(&claimed_peg_in_key)
353 .await
354 .is_some()
355 {
356 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
357 outcomes.push(CheckOutcome::AlreadyClaimed);
358 continue;
359 }
360 let finality_delay = u64::from(data.cfg.finality_delay);
361
362 let tx_block_count =
363 if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
364 tx_block_height.saturating_add(1)
365 } else {
366 outcomes.push(CheckOutcome::Pending {
367 num_blocks_needed: finality_delay,
368 });
369 debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
370 continue;
371 };
372
373 let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
374
375 if 0 < num_blocks_needed {
376 outcomes.push(CheckOutcome::Pending { num_blocks_needed });
377 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
378 continue;
379 }
380
381 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
382
383 let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
384 let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
385
386 claim_peg_in(
387 client_ctx,
388 tweak_idx,
389 tweak_key,
390 &transaction,
391 operation_id,
392 outpoint,
393 tx_out_proof,
394 federation_knows_utxo,
395 )
396 .await?;
397 outcomes.push(CheckOutcome::Claimed { outpoint });
398 }
399 Ok(outcomes)
400}
401
402#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
403async fn claim_peg_in(
404 client_ctx: &ClientContext<WalletClientModule>,
405 tweak_idx: TweakIdx,
406 tweak_key: Keypair,
407 transaction: &bitcoin::Transaction,
408 operation_id: OperationId,
409 out_point: bitcoin::OutPoint,
410 tx_out_proof: TxOutProof,
411 federation_knows_utxo: bool,
412) -> anyhow::Result<()> {
413 async fn claim_peg_in_inner(
416 client_ctx: &ClientContext<WalletClientModule>,
417 dbtx: &mut DatabaseTransaction<'_>,
418 btc_transaction: &bitcoin::Transaction,
419 out_idx: u32,
420 tweak_key: Keypair,
421 txout_proof: TxOutProof,
422 operation_id: OperationId,
423 federation_knows_utxo: bool,
424 ) -> Option<OutPointRange> {
425 let pegin_proof = PegInProof::new(
426 txout_proof,
427 btc_transaction.clone(),
428 out_idx,
429 tweak_key.public_key(),
430 )
431 .expect("TODO: handle API returning faulty proofs");
432
433 let amount = pegin_proof.tx_output().value.into();
434 let wallet_input = if federation_knows_utxo {
435 WalletInput::new_v1(&pegin_proof)
436 } else {
437 WalletInput::new_v0(pegin_proof)
438 };
439
440 let client_input = ClientInput::<WalletInput> {
441 input: wallet_input,
442 keys: vec![tweak_key],
443 amounts: Amounts::new_bitcoin(amount),
444 };
445
446 if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
447 warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
448 return None;
449 }
450
451 let txid = btc_transaction.compute_txid();
452
453 client_ctx
454 .log_event(
455 dbtx,
456 DepositConfirmed {
457 txid,
458 out_idx,
459 amount,
460 },
461 )
462 .await;
463
464 client_ctx
465 .log_event(
466 dbtx,
467 ReceivePaymentEvent {
468 operation_id,
469 amount,
470 txid,
471 },
472 )
473 .await;
474
475 Some(
476 client_ctx
477 .claim_inputs(
478 dbtx,
479 ClientInputBundle::new_no_sm(vec![client_input]),
480 operation_id,
481 )
482 .await
483 .expect("Cannot claim input, additional funding needed"),
484 )
485 }
486
487 let tx_out_proof = &tx_out_proof;
488
489 debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
490
491 client_ctx
492 .module_db()
493 .autocommit(
494 |dbtx, _| {
495 Box::pin(async {
496 let maybe_change_range = claim_peg_in_inner(
497 client_ctx,
498 dbtx,
499 transaction,
500 out_point.vout,
501 tweak_key,
502 tx_out_proof.clone(),
503 operation_id,
504 federation_knows_utxo,
505 )
506 .await;
507
508 let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
509 ClaimedPegInData {
510 claim_txid: change_range.txid(),
511 change: change_range.into_iter().collect(),
512 }
513 } else {
514 ClaimedPegInData {
515 claim_txid: TransactionId::from_byte_array([0; 32]),
516 change: vec![],
517 }
518 };
519
520 dbtx.insert_entry(
521 &ClaimedPegInKey {
522 peg_in_index: tweak_idx,
523 btc_out_point: out_point,
524 },
525 &claimed_pegin_data,
526 )
527 .await;
528
529 Ok(())
530 })
531 },
532 Some(100),
533 )
534 .await
535 .map_err(|e| match e {
536 AutocommitError::CommitFailed {
537 last_error,
538 attempts,
539 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
540 AutocommitError::ClosureError { error, .. } => error,
541 })?;
542
543 Ok(())
544}
545
546pub(crate) fn filter_onchain_deposit_outputs<'a>(
547 tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
548 out_script: &'a ScriptBuf,
549) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
550 tx_iter.flat_map(move |tx| {
551 tx.output
552 .clone()
553 .into_iter()
554 .enumerate()
555 .filter_map(move |(out_idx, tx_out)| {
556 if &tx_out.script_pubkey == out_script {
557 Some((tx.clone(), out_idx as u32))
558 } else {
559 None
560 }
561 })
562 })
563}