fedimint_wallet_client/
pegin_monitor.rs

1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use bitcoin::ScriptBuf;
5use fedimint_api_client::api::DynModuleApi;
6use fedimint_bitcoind::DynBitcoindRpc;
7use fedimint_client_module::module::{ClientContext, OutPointRange};
8use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
12};
13use fedimint_core::envs::is_running_in_test_env;
14use fedimint_core::task::sleep;
15use fedimint_core::txoproof::TxOutProof;
16use fedimint_core::util::FmtCompactAnyhow as _;
17use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
18use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
19use fedimint_wallet_common::WalletInput;
20use fedimint_wallet_common::txoproof::PegInProof;
21use futures::StreamExt as _;
22use secp256k1::Keypair;
23use tokio::sync::watch;
24use tracing::{debug, instrument, trace, warn};
25
26use crate::api::WalletFederationApi as _;
27use crate::client_db::{
28    ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
29    PegInTweakIndexPrefix, TweakIdx,
30};
31use crate::events::DepositConfirmed;
32use crate::{WalletClientModule, WalletClientModuleData};
33
34/// A helper struct meant to combined data from all addresses/records
35/// into a single struct with all actionable data.
36#[derive(Debug, Clone)]
37struct NextActions {
38    /// Current time
39    now: SystemTime,
40    /// Index keys due for a check
41    due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
42    /// Nearest key that is not due yet
43    next: Option<SystemTime>,
44}
45
46impl NextActions {
47    pub fn new() -> Self {
48        Self {
49            now: time::now(),
50            due: vec![],
51            next: None,
52        }
53    }
54}
55
56impl NextActions {
57    /// Calculate next actions from the database
58    async fn from_db_state(db: &Database) -> Self {
59        db.begin_transaction_nc()
60            .await
61            .find_by_prefix(&PegInTweakIndexPrefix)
62            .await
63            .fold(NextActions::new(), |state, (key, val)| async {
64                state.fold(key, val)
65            })
66            .await
67    }
68
69    /// Combine current state with another record
70    pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
71        if let Some(next_check_time) = val.next_check_time {
72            if next_check_time < self.now {
73                self.due.push((key, val));
74            }
75
76            self.next = match self.next {
77                Some(existing) => Some(existing.min(next_check_time)),
78                None => Some(next_check_time),
79            };
80        }
81        self
82    }
83}
84
85/// A deposit monitoring task
86///
87/// On the high level it maintains a list of derived addresses with some info
88/// like when is the next time to check for deposits on them.
89#[allow(clippy::too_many_lines)]
90pub(crate) async fn run_peg_in_monitor(
91    client_ctx: ClientContext<WalletClientModule>,
92    db: Database,
93    btc_rpc: DynBitcoindRpc,
94    module_api: DynModuleApi,
95    data: WalletClientModuleData,
96    pegin_claimed_sender: watch::Sender<()>,
97    mut wakeup_receiver: watch::Receiver<()>,
98) {
99    let min_sleep: Duration = if is_running_in_test_env() {
100        Duration::from_millis(100)
101    } else {
102        Duration::from_secs(30)
103    };
104
105    loop {
106        if let Err(err) = check_for_deposits(
107            &db,
108            &data,
109            &btc_rpc,
110            &module_api,
111            &client_ctx,
112            &pegin_claimed_sender,
113        )
114        .await
115        {
116            warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
117            continue;
118        }
119
120        let now = time::now();
121        let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
122            /* for simplicity just wake up every hour, even when there's no need */
123             ( now + Duration::from_secs(60 * 60)));
124        let next_wakeup_duration = next_wakeup
125            .duration_since(now)
126            .unwrap_or_default()
127            .max(min_sleep);
128        debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
129        tokio::select! {
130            () = sleep(next_wakeup_duration) => {
131                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
132            },
133            res = wakeup_receiver.changed() => {
134                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
135                if res.is_err() {
136                    debug!(target: LOG_CLIENT_MODULE_WALLET,  "Terminating peg-in monitor");
137                    return;
138                }
139            }
140        }
141    }
142}
143
144async fn check_for_deposits(
145    db: &Database,
146    data: &WalletClientModuleData,
147    btc_rpc: &DynBitcoindRpc,
148    module_api: &DynModuleApi,
149    client_ctx: &ClientContext<WalletClientModule>,
150    pengin_claimed_sender: &watch::Sender<()>,
151) -> Result<(), anyhow::Error> {
152    let due = NextActions::from_db_state(db).await.due;
153    trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
154    for (due_key, due_val) in due {
155        check_and_claim_idx_pegins(
156            data,
157            due_key,
158            btc_rpc,
159            module_api,
160            db,
161            client_ctx,
162            due_val,
163            pengin_claimed_sender,
164        )
165        .await?;
166    }
167
168    Ok(())
169}
170
171#[allow(clippy::too_many_arguments)]
172async fn check_and_claim_idx_pegins(
173    data: &WalletClientModuleData,
174    due_key: PegInTweakIndexKey,
175    btc_rpc: &DynBitcoindRpc,
176    module_api: &DynModuleApi,
177    db: &Database,
178    client_ctx: &ClientContext<WalletClientModule>,
179    due_val: PegInTweakIndexData,
180    pengin_claimed_sender: &watch::Sender<()>,
181) -> Result<(), anyhow::Error> {
182    let now = time::now();
183    match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
184        Ok(outcomes) => {
185            let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
186                .map(|duration| now + duration);
187            db
188                .autocommit(
189                    |dbtx, _| {
190                        Box::pin(async {
191                            let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
192
193                            let claimed_sender = pengin_claimed_sender.clone();
194                            dbtx.on_commit(move || {
195                                claimed_sender.send_replace(());
196                            });
197
198                            let peg_in_tweak_index_data = PegInTweakIndexData {
199                                next_check_time,
200                                last_check_time: Some(now),
201                                claimed: [due_val.claimed.clone(), claimed_now].concat(),
202                                ..due_val
203                            };
204                            trace!(
205                                target: LOG_CLIENT_MODULE_WALLET,
206                                tweak_idx=%due_key.0,
207                                due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
208                                data=?peg_in_tweak_index_data,
209                                "Updating"
210                            );
211                            dbtx
212                                .insert_entry(&due_key, &peg_in_tweak_index_data)
213                                .await;
214
215                            Ok::<_, anyhow::Error>(())
216                        })
217                    },
218                    None,
219                )
220                .await?;
221        }
222        Err(err) => {
223            debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
224        }
225    }
226    Ok(())
227}
228
229/// Outcome of checking a single deposit Bitcoin transaction output
230///
231/// For every address there can be multiple outcomes (`Vec<Self>`).
232#[derive(Copy, Clone, Debug)]
233enum CheckOutcome {
234    /// There's a tx pending (needs more confirmation)
235    Pending { num_blocks_needed: u64 },
236    /// A state machine was created to claim the peg-in
237    Claimed { outpoint: bitcoin::OutPoint },
238
239    /// A peg-in transaction was already claimed (state machine created) in the
240    /// past
241    AlreadyClaimed,
242}
243
244impl CheckOutcome {
245    /// Desired retry delay for a single outcome
246    ///
247    /// None means "no need to check anymore".
248    fn retry_delay(self) -> Option<Duration> {
249        match self {
250            // Check again in time proportional to the expected block confirmation time
251            CheckOutcome::Pending { num_blocks_needed } => {
252                if is_running_in_test_env() {
253                    // In tests, we basically mine all blocks right away
254                    Some(Duration::from_millis(1))
255                } else {
256                    Some(Duration::from_secs(60 * num_blocks_needed))
257                }
258            }
259            // Once anything has been claimed, there's no reason to claim again automatically,
260            // and it's undesirable due to privacy reasons.
261            // Users can possibly update the underlying record via other means to force a check on
262            // demand.
263            CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
264        }
265    }
266
267    /// Desired retry delay for a bunch of outcomes.
268    ///
269    /// This time is intended to be persisted in the database.
270    ///
271    /// None means "no need to check anymore".
272    fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
273        // If the address was allocated, but nothing was ever received or even detected
274        // on it yet, check again in time proportional to the age of the
275        // address.
276        if outcomes.is_empty() {
277            if is_running_in_test_env() {
278                // When testing we usually send deposits right away, so check more aggressively.
279                return Some(Duration::from_millis(100));
280            }
281            let now = time::now();
282            let age = now.duration_since(creation_time).unwrap_or_default();
283            return Some(age / 10);
284        }
285
286        // The delays is the minimum retry delay.
287        let mut min = None;
288
289        for outcome in outcomes {
290            min = match (min, outcome.retry_delay()) {
291                (None, time) => time,
292                (Some(min), None) => Some(min),
293                (Some(min), Some(time)) => Some(cmp::min(min, time)),
294            };
295        }
296
297        min
298    }
299
300    fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
301        let mut res = vec![];
302        for outcome in outcomes {
303            if let CheckOutcome::Claimed { outpoint } = outcome {
304                res.push(*outpoint);
305            }
306        }
307
308        res
309    }
310}
311
312/// Query via btc rpc for a history of an address derived with `tweak_idx` and
313/// claim any peg-ins that are ready.
314///
315/// Return a list of [`CheckOutcome`]s for each matching output.
316#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
317async fn check_idx_pegins(
318    data: &WalletClientModuleData,
319    tweak_idx: TweakIdx,
320    btc_rpc: &DynBitcoindRpc,
321    module_rpc: &DynModuleApi,
322    db: &Database,
323    client_ctx: &ClientContext<WalletClientModule>,
324) -> Result<Vec<CheckOutcome>, anyhow::Error> {
325    let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
326    let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
327
328    let history = btc_rpc.get_script_history(&script).await?;
329
330    debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
331
332    let mut outcomes = vec![];
333
334    for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
335        let txid = transaction.compute_txid();
336        let outpoint = bitcoin::OutPoint {
337            txid,
338            vout: out_idx,
339        };
340
341        let claimed_peg_in_key = ClaimedPegInKey {
342            peg_in_index: tweak_idx,
343            btc_out_point: outpoint,
344        };
345
346        if db
347            .begin_transaction_nc()
348            .await
349            .get_value(&claimed_peg_in_key)
350            .await
351            .is_some()
352        {
353            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
354            outcomes.push(CheckOutcome::AlreadyClaimed);
355            continue;
356        }
357        let finality_delay = u64::from(data.cfg.finality_delay);
358
359        let tx_block_count =
360            if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
361                tx_block_height.saturating_add(1)
362            } else {
363                outcomes.push(CheckOutcome::Pending {
364                    num_blocks_needed: finality_delay,
365                });
366                debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
367                continue;
368            };
369
370        let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
371
372        if 0 < num_blocks_needed {
373            outcomes.push(CheckOutcome::Pending { num_blocks_needed });
374            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
375            continue;
376        }
377
378        debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
379
380        let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
381        let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
382
383        claim_peg_in(
384            client_ctx,
385            tweak_idx,
386            tweak_key,
387            &transaction,
388            operation_id,
389            outpoint,
390            tx_out_proof,
391            federation_knows_utxo,
392        )
393        .await?;
394        outcomes.push(CheckOutcome::Claimed { outpoint });
395    }
396    Ok(outcomes)
397}
398
399#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
400async fn claim_peg_in(
401    client_ctx: &ClientContext<WalletClientModule>,
402    tweak_idx: TweakIdx,
403    tweak_key: Keypair,
404    transaction: &bitcoin::Transaction,
405    operation_id: OperationId,
406    out_point: bitcoin::OutPoint,
407    tx_out_proof: TxOutProof,
408    federation_knows_utxo: bool,
409) -> anyhow::Result<()> {
410    /// Returns the claim transactions output range if a claim happened or
411    /// `None` otherwise if the deposit was smaller than the deposit fee.
412    async fn claim_peg_in_inner(
413        client_ctx: &ClientContext<WalletClientModule>,
414        dbtx: &mut DatabaseTransaction<'_>,
415        btc_transaction: &bitcoin::Transaction,
416        out_idx: u32,
417        tweak_key: Keypair,
418        txout_proof: TxOutProof,
419        operation_id: OperationId,
420        federation_knows_utxo: bool,
421    ) -> Option<OutPointRange> {
422        let pegin_proof = PegInProof::new(
423            txout_proof,
424            btc_transaction.clone(),
425            out_idx,
426            tweak_key.public_key(),
427        )
428        .expect("TODO: handle API returning faulty proofs");
429
430        let amount = pegin_proof.tx_output().value.into();
431        let wallet_input = if federation_knows_utxo {
432            WalletInput::new_v1(&pegin_proof)
433        } else {
434            WalletInput::new_v0(pegin_proof)
435        };
436
437        let client_input = ClientInput::<WalletInput> {
438            input: wallet_input,
439            keys: vec![tweak_key],
440            amount,
441        };
442
443        if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
444            warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
445            return None;
446        }
447
448        client_ctx
449            .log_event(
450                dbtx,
451                DepositConfirmed {
452                    txid: btc_transaction.compute_txid(),
453                    out_idx,
454                    amount,
455                },
456            )
457            .await;
458
459        Some(
460            client_ctx
461                .claim_inputs(
462                    dbtx,
463                    ClientInputBundle::new_no_sm(vec![client_input]),
464                    operation_id,
465                )
466                .await
467                .expect("Cannot claim input, additional funding needed"),
468        )
469    }
470
471    let tx_out_proof = &tx_out_proof;
472
473    debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
474
475    client_ctx
476        .module_db()
477        .autocommit(
478            |dbtx, _| {
479                Box::pin(async {
480                    let maybe_change_range = claim_peg_in_inner(
481                        client_ctx,
482                        dbtx,
483                        transaction,
484                        out_point.vout,
485                        tweak_key,
486                        tx_out_proof.clone(),
487                        operation_id,
488                        federation_knows_utxo,
489                    )
490                    .await;
491
492                    let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
493                        ClaimedPegInData {
494                            claim_txid: change_range.txid(),
495                            change: change_range.into_iter().collect(),
496                        }
497                    } else {
498                        ClaimedPegInData {
499                            claim_txid: TransactionId::from_byte_array([0; 32]),
500                            change: vec![],
501                        }
502                    };
503
504                    dbtx.insert_entry(
505                        &ClaimedPegInKey {
506                            peg_in_index: tweak_idx,
507                            btc_out_point: out_point,
508                        },
509                        &claimed_pegin_data,
510                    )
511                    .await;
512
513                    Ok(())
514                })
515            },
516            Some(100),
517        )
518        .await
519        .map_err(|e| match e {
520            AutocommitError::CommitFailed {
521                last_error,
522                attempts,
523            } => last_error.context(format!("Failed to commit after {attempts} attempts")),
524            AutocommitError::ClosureError { error, .. } => error,
525        })?;
526
527    Ok(())
528}
529
530pub(crate) fn filter_onchain_deposit_outputs<'a>(
531    tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
532    out_script: &'a ScriptBuf,
533) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
534    tx_iter.flat_map(move |tx| {
535        tx.output
536            .clone()
537            .into_iter()
538            .enumerate()
539            .filter_map(move |(out_idx, tx_out)| {
540                if &tx_out.script_pubkey == out_script {
541                    Some((tx.clone(), out_idx as u32))
542                } else {
543                    None
544                }
545            })
546    })
547}