fedimint_wallet_client/
pegin_monitor.rs

1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use anyhow::anyhow;
5use bitcoin::ScriptBuf;
6use fedimint_api_client::api::DynModuleApi;
7use fedimint_bitcoind::DynBitcoindRpc;
8use fedimint_client_module::module::{ClientContext, OutPointRange};
9use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
10use fedimint_core::core::OperationId;
11use fedimint_core::db::{
12    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
13};
14use fedimint_core::envs::is_running_in_test_env;
15use fedimint_core::module::Amounts;
16use fedimint_core::task::sleep;
17use fedimint_core::txoproof::TxOutProof;
18use fedimint_core::util::FmtCompactAnyhow as _;
19use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
20use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
21use fedimint_wallet_common::WalletInput;
22use fedimint_wallet_common::txoproof::PegInProof;
23use futures::StreamExt as _;
24use secp256k1::Keypair;
25use tokio::sync::watch;
26use tracing::{debug, instrument, trace, warn};
27
28use crate::api::WalletFederationApi as _;
29use crate::client_db::{
30    ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
31    PegInTweakIndexPrefix, TweakIdx,
32};
33use crate::events::{DepositConfirmed, ReceivePaymentEvent};
34use crate::{WalletClientModule, WalletClientModuleData};
35
36/// A helper struct meant to combined data from all addresses/records
37/// into a single struct with all actionable data.
38#[derive(Debug, Clone)]
39struct NextActions {
40    /// Current time
41    now: SystemTime,
42    /// Index keys due for a check
43    due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
44    /// Nearest key that is not due yet
45    next: Option<SystemTime>,
46}
47
48impl NextActions {
49    pub fn new() -> Self {
50        Self {
51            now: time::now(),
52            due: vec![],
53            next: None,
54        }
55    }
56}
57
58impl NextActions {
59    /// Calculate next actions from the database
60    async fn from_db_state(db: &Database) -> Self {
61        db.begin_transaction_nc()
62            .await
63            .find_by_prefix(&PegInTweakIndexPrefix)
64            .await
65            .fold(NextActions::new(), |state, (key, val)| async {
66                state.fold(key, val)
67            })
68            .await
69    }
70
71    /// Combine current state with another record
72    pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
73        if let Some(next_check_time) = val.next_check_time {
74            if next_check_time < self.now {
75                self.due.push((key, val));
76            }
77
78            self.next = match self.next {
79                Some(existing) => Some(existing.min(next_check_time)),
80                None => Some(next_check_time),
81            };
82        }
83        self
84    }
85}
86
87/// A deposit monitoring task
88///
89/// On the high level it maintains a list of derived addresses with some info
90/// like when is the next time to check for deposits on them.
91#[allow(clippy::too_many_lines)]
92pub(crate) async fn run_peg_in_monitor(
93    client_ctx: ClientContext<WalletClientModule>,
94    db: Database,
95    btc_rpc: DynBitcoindRpc,
96    module_api: DynModuleApi,
97    data: WalletClientModuleData,
98    pegin_claimed_sender: watch::Sender<()>,
99    mut wakeup_receiver: watch::Receiver<()>,
100) {
101    let min_sleep: Duration = if is_running_in_test_env() {
102        Duration::from_millis(100)
103    } else {
104        Duration::from_secs(30)
105    };
106
107    loop {
108        if let Err(err) = check_for_deposits(
109            &db,
110            &data,
111            &btc_rpc,
112            &module_api,
113            &client_ctx,
114            &pegin_claimed_sender,
115        )
116        .await
117        {
118            warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
119            continue;
120        }
121
122        let now = time::now();
123        let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
124            /* for simplicity just wake up every hour, even when there's no need */
125              now + Duration::from_secs(60 * 60));
126        let next_wakeup_duration = next_wakeup
127            .duration_since(now)
128            .unwrap_or_default()
129            .max(min_sleep);
130        debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
131        tokio::select! {
132            () = sleep(next_wakeup_duration) => {
133                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
134            },
135            res = wakeup_receiver.changed() => {
136                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
137                if res.is_err() {
138                    debug!(target: LOG_CLIENT_MODULE_WALLET,  "Terminating peg-in monitor");
139                    return;
140                }
141            }
142        }
143    }
144}
145
146async fn check_for_deposits(
147    db: &Database,
148    data: &WalletClientModuleData,
149    btc_rpc: &DynBitcoindRpc,
150    module_api: &DynModuleApi,
151    client_ctx: &ClientContext<WalletClientModule>,
152    pengin_claimed_sender: &watch::Sender<()>,
153) -> Result<(), anyhow::Error> {
154    let due = NextActions::from_db_state(db).await.due;
155    trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
156    for (due_key, due_val) in due {
157        check_and_claim_idx_pegins(
158            data,
159            due_key,
160            btc_rpc,
161            module_api,
162            db,
163            client_ctx,
164            due_val,
165            pengin_claimed_sender,
166        )
167        .await?;
168    }
169
170    Ok(())
171}
172
173#[allow(clippy::too_many_arguments)]
174async fn check_and_claim_idx_pegins(
175    data: &WalletClientModuleData,
176    due_key: PegInTweakIndexKey,
177    btc_rpc: &DynBitcoindRpc,
178    module_api: &DynModuleApi,
179    db: &Database,
180    client_ctx: &ClientContext<WalletClientModule>,
181    due_val: PegInTweakIndexData,
182    pengin_claimed_sender: &watch::Sender<()>,
183) -> Result<(), anyhow::Error> {
184    let now = time::now();
185    match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
186        Ok(outcomes) => {
187            let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
188                .map(|duration| now + duration);
189            db
190                .autocommit(
191                    |dbtx, _| {
192                        Box::pin(async {
193                            let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
194
195                            let claimed_sender = pengin_claimed_sender.clone();
196                            dbtx.on_commit(move || {
197                                claimed_sender.send_replace(());
198                            });
199
200                            let peg_in_tweak_index_data = PegInTweakIndexData {
201                                next_check_time,
202                                last_check_time: Some(now),
203                                claimed: [due_val.claimed.clone(), claimed_now].concat(),
204                                ..due_val
205                            };
206                            trace!(
207                                target: LOG_CLIENT_MODULE_WALLET,
208                                tweak_idx=%due_key.0,
209                                due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
210                                data=?peg_in_tweak_index_data,
211                                "Updating"
212                            );
213                            dbtx
214                                .insert_entry(&due_key, &peg_in_tweak_index_data)
215                                .await;
216
217                            Ok::<_, anyhow::Error>(())
218                        })
219                    },
220                    None,
221                )
222                .await?;
223        }
224        Err(err) => {
225            debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
226        }
227    }
228    Ok(())
229}
230
231/// Outcome of checking a single deposit Bitcoin transaction output
232///
233/// For every address there can be multiple outcomes (`Vec<Self>`).
234#[derive(Copy, Clone, Debug)]
235enum CheckOutcome {
236    /// There's a tx pending (needs more confirmation)
237    Pending { num_blocks_needed: u64 },
238    /// A state machine was created to claim the peg-in
239    Claimed { outpoint: bitcoin::OutPoint },
240
241    /// A peg-in transaction was already claimed (state machine created) in the
242    /// past
243    AlreadyClaimed,
244}
245
246impl CheckOutcome {
247    /// Desired retry delay for a single outcome
248    ///
249    /// None means "no need to check anymore".
250    fn retry_delay(self) -> Option<Duration> {
251        match self {
252            // Check again in time proportional to the expected block confirmation time
253            CheckOutcome::Pending { num_blocks_needed } => {
254                if is_running_in_test_env() {
255                    // In tests, we basically mine all blocks right away
256                    Some(Duration::from_millis(1))
257                } else {
258                    Some(Duration::from_secs(60 * num_blocks_needed))
259                }
260            }
261            // Once anything has been claimed, there's no reason to claim again automatically,
262            // and it's undesirable due to privacy reasons.
263            // Users can possibly update the underlying record via other means to force a check on
264            // demand.
265            CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
266        }
267    }
268
269    /// Desired retry delay for a bunch of outcomes.
270    ///
271    /// This time is intended to be persisted in the database.
272    ///
273    /// None means "no need to check anymore".
274    fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
275        // If the address was allocated, but nothing was ever received or even detected
276        // on it yet, check again in time proportional to the age of the
277        // address.
278        if outcomes.is_empty() {
279            if is_running_in_test_env() {
280                // When testing we usually send deposits right away, so check more aggressively.
281                return Some(Duration::from_millis(100));
282            }
283            let now = time::now();
284            let age = now.duration_since(creation_time).unwrap_or_default();
285            return Some(age / 10);
286        }
287
288        // The delays is the minimum retry delay.
289        let mut min = None;
290
291        for outcome in outcomes {
292            min = match (min, outcome.retry_delay()) {
293                (None, time) => time,
294                (Some(min), None) => Some(min),
295                (Some(min), Some(time)) => Some(cmp::min(min, time)),
296            };
297        }
298
299        min
300    }
301
302    fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
303        let mut res = vec![];
304        for outcome in outcomes {
305            if let CheckOutcome::Claimed { outpoint } = outcome {
306                res.push(*outpoint);
307            }
308        }
309
310        res
311    }
312}
313
314/// Query via btc rpc for a history of an address derived with `tweak_idx` and
315/// claim any peg-ins that are ready.
316///
317/// Return a list of [`CheckOutcome`]s for each matching output.
318#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
319async fn check_idx_pegins(
320    data: &WalletClientModuleData,
321    tweak_idx: TweakIdx,
322    btc_rpc: &DynBitcoindRpc,
323    module_rpc: &DynModuleApi,
324    db: &Database,
325    client_ctx: &ClientContext<WalletClientModule>,
326) -> Result<Vec<CheckOutcome>, anyhow::Error> {
327    let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
328    let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
329    btc_rpc.watch_script_history(&script).await?;
330
331    let history = btc_rpc.get_script_history(&script).await?;
332
333    debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
334
335    let mut outcomes = vec![];
336
337    for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
338        let txid = transaction.compute_txid();
339        let outpoint = bitcoin::OutPoint {
340            txid,
341            vout: out_idx,
342        };
343
344        let claimed_peg_in_key = ClaimedPegInKey {
345            peg_in_index: tweak_idx,
346            btc_out_point: outpoint,
347        };
348
349        if db
350            .begin_transaction_nc()
351            .await
352            .get_value(&claimed_peg_in_key)
353            .await
354            .is_some()
355        {
356            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
357            outcomes.push(CheckOutcome::AlreadyClaimed);
358            continue;
359        }
360        let finality_delay = u64::from(data.cfg.finality_delay);
361
362        let tx_block_count =
363            if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
364                tx_block_height.saturating_add(1)
365            } else {
366                outcomes.push(CheckOutcome::Pending {
367                    num_blocks_needed: finality_delay,
368                });
369                debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
370                continue;
371            };
372
373        let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
374
375        if 0 < num_blocks_needed {
376            outcomes.push(CheckOutcome::Pending { num_blocks_needed });
377            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
378            continue;
379        }
380
381        debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
382
383        let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
384        let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
385
386        claim_peg_in(
387            client_ctx,
388            tweak_idx,
389            tweak_key,
390            &transaction,
391            operation_id,
392            outpoint,
393            tx_out_proof,
394            federation_knows_utxo,
395        )
396        .await?;
397        outcomes.push(CheckOutcome::Claimed { outpoint });
398    }
399    Ok(outcomes)
400}
401
402#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
403async fn claim_peg_in(
404    client_ctx: &ClientContext<WalletClientModule>,
405    tweak_idx: TweakIdx,
406    tweak_key: Keypair,
407    transaction: &bitcoin::Transaction,
408    operation_id: OperationId,
409    out_point: bitcoin::OutPoint,
410    tx_out_proof: TxOutProof,
411    federation_knows_utxo: bool,
412) -> anyhow::Result<()> {
413    /// Returns the claim transactions output range if a claim happened or
414    /// `None` otherwise if the deposit was smaller than the deposit fee.
415    async fn claim_peg_in_inner(
416        client_ctx: &ClientContext<WalletClientModule>,
417        dbtx: &mut DatabaseTransaction<'_>,
418        btc_transaction: &bitcoin::Transaction,
419        out_idx: u32,
420        tweak_key: Keypair,
421        txout_proof: TxOutProof,
422        operation_id: OperationId,
423        federation_knows_utxo: bool,
424    ) -> Option<OutPointRange> {
425        let pegin_proof = PegInProof::new(
426            txout_proof,
427            btc_transaction.clone(),
428            out_idx,
429            tweak_key.public_key(),
430        )
431        .expect("TODO: handle API returning faulty proofs");
432
433        let amount = pegin_proof.tx_output().value.into();
434        let wallet_input = if federation_knows_utxo {
435            WalletInput::new_v1(&pegin_proof)
436        } else {
437            WalletInput::new_v0(pegin_proof)
438        };
439
440        let client_input = ClientInput::<WalletInput> {
441            input: wallet_input,
442            keys: vec![tweak_key],
443            amounts: Amounts::new_bitcoin(amount),
444        };
445
446        if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
447            warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
448            return None;
449        }
450
451        let txid = btc_transaction.compute_txid();
452
453        client_ctx
454            .log_event(
455                dbtx,
456                DepositConfirmed {
457                    txid,
458                    out_idx,
459                    amount,
460                },
461            )
462            .await;
463
464        client_ctx
465            .log_event(
466                dbtx,
467                ReceivePaymentEvent {
468                    operation_id,
469                    amount,
470                    txid,
471                },
472            )
473            .await;
474
475        Some(
476            client_ctx
477                .claim_inputs(
478                    dbtx,
479                    ClientInputBundle::new_no_sm(vec![client_input]),
480                    operation_id,
481                )
482                .await
483                .expect("Cannot claim input, additional funding needed"),
484        )
485    }
486
487    let tx_out_proof = &tx_out_proof;
488
489    debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
490
491    client_ctx
492        .module_db()
493        .autocommit(
494            |dbtx, _| {
495                Box::pin(async {
496                    let maybe_change_range = claim_peg_in_inner(
497                        client_ctx,
498                        dbtx,
499                        transaction,
500                        out_point.vout,
501                        tweak_key,
502                        tx_out_proof.clone(),
503                        operation_id,
504                        federation_knows_utxo,
505                    )
506                    .await;
507
508                    let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
509                        ClaimedPegInData {
510                            claim_txid: change_range.txid(),
511                            change: change_range.into_iter().collect(),
512                        }
513                    } else {
514                        ClaimedPegInData {
515                            claim_txid: TransactionId::from_byte_array([0; 32]),
516                            change: vec![],
517                        }
518                    };
519
520                    dbtx.insert_entry(
521                        &ClaimedPegInKey {
522                            peg_in_index: tweak_idx,
523                            btc_out_point: out_point,
524                        },
525                        &claimed_pegin_data,
526                    )
527                    .await;
528
529                    Ok(())
530                })
531            },
532            Some(100),
533        )
534        .await
535        .map_err(|e| match e {
536            AutocommitError::CommitFailed {
537                last_error,
538                attempts,
539            } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
540            AutocommitError::ClosureError { error, .. } => error,
541        })?;
542
543    Ok(())
544}
545
546pub(crate) fn filter_onchain_deposit_outputs<'a>(
547    tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
548    out_script: &'a ScriptBuf,
549) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
550    tx_iter.flat_map(move |tx| {
551        tx.output
552            .clone()
553            .into_iter()
554            .enumerate()
555            .filter_map(move |(out_idx, tx_out)| {
556                if &tx_out.script_pubkey == out_script {
557                    Some((tx.clone(), out_idx as u32))
558                } else {
559                    None
560                }
561            })
562    })
563}