1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use bitcoin::ScriptBuf;
5use fedimint_api_client::api::DynModuleApi;
6use fedimint_bitcoind::DynBitcoindRpc;
7use fedimint_client_module::module::{ClientContext, OutPointRange};
8use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
12};
13use fedimint_core::envs::is_running_in_test_env;
14use fedimint_core::task::sleep;
15use fedimint_core::txoproof::TxOutProof;
16use fedimint_core::util::FmtCompactAnyhow as _;
17use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
18use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
19use fedimint_wallet_common::WalletInput;
20use fedimint_wallet_common::txoproof::PegInProof;
21use futures::StreamExt as _;
22use secp256k1::Keypair;
23use tokio::sync::watch;
24use tracing::{debug, instrument, trace, warn};
25
26use crate::api::WalletFederationApi as _;
27use crate::client_db::{
28 ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
29 PegInTweakIndexPrefix, TweakIdx,
30};
31use crate::events::DepositConfirmed;
32use crate::{WalletClientModule, WalletClientModuleData};
33
34#[derive(Debug, Clone)]
37struct NextActions {
38 now: SystemTime,
40 due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
42 next: Option<SystemTime>,
44}
45
46impl NextActions {
47 pub fn new() -> Self {
48 Self {
49 now: time::now(),
50 due: vec![],
51 next: None,
52 }
53 }
54}
55
56impl NextActions {
57 async fn from_db_state(db: &Database) -> Self {
59 db.begin_transaction_nc()
60 .await
61 .find_by_prefix(&PegInTweakIndexPrefix)
62 .await
63 .fold(NextActions::new(), |state, (key, val)| async {
64 state.fold(key, val)
65 })
66 .await
67 }
68
69 pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
71 if let Some(next_check_time) = val.next_check_time {
72 if next_check_time < self.now {
73 self.due.push((key, val));
74 }
75
76 self.next = match self.next {
77 Some(existing) => Some(existing.min(next_check_time)),
78 None => Some(next_check_time),
79 };
80 }
81 self
82 }
83}
84
85#[allow(clippy::too_many_lines)]
90pub(crate) async fn run_peg_in_monitor(
91 client_ctx: ClientContext<WalletClientModule>,
92 db: Database,
93 btc_rpc: DynBitcoindRpc,
94 module_api: DynModuleApi,
95 data: WalletClientModuleData,
96 pegin_claimed_sender: watch::Sender<()>,
97 mut wakeup_receiver: watch::Receiver<()>,
98) {
99 let min_sleep: Duration = if is_running_in_test_env() {
100 Duration::from_millis(100)
101 } else {
102 Duration::from_secs(30)
103 };
104
105 loop {
106 if let Err(err) = check_for_deposits(
107 &db,
108 &data,
109 &btc_rpc,
110 &module_api,
111 &client_ctx,
112 &pegin_claimed_sender,
113 )
114 .await
115 {
116 warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
117 continue;
118 }
119
120 let now = time::now();
121 let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
122 now + Duration::from_secs(60 * 60));
124 let next_wakeup_duration = next_wakeup
125 .duration_since(now)
126 .unwrap_or_default()
127 .max(min_sleep);
128 debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
129 tokio::select! {
130 () = sleep(next_wakeup_duration) => {
131 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
132 },
133 res = wakeup_receiver.changed() => {
134 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
135 if res.is_err() {
136 debug!(target: LOG_CLIENT_MODULE_WALLET, "Terminating peg-in monitor");
137 return;
138 }
139 }
140 }
141 }
142}
143
144async fn check_for_deposits(
145 db: &Database,
146 data: &WalletClientModuleData,
147 btc_rpc: &DynBitcoindRpc,
148 module_api: &DynModuleApi,
149 client_ctx: &ClientContext<WalletClientModule>,
150 pengin_claimed_sender: &watch::Sender<()>,
151) -> Result<(), anyhow::Error> {
152 let due = NextActions::from_db_state(db).await.due;
153 trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
154 for (due_key, due_val) in due {
155 check_and_claim_idx_pegins(
156 data,
157 due_key,
158 btc_rpc,
159 module_api,
160 db,
161 client_ctx,
162 due_val,
163 pengin_claimed_sender,
164 )
165 .await?;
166 }
167
168 Ok(())
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn check_and_claim_idx_pegins(
173 data: &WalletClientModuleData,
174 due_key: PegInTweakIndexKey,
175 btc_rpc: &DynBitcoindRpc,
176 module_api: &DynModuleApi,
177 db: &Database,
178 client_ctx: &ClientContext<WalletClientModule>,
179 due_val: PegInTweakIndexData,
180 pengin_claimed_sender: &watch::Sender<()>,
181) -> Result<(), anyhow::Error> {
182 let now = time::now();
183 match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
184 Ok(outcomes) => {
185 let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
186 .map(|duration| now + duration);
187 db
188 .autocommit(
189 |dbtx, _| {
190 Box::pin(async {
191 let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
192
193 let claimed_sender = pengin_claimed_sender.clone();
194 dbtx.on_commit(move || {
195 claimed_sender.send_replace(());
196 });
197
198 let peg_in_tweak_index_data = PegInTweakIndexData {
199 next_check_time,
200 last_check_time: Some(now),
201 claimed: [due_val.claimed.clone(), claimed_now].concat(),
202 ..due_val
203 };
204 trace!(
205 target: LOG_CLIENT_MODULE_WALLET,
206 tweak_idx=%due_key.0,
207 due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
208 data=?peg_in_tweak_index_data,
209 "Updating"
210 );
211 dbtx
212 .insert_entry(&due_key, &peg_in_tweak_index_data)
213 .await;
214
215 Ok::<_, anyhow::Error>(())
216 })
217 },
218 None,
219 )
220 .await?;
221 }
222 Err(err) => {
223 debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
224 }
225 }
226 Ok(())
227}
228
229#[derive(Copy, Clone, Debug)]
233enum CheckOutcome {
234 Pending { num_blocks_needed: u64 },
236 Claimed { outpoint: bitcoin::OutPoint },
238
239 AlreadyClaimed,
242}
243
244impl CheckOutcome {
245 fn retry_delay(self) -> Option<Duration> {
249 match self {
250 CheckOutcome::Pending { num_blocks_needed } => {
252 if is_running_in_test_env() {
253 Some(Duration::from_millis(1))
255 } else {
256 Some(Duration::from_secs(60 * num_blocks_needed))
257 }
258 }
259 CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
264 }
265 }
266
267 fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
273 if outcomes.is_empty() {
277 if is_running_in_test_env() {
278 return Some(Duration::from_millis(100));
280 }
281 let now = time::now();
282 let age = now.duration_since(creation_time).unwrap_or_default();
283 return Some(age / 10);
284 }
285
286 let mut min = None;
288
289 for outcome in outcomes {
290 min = match (min, outcome.retry_delay()) {
291 (None, time) => time,
292 (Some(min), None) => Some(min),
293 (Some(min), Some(time)) => Some(cmp::min(min, time)),
294 };
295 }
296
297 min
298 }
299
300 fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
301 let mut res = vec![];
302 for outcome in outcomes {
303 if let CheckOutcome::Claimed { outpoint } = outcome {
304 res.push(*outpoint);
305 }
306 }
307
308 res
309 }
310}
311
312#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
317async fn check_idx_pegins(
318 data: &WalletClientModuleData,
319 tweak_idx: TweakIdx,
320 btc_rpc: &DynBitcoindRpc,
321 module_rpc: &DynModuleApi,
322 db: &Database,
323 client_ctx: &ClientContext<WalletClientModule>,
324) -> Result<Vec<CheckOutcome>, anyhow::Error> {
325 let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
326 let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
327 btc_rpc.watch_script_history(&script).await?;
328
329 let history = btc_rpc.get_script_history(&script).await?;
330
331 debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
332
333 let mut outcomes = vec![];
334
335 for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
336 let txid = transaction.compute_txid();
337 let outpoint = bitcoin::OutPoint {
338 txid,
339 vout: out_idx,
340 };
341
342 let claimed_peg_in_key = ClaimedPegInKey {
343 peg_in_index: tweak_idx,
344 btc_out_point: outpoint,
345 };
346
347 if db
348 .begin_transaction_nc()
349 .await
350 .get_value(&claimed_peg_in_key)
351 .await
352 .is_some()
353 {
354 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
355 outcomes.push(CheckOutcome::AlreadyClaimed);
356 continue;
357 }
358 let finality_delay = u64::from(data.cfg.finality_delay);
359
360 let tx_block_count =
361 if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
362 tx_block_height.saturating_add(1)
363 } else {
364 outcomes.push(CheckOutcome::Pending {
365 num_blocks_needed: finality_delay,
366 });
367 debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
368 continue;
369 };
370
371 let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
372
373 if 0 < num_blocks_needed {
374 outcomes.push(CheckOutcome::Pending { num_blocks_needed });
375 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
376 continue;
377 }
378
379 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
380
381 let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
382 let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
383
384 claim_peg_in(
385 client_ctx,
386 tweak_idx,
387 tweak_key,
388 &transaction,
389 operation_id,
390 outpoint,
391 tx_out_proof,
392 federation_knows_utxo,
393 )
394 .await?;
395 outcomes.push(CheckOutcome::Claimed { outpoint });
396 }
397 Ok(outcomes)
398}
399
400#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
401async fn claim_peg_in(
402 client_ctx: &ClientContext<WalletClientModule>,
403 tweak_idx: TweakIdx,
404 tweak_key: Keypair,
405 transaction: &bitcoin::Transaction,
406 operation_id: OperationId,
407 out_point: bitcoin::OutPoint,
408 tx_out_proof: TxOutProof,
409 federation_knows_utxo: bool,
410) -> anyhow::Result<()> {
411 async fn claim_peg_in_inner(
414 client_ctx: &ClientContext<WalletClientModule>,
415 dbtx: &mut DatabaseTransaction<'_>,
416 btc_transaction: &bitcoin::Transaction,
417 out_idx: u32,
418 tweak_key: Keypair,
419 txout_proof: TxOutProof,
420 operation_id: OperationId,
421 federation_knows_utxo: bool,
422 ) -> Option<OutPointRange> {
423 let pegin_proof = PegInProof::new(
424 txout_proof,
425 btc_transaction.clone(),
426 out_idx,
427 tweak_key.public_key(),
428 )
429 .expect("TODO: handle API returning faulty proofs");
430
431 let amount = pegin_proof.tx_output().value.into();
432 let wallet_input = if federation_knows_utxo {
433 WalletInput::new_v1(&pegin_proof)
434 } else {
435 WalletInput::new_v0(pegin_proof)
436 };
437
438 let client_input = ClientInput::<WalletInput> {
439 input: wallet_input,
440 keys: vec![tweak_key],
441 amount,
442 };
443
444 if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
445 warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
446 return None;
447 }
448
449 client_ctx
450 .log_event(
451 dbtx,
452 DepositConfirmed {
453 txid: btc_transaction.compute_txid(),
454 out_idx,
455 amount,
456 },
457 )
458 .await;
459
460 Some(
461 client_ctx
462 .claim_inputs(
463 dbtx,
464 ClientInputBundle::new_no_sm(vec![client_input]),
465 operation_id,
466 )
467 .await
468 .expect("Cannot claim input, additional funding needed"),
469 )
470 }
471
472 let tx_out_proof = &tx_out_proof;
473
474 debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
475
476 client_ctx
477 .module_db()
478 .autocommit(
479 |dbtx, _| {
480 Box::pin(async {
481 let maybe_change_range = claim_peg_in_inner(
482 client_ctx,
483 dbtx,
484 transaction,
485 out_point.vout,
486 tweak_key,
487 tx_out_proof.clone(),
488 operation_id,
489 federation_knows_utxo,
490 )
491 .await;
492
493 let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
494 ClaimedPegInData {
495 claim_txid: change_range.txid(),
496 change: change_range.into_iter().collect(),
497 }
498 } else {
499 ClaimedPegInData {
500 claim_txid: TransactionId::from_byte_array([0; 32]),
501 change: vec![],
502 }
503 };
504
505 dbtx.insert_entry(
506 &ClaimedPegInKey {
507 peg_in_index: tweak_idx,
508 btc_out_point: out_point,
509 },
510 &claimed_pegin_data,
511 )
512 .await;
513
514 Ok(())
515 })
516 },
517 Some(100),
518 )
519 .await
520 .map_err(|e| match e {
521 AutocommitError::CommitFailed {
522 last_error,
523 attempts,
524 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
525 AutocommitError::ClosureError { error, .. } => error,
526 })?;
527
528 Ok(())
529}
530
531pub(crate) fn filter_onchain_deposit_outputs<'a>(
532 tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
533 out_script: &'a ScriptBuf,
534) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
535 tx_iter.flat_map(move |tx| {
536 tx.output
537 .clone()
538 .into_iter()
539 .enumerate()
540 .filter_map(move |(out_idx, tx_out)| {
541 if &tx_out.script_pubkey == out_script {
542 Some((tx.clone(), out_idx as u32))
543 } else {
544 None
545 }
546 })
547 })
548}