Skip to main content

fedimint_wallet_client/
pegin_monitor.rs

1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use anyhow::anyhow;
5use bitcoin::ScriptBuf;
6use fedimint_api_client::api::DynModuleApi;
7use fedimint_bitcoind::DynBitcoindRpc;
8use fedimint_client_module::module::{ClientContext, OutPointRange};
9use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
10use fedimint_core::core::OperationId;
11use fedimint_core::db::{
12    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
13};
14use fedimint_core::envs::is_running_in_test_env;
15use fedimint_core::module::Amounts;
16use fedimint_core::task::sleep;
17use fedimint_core::txoproof::TxOutProof;
18use fedimint_core::util::FmtCompactAnyhow as _;
19use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
20use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
21use fedimint_wallet_common::WalletInput;
22use fedimint_wallet_common::txoproof::PegInProof;
23use futures::StreamExt as _;
24use secp256k1::Keypair;
25use tokio::sync::watch;
26use tracing::{debug, instrument, trace, warn};
27
28use crate::api::WalletFederationApi as _;
29use crate::client_db::{
30    ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
31    PegInTweakIndexPrefix, TweakIdx,
32};
33use crate::events::{DepositConfirmed, ReceivePaymentEvent};
34use crate::{WalletClientModule, WalletClientModuleData};
35
36/// A helper struct meant to combined data from all addresses/records
37/// into a single struct with all actionable data.
38#[derive(Debug, Clone)]
39struct NextActions {
40    /// Current time
41    now: SystemTime,
42    /// Index keys due for a check
43    due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
44    /// Nearest key that is not due yet
45    next: Option<SystemTime>,
46}
47
48impl NextActions {
49    pub fn new() -> Self {
50        Self {
51            now: time::now(),
52            due: vec![],
53            next: None,
54        }
55    }
56}
57
58impl NextActions {
59    /// Calculate next actions from the database
60    async fn from_db_state(db: &Database) -> Self {
61        db.begin_transaction_nc()
62            .await
63            .find_by_prefix(&PegInTweakIndexPrefix)
64            .await
65            .fold(NextActions::new(), |state, (key, val)| async {
66                state.fold(key, val)
67            })
68            .await
69    }
70
71    /// Combine current state with another record
72    pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
73        if let Some(next_check_time) = val.next_check_time {
74            if next_check_time < self.now {
75                self.due.push((key, val));
76            }
77
78            self.next = match self.next {
79                Some(existing) => Some(existing.min(next_check_time)),
80                None => Some(next_check_time),
81            };
82        }
83        self
84    }
85}
86
87/// A deposit monitoring task
88///
89/// On the high level it maintains a list of derived addresses with some info
90/// like when is the next time to check for deposits on them.
91#[allow(clippy::too_many_lines)]
92pub(crate) async fn run_peg_in_monitor(
93    client_ctx: ClientContext<WalletClientModule>,
94    db: Database,
95    btc_rpc: DynBitcoindRpc,
96    module_api: DynModuleApi,
97    data: WalletClientModuleData,
98    pegin_claimed_sender: watch::Sender<()>,
99    mut wakeup_receiver: watch::Receiver<()>,
100) {
101    let min_sleep: Duration = if is_running_in_test_env() {
102        Duration::from_millis(100)
103    } else {
104        Duration::from_secs(30)
105    };
106
107    // How long to wait before the next check. Seeded to ZERO so the
108    // first iteration runs a check immediately. Recomputed at the end
109    // of each successful iteration based on DB scheduling and how the
110    // previous wakeup arrived (see `woke_via_signal` below).
111    let mut next_wakeup_duration = Duration::ZERO;
112
113    loop {
114        debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleeping before next check");
115        let woke_via_signal = tokio::select! {
116            () = sleep(next_wakeup_duration) => {
117                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
118                false
119            },
120            res = wakeup_receiver.changed() => {
121                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
122                if res.is_err() {
123                    debug!(target: LOG_CLIENT_MODULE_WALLET,  "Terminating peg-in monitor");
124                    return;
125                }
126                true
127            }
128        };
129
130        if let Err(err) = check_for_deposits(
131            &db,
132            &data,
133            &btc_rpc,
134            &module_api,
135            &client_ctx,
136            &pegin_claimed_sender,
137        )
138        .await
139        {
140            warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
141            // Retry after `min_sleep`. The next iteration's select still
142            // honours signals, so a manual recheck wakes us up sooner.
143            next_wakeup_duration = min_sleep;
144            continue;
145        }
146
147        let now = time::now();
148        let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
149            /* for simplicity just wake up every hour, even when there's no need */
150              now + Duration::from_hours(1));
151        // When this iteration was signal-driven (manual recheck, new
152        // address allocation) drop the `min_sleep` floor so the natural
153        // retry_delay_vec drives timing — relevant for fresh addresses
154        // where the natural delay is `age / 10` and would otherwise be
155        // clamped to 30s in production.
156        let effective_min_sleep = if woke_via_signal {
157            Duration::ZERO
158        } else {
159            min_sleep
160        };
161        next_wakeup_duration = next_wakeup
162            .duration_since(now)
163            .unwrap_or_default()
164            .max(effective_min_sleep);
165    }
166}
167
168async fn check_for_deposits(
169    db: &Database,
170    data: &WalletClientModuleData,
171    btc_rpc: &DynBitcoindRpc,
172    module_api: &DynModuleApi,
173    client_ctx: &ClientContext<WalletClientModule>,
174    pengin_claimed_sender: &watch::Sender<()>,
175) -> Result<(), anyhow::Error> {
176    let due = NextActions::from_db_state(db).await.due;
177    trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
178    for (due_key, due_val) in due {
179        check_and_claim_idx_pegins(
180            data,
181            due_key,
182            btc_rpc,
183            module_api,
184            db,
185            client_ctx,
186            due_val,
187            pengin_claimed_sender,
188        )
189        .await?;
190    }
191
192    Ok(())
193}
194
195#[allow(clippy::too_many_arguments)]
196async fn check_and_claim_idx_pegins(
197    data: &WalletClientModuleData,
198    due_key: PegInTweakIndexKey,
199    btc_rpc: &DynBitcoindRpc,
200    module_api: &DynModuleApi,
201    db: &Database,
202    client_ctx: &ClientContext<WalletClientModule>,
203    due_val: PegInTweakIndexData,
204    pengin_claimed_sender: &watch::Sender<()>,
205) -> Result<(), anyhow::Error> {
206    let now = time::now();
207    match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
208        Ok(outcomes) => {
209            let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
210                .map(|duration| now + duration);
211            db
212                .autocommit(
213                    |dbtx, _| {
214                        Box::pin(async {
215                            // Re-read inside the transaction so a concurrent
216                            // `recheck_pegin_address` that landed while we were
217                            // doing Bitcoin RPC calls isn't silently overwritten.
218                            // Peg-in entries are never deleted, only updated,
219                            // so the entry is guaranteed to still exist even
220                            // across `autocommit` retries.
221                            let current = dbtx
222                                .get_value(&due_key)
223                                .await
224                                .expect("Peg-in entries are never deleted");
225
226                            // If `next_check_time` changed under us (a recheck set
227                            // it to "now"), keep the earliest of the two so the
228                            // recheck still triggers the next monitor iteration.
229                            let merged_next_check_time = if current.next_check_time
230                                == due_val.next_check_time
231                            {
232                                next_check_time
233                            } else {
234                                match (current.next_check_time, next_check_time) {
235                                    (Some(a), Some(b)) => Some(a.min(b)),
236                                    (a, None) => a,
237                                    (None, b) => b,
238                                }
239                            };
240
241                            let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
242
243                            let claimed_sender = pengin_claimed_sender.clone();
244                            dbtx.on_commit(move || {
245                                claimed_sender.send_replace(());
246                            });
247
248                            let peg_in_tweak_index_data = PegInTweakIndexData {
249                                next_check_time: merged_next_check_time,
250                                last_check_time: Some(now),
251                                claimed: [current.claimed.clone(), claimed_now].concat(),
252                                ..current
253                            };
254                            trace!(
255                                target: LOG_CLIENT_MODULE_WALLET,
256                                tweak_idx=%due_key.0,
257                                due_in_secs=?merged_next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
258                                data=?peg_in_tweak_index_data,
259                                "Updating"
260                            );
261                            dbtx
262                                .insert_entry(&due_key, &peg_in_tweak_index_data)
263                                .await;
264
265                            Ok::<_, anyhow::Error>(())
266                        })
267                    },
268                    None,
269                )
270                .await?;
271        }
272        Err(err) => {
273            debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
274        }
275    }
276    Ok(())
277}
278
279/// Outcome of checking a single deposit Bitcoin transaction output
280///
281/// For every address there can be multiple outcomes (`Vec<Self>`).
282#[derive(Copy, Clone, Debug)]
283enum CheckOutcome {
284    /// There's a tx pending (needs more confirmation)
285    Pending { num_blocks_needed: u64 },
286    /// A state machine was created to claim the peg-in
287    Claimed { outpoint: bitcoin::OutPoint },
288
289    /// A peg-in transaction was already claimed (state machine created) in the
290    /// past
291    AlreadyClaimed,
292}
293
294impl CheckOutcome {
295    /// Desired retry delay for a single outcome
296    ///
297    /// None means "no need to check anymore".
298    fn retry_delay(self) -> Option<Duration> {
299        match self {
300            // Check again in time proportional to the expected block confirmation time
301            CheckOutcome::Pending { num_blocks_needed } => {
302                if is_running_in_test_env() {
303                    // In tests, we basically mine all blocks right away
304                    Some(Duration::from_millis(1))
305                } else {
306                    Some(Duration::from_secs(60 * num_blocks_needed))
307                }
308            }
309            // Once anything has been claimed, there's no reason to claim again automatically,
310            // and it's undesirable due to privacy reasons.
311            // Users can possibly update the underlying record via other means to force a check on
312            // demand.
313            CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
314        }
315    }
316
317    /// Desired retry delay for a bunch of outcomes.
318    ///
319    /// This time is intended to be persisted in the database.
320    ///
321    /// None means "no need to check anymore".
322    fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
323        // If the address was allocated, but nothing was ever received or even detected
324        // on it yet, check again in time proportional to the age of the
325        // address.
326        if outcomes.is_empty() {
327            if is_running_in_test_env() {
328                // When testing we usually send deposits right away, so check more aggressively.
329                return Some(Duration::from_millis(100));
330            }
331            let now = time::now();
332            let age = now.duration_since(creation_time).unwrap_or_default();
333            return Some(age / 10);
334        }
335
336        // The delays is the minimum retry delay.
337        let mut min = None;
338
339        for outcome in outcomes {
340            min = match (min, outcome.retry_delay()) {
341                (None, time) => time,
342                (Some(min), None) => Some(min),
343                (Some(min), Some(time)) => Some(cmp::min(min, time)),
344            };
345        }
346
347        min
348    }
349
350    fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
351        let mut res = vec![];
352        for outcome in outcomes {
353            if let CheckOutcome::Claimed { outpoint } = outcome {
354                res.push(*outpoint);
355            }
356        }
357
358        res
359    }
360}
361
362/// Query via btc rpc for a history of an address derived with `tweak_idx` and
363/// claim any peg-ins that are ready.
364///
365/// Return a list of [`CheckOutcome`]s for each matching output.
366#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
367async fn check_idx_pegins(
368    data: &WalletClientModuleData,
369    tweak_idx: TweakIdx,
370    btc_rpc: &DynBitcoindRpc,
371    module_rpc: &DynModuleApi,
372    db: &Database,
373    client_ctx: &ClientContext<WalletClientModule>,
374) -> Result<Vec<CheckOutcome>, anyhow::Error> {
375    let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
376    let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
377    btc_rpc.watch_script_history(&script).await?;
378
379    let history = btc_rpc.get_script_history(&script).await?;
380
381    debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
382
383    let mut outcomes = vec![];
384
385    for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
386        let txid = transaction.compute_txid();
387        let outpoint = bitcoin::OutPoint {
388            txid,
389            vout: out_idx,
390        };
391
392        let claimed_peg_in_key = ClaimedPegInKey {
393            peg_in_index: tweak_idx,
394            btc_out_point: outpoint,
395        };
396
397        if db
398            .begin_transaction_nc()
399            .await
400            .get_value(&claimed_peg_in_key)
401            .await
402            .is_some()
403        {
404            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
405            outcomes.push(CheckOutcome::AlreadyClaimed);
406            continue;
407        }
408        let finality_delay = u64::from(data.cfg.finality_delay);
409
410        let tx_block_count =
411            if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
412                tx_block_height.saturating_add(1)
413            } else {
414                outcomes.push(CheckOutcome::Pending {
415                    num_blocks_needed: finality_delay,
416                });
417                debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
418                continue;
419            };
420
421        let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
422
423        if 0 < num_blocks_needed {
424            outcomes.push(CheckOutcome::Pending { num_blocks_needed });
425            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
426            continue;
427        }
428
429        debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
430
431        let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
432        let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
433
434        claim_peg_in(
435            client_ctx,
436            tweak_idx,
437            tweak_key,
438            &transaction,
439            operation_id,
440            outpoint,
441            tx_out_proof,
442            federation_knows_utxo,
443        )
444        .await?;
445        outcomes.push(CheckOutcome::Claimed { outpoint });
446    }
447    Ok(outcomes)
448}
449
450#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
451async fn claim_peg_in(
452    client_ctx: &ClientContext<WalletClientModule>,
453    tweak_idx: TweakIdx,
454    tweak_key: Keypair,
455    transaction: &bitcoin::Transaction,
456    operation_id: OperationId,
457    out_point: bitcoin::OutPoint,
458    tx_out_proof: TxOutProof,
459    federation_knows_utxo: bool,
460) -> anyhow::Result<()> {
461    /// Returns the claim transactions output range if a claim happened or
462    /// `None` otherwise if the deposit was smaller than the deposit fee.
463    async fn claim_peg_in_inner(
464        client_ctx: &ClientContext<WalletClientModule>,
465        dbtx: &mut DatabaseTransaction<'_>,
466        btc_transaction: &bitcoin::Transaction,
467        out_idx: u32,
468        tweak_key: Keypair,
469        txout_proof: TxOutProof,
470        operation_id: OperationId,
471        federation_knows_utxo: bool,
472    ) -> Option<OutPointRange> {
473        let pegin_proof = PegInProof::new(
474            txout_proof,
475            btc_transaction.clone(),
476            out_idx,
477            tweak_key.public_key(),
478        )
479        .expect("TODO: handle API returning faulty proofs");
480
481        let amount = pegin_proof.tx_output().value.into();
482        let wallet_input = if federation_knows_utxo {
483            WalletInput::new_v1(&pegin_proof)
484        } else {
485            WalletInput::new_v0(pegin_proof)
486        };
487
488        let client_input = ClientInput::<WalletInput> {
489            input: wallet_input,
490            keys: vec![tweak_key],
491            amounts: Amounts::new_bitcoin(amount),
492        };
493
494        if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
495            warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
496            return None;
497        }
498
499        let txid = btc_transaction.compute_txid();
500
501        client_ctx
502            .log_event(
503                dbtx,
504                DepositConfirmed {
505                    txid,
506                    out_idx,
507                    amount,
508                },
509            )
510            .await;
511
512        client_ctx
513            .log_event(
514                dbtx,
515                ReceivePaymentEvent {
516                    operation_id,
517                    amount,
518                    txid,
519                },
520            )
521            .await;
522
523        Some(
524            client_ctx
525                .claim_inputs(
526                    dbtx,
527                    ClientInputBundle::new_no_sm(vec![client_input]),
528                    operation_id,
529                )
530                .await
531                .expect("Cannot claim input, additional funding needed"),
532        )
533    }
534
535    let tx_out_proof = &tx_out_proof;
536
537    debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
538
539    client_ctx
540        .module_db()
541        .autocommit(
542            |dbtx, _| {
543                Box::pin(async {
544                    let maybe_change_range = claim_peg_in_inner(
545                        client_ctx,
546                        dbtx,
547                        transaction,
548                        out_point.vout,
549                        tweak_key,
550                        tx_out_proof.clone(),
551                        operation_id,
552                        federation_knows_utxo,
553                    )
554                    .await;
555
556                    let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
557                        ClaimedPegInData {
558                            claim_txid: change_range.txid(),
559                            change: change_range.into_iter().collect(),
560                        }
561                    } else {
562                        ClaimedPegInData {
563                            claim_txid: TransactionId::from_byte_array([0; 32]),
564                            change: vec![],
565                        }
566                    };
567
568                    dbtx.insert_entry(
569                        &ClaimedPegInKey {
570                            peg_in_index: tweak_idx,
571                            btc_out_point: out_point,
572                        },
573                        &claimed_pegin_data,
574                    )
575                    .await;
576
577                    Ok(())
578                })
579            },
580            Some(100),
581        )
582        .await
583        .map_err(|e| match e {
584            AutocommitError::CommitFailed {
585                last_error,
586                attempts,
587            } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
588            AutocommitError::ClosureError { error, .. } => error,
589        })?;
590
591    Ok(())
592}
593
594pub(crate) fn filter_onchain_deposit_outputs<'a>(
595    tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
596    out_script: &'a ScriptBuf,
597) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
598    tx_iter.flat_map(move |tx| {
599        tx.output
600            .clone()
601            .into_iter()
602            .enumerate()
603            .filter_map(move |(out_idx, tx_out)| {
604                if &tx_out.script_pubkey == out_script {
605                    Some((tx.clone(), out_idx as u32))
606                } else {
607                    None
608                }
609            })
610    })
611}