1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use bitcoin::ScriptBuf;
5use fedimint_api_client::api::DynModuleApi;
6use fedimint_bitcoind::DynBitcoindRpc;
7use fedimint_client_module::module::{ClientContext, OutPointRange};
8use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11 AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
12};
13use fedimint_core::envs::is_running_in_test_env;
14use fedimint_core::module::Amounts;
15use fedimint_core::task::sleep;
16use fedimint_core::txoproof::TxOutProof;
17use fedimint_core::util::FmtCompactAnyhow as _;
18use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
19use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
20use fedimint_wallet_common::WalletInput;
21use fedimint_wallet_common::txoproof::PegInProof;
22use futures::StreamExt as _;
23use secp256k1::Keypair;
24use tokio::sync::watch;
25use tracing::{debug, instrument, trace, warn};
26
27use crate::api::WalletFederationApi as _;
28use crate::client_db::{
29 ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
30 PegInTweakIndexPrefix, TweakIdx,
31};
32use crate::events::DepositConfirmed;
33use crate::{WalletClientModule, WalletClientModuleData};
34
35#[derive(Debug, Clone)]
38struct NextActions {
39 now: SystemTime,
41 due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
43 next: Option<SystemTime>,
45}
46
47impl NextActions {
48 pub fn new() -> Self {
49 Self {
50 now: time::now(),
51 due: vec![],
52 next: None,
53 }
54 }
55}
56
57impl NextActions {
58 async fn from_db_state(db: &Database) -> Self {
60 db.begin_transaction_nc()
61 .await
62 .find_by_prefix(&PegInTweakIndexPrefix)
63 .await
64 .fold(NextActions::new(), |state, (key, val)| async {
65 state.fold(key, val)
66 })
67 .await
68 }
69
70 pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
72 if let Some(next_check_time) = val.next_check_time {
73 if next_check_time < self.now {
74 self.due.push((key, val));
75 }
76
77 self.next = match self.next {
78 Some(existing) => Some(existing.min(next_check_time)),
79 None => Some(next_check_time),
80 };
81 }
82 self
83 }
84}
85
86#[allow(clippy::too_many_lines)]
91pub(crate) async fn run_peg_in_monitor(
92 client_ctx: ClientContext<WalletClientModule>,
93 db: Database,
94 btc_rpc: DynBitcoindRpc,
95 module_api: DynModuleApi,
96 data: WalletClientModuleData,
97 pegin_claimed_sender: watch::Sender<()>,
98 mut wakeup_receiver: watch::Receiver<()>,
99) {
100 let min_sleep: Duration = if is_running_in_test_env() {
101 Duration::from_millis(100)
102 } else {
103 Duration::from_secs(30)
104 };
105
106 loop {
107 if let Err(err) = check_for_deposits(
108 &db,
109 &data,
110 &btc_rpc,
111 &module_api,
112 &client_ctx,
113 &pegin_claimed_sender,
114 )
115 .await
116 {
117 warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
118 continue;
119 }
120
121 let now = time::now();
122 let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
123 now + Duration::from_secs(60 * 60));
125 let next_wakeup_duration = next_wakeup
126 .duration_since(now)
127 .unwrap_or_default()
128 .max(min_sleep);
129 debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
130 tokio::select! {
131 () = sleep(next_wakeup_duration) => {
132 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
133 },
134 res = wakeup_receiver.changed() => {
135 debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
136 if res.is_err() {
137 debug!(target: LOG_CLIENT_MODULE_WALLET, "Terminating peg-in monitor");
138 return;
139 }
140 }
141 }
142 }
143}
144
145async fn check_for_deposits(
146 db: &Database,
147 data: &WalletClientModuleData,
148 btc_rpc: &DynBitcoindRpc,
149 module_api: &DynModuleApi,
150 client_ctx: &ClientContext<WalletClientModule>,
151 pengin_claimed_sender: &watch::Sender<()>,
152) -> Result<(), anyhow::Error> {
153 let due = NextActions::from_db_state(db).await.due;
154 trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
155 for (due_key, due_val) in due {
156 check_and_claim_idx_pegins(
157 data,
158 due_key,
159 btc_rpc,
160 module_api,
161 db,
162 client_ctx,
163 due_val,
164 pengin_claimed_sender,
165 )
166 .await?;
167 }
168
169 Ok(())
170}
171
172#[allow(clippy::too_many_arguments)]
173async fn check_and_claim_idx_pegins(
174 data: &WalletClientModuleData,
175 due_key: PegInTweakIndexKey,
176 btc_rpc: &DynBitcoindRpc,
177 module_api: &DynModuleApi,
178 db: &Database,
179 client_ctx: &ClientContext<WalletClientModule>,
180 due_val: PegInTweakIndexData,
181 pengin_claimed_sender: &watch::Sender<()>,
182) -> Result<(), anyhow::Error> {
183 let now = time::now();
184 match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
185 Ok(outcomes) => {
186 let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
187 .map(|duration| now + duration);
188 db
189 .autocommit(
190 |dbtx, _| {
191 Box::pin(async {
192 let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
193
194 let claimed_sender = pengin_claimed_sender.clone();
195 dbtx.on_commit(move || {
196 claimed_sender.send_replace(());
197 });
198
199 let peg_in_tweak_index_data = PegInTweakIndexData {
200 next_check_time,
201 last_check_time: Some(now),
202 claimed: [due_val.claimed.clone(), claimed_now].concat(),
203 ..due_val
204 };
205 trace!(
206 target: LOG_CLIENT_MODULE_WALLET,
207 tweak_idx=%due_key.0,
208 due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
209 data=?peg_in_tweak_index_data,
210 "Updating"
211 );
212 dbtx
213 .insert_entry(&due_key, &peg_in_tweak_index_data)
214 .await;
215
216 Ok::<_, anyhow::Error>(())
217 })
218 },
219 None,
220 )
221 .await?;
222 }
223 Err(err) => {
224 debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
225 }
226 }
227 Ok(())
228}
229
230#[derive(Copy, Clone, Debug)]
234enum CheckOutcome {
235 Pending { num_blocks_needed: u64 },
237 Claimed { outpoint: bitcoin::OutPoint },
239
240 AlreadyClaimed,
243}
244
245impl CheckOutcome {
246 fn retry_delay(self) -> Option<Duration> {
250 match self {
251 CheckOutcome::Pending { num_blocks_needed } => {
253 if is_running_in_test_env() {
254 Some(Duration::from_millis(1))
256 } else {
257 Some(Duration::from_secs(60 * num_blocks_needed))
258 }
259 }
260 CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
265 }
266 }
267
268 fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
274 if outcomes.is_empty() {
278 if is_running_in_test_env() {
279 return Some(Duration::from_millis(100));
281 }
282 let now = time::now();
283 let age = now.duration_since(creation_time).unwrap_or_default();
284 return Some(age / 10);
285 }
286
287 let mut min = None;
289
290 for outcome in outcomes {
291 min = match (min, outcome.retry_delay()) {
292 (None, time) => time,
293 (Some(min), None) => Some(min),
294 (Some(min), Some(time)) => Some(cmp::min(min, time)),
295 };
296 }
297
298 min
299 }
300
301 fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
302 let mut res = vec![];
303 for outcome in outcomes {
304 if let CheckOutcome::Claimed { outpoint } = outcome {
305 res.push(*outpoint);
306 }
307 }
308
309 res
310 }
311}
312
313#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
318async fn check_idx_pegins(
319 data: &WalletClientModuleData,
320 tweak_idx: TweakIdx,
321 btc_rpc: &DynBitcoindRpc,
322 module_rpc: &DynModuleApi,
323 db: &Database,
324 client_ctx: &ClientContext<WalletClientModule>,
325) -> Result<Vec<CheckOutcome>, anyhow::Error> {
326 let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
327 let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
328 btc_rpc.watch_script_history(&script).await?;
329
330 let history = btc_rpc.get_script_history(&script).await?;
331
332 debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
333
334 let mut outcomes = vec![];
335
336 for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
337 let txid = transaction.compute_txid();
338 let outpoint = bitcoin::OutPoint {
339 txid,
340 vout: out_idx,
341 };
342
343 let claimed_peg_in_key = ClaimedPegInKey {
344 peg_in_index: tweak_idx,
345 btc_out_point: outpoint,
346 };
347
348 if db
349 .begin_transaction_nc()
350 .await
351 .get_value(&claimed_peg_in_key)
352 .await
353 .is_some()
354 {
355 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
356 outcomes.push(CheckOutcome::AlreadyClaimed);
357 continue;
358 }
359 let finality_delay = u64::from(data.cfg.finality_delay);
360
361 let tx_block_count =
362 if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
363 tx_block_height.saturating_add(1)
364 } else {
365 outcomes.push(CheckOutcome::Pending {
366 num_blocks_needed: finality_delay,
367 });
368 debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
369 continue;
370 };
371
372 let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
373
374 if 0 < num_blocks_needed {
375 outcomes.push(CheckOutcome::Pending { num_blocks_needed });
376 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
377 continue;
378 }
379
380 debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
381
382 let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
383 let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
384
385 claim_peg_in(
386 client_ctx,
387 tweak_idx,
388 tweak_key,
389 &transaction,
390 operation_id,
391 outpoint,
392 tx_out_proof,
393 federation_knows_utxo,
394 )
395 .await?;
396 outcomes.push(CheckOutcome::Claimed { outpoint });
397 }
398 Ok(outcomes)
399}
400
401#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
402async fn claim_peg_in(
403 client_ctx: &ClientContext<WalletClientModule>,
404 tweak_idx: TweakIdx,
405 tweak_key: Keypair,
406 transaction: &bitcoin::Transaction,
407 operation_id: OperationId,
408 out_point: bitcoin::OutPoint,
409 tx_out_proof: TxOutProof,
410 federation_knows_utxo: bool,
411) -> anyhow::Result<()> {
412 async fn claim_peg_in_inner(
415 client_ctx: &ClientContext<WalletClientModule>,
416 dbtx: &mut DatabaseTransaction<'_>,
417 btc_transaction: &bitcoin::Transaction,
418 out_idx: u32,
419 tweak_key: Keypair,
420 txout_proof: TxOutProof,
421 operation_id: OperationId,
422 federation_knows_utxo: bool,
423 ) -> Option<OutPointRange> {
424 let pegin_proof = PegInProof::new(
425 txout_proof,
426 btc_transaction.clone(),
427 out_idx,
428 tweak_key.public_key(),
429 )
430 .expect("TODO: handle API returning faulty proofs");
431
432 let amount = pegin_proof.tx_output().value.into();
433 let wallet_input = if federation_knows_utxo {
434 WalletInput::new_v1(&pegin_proof)
435 } else {
436 WalletInput::new_v0(pegin_proof)
437 };
438
439 let client_input = ClientInput::<WalletInput> {
440 input: wallet_input,
441 keys: vec![tweak_key],
442 amounts: Amounts::new_bitcoin(amount),
443 };
444
445 if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
446 warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
447 return None;
448 }
449
450 client_ctx
451 .log_event(
452 dbtx,
453 DepositConfirmed {
454 txid: btc_transaction.compute_txid(),
455 out_idx,
456 amount,
457 },
458 )
459 .await;
460
461 Some(
462 client_ctx
463 .claim_inputs(
464 dbtx,
465 ClientInputBundle::new_no_sm(vec![client_input]),
466 operation_id,
467 )
468 .await
469 .expect("Cannot claim input, additional funding needed"),
470 )
471 }
472
473 let tx_out_proof = &tx_out_proof;
474
475 debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
476
477 client_ctx
478 .module_db()
479 .autocommit(
480 |dbtx, _| {
481 Box::pin(async {
482 let maybe_change_range = claim_peg_in_inner(
483 client_ctx,
484 dbtx,
485 transaction,
486 out_point.vout,
487 tweak_key,
488 tx_out_proof.clone(),
489 operation_id,
490 federation_knows_utxo,
491 )
492 .await;
493
494 let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
495 ClaimedPegInData {
496 claim_txid: change_range.txid(),
497 change: change_range.into_iter().collect(),
498 }
499 } else {
500 ClaimedPegInData {
501 claim_txid: TransactionId::from_byte_array([0; 32]),
502 change: vec![],
503 }
504 };
505
506 dbtx.insert_entry(
507 &ClaimedPegInKey {
508 peg_in_index: tweak_idx,
509 btc_out_point: out_point,
510 },
511 &claimed_pegin_data,
512 )
513 .await;
514
515 Ok(())
516 })
517 },
518 Some(100),
519 )
520 .await
521 .map_err(|e| match e {
522 AutocommitError::CommitFailed {
523 last_error,
524 attempts,
525 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
526 AutocommitError::ClosureError { error, .. } => error,
527 })?;
528
529 Ok(())
530}
531
532pub(crate) fn filter_onchain_deposit_outputs<'a>(
533 tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
534 out_script: &'a ScriptBuf,
535) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
536 tx_iter.flat_map(move |tx| {
537 tx.output
538 .clone()
539 .into_iter()
540 .enumerate()
541 .filter_map(move |(out_idx, tx_out)| {
542 if &tx_out.script_pubkey == out_script {
543 Some((tx.clone(), out_idx as u32))
544 } else {
545 None
546 }
547 })
548 })
549}