fedimint_wallet_client/
pegin_monitor.rs

1use std::cmp;
2use std::time::{Duration, SystemTime};
3
4use bitcoin::ScriptBuf;
5use fedimint_api_client::api::DynModuleApi;
6use fedimint_bitcoind::DynBitcoindRpc;
7use fedimint_client_module::module::{ClientContext, OutPointRange};
8use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _,
12};
13use fedimint_core::envs::is_running_in_test_env;
14use fedimint_core::module::Amounts;
15use fedimint_core::task::sleep;
16use fedimint_core::txoproof::TxOutProof;
17use fedimint_core::util::FmtCompactAnyhow as _;
18use fedimint_core::{BitcoinHash, TransactionId, secp256k1, time};
19use fedimint_logging::LOG_CLIENT_MODULE_WALLET;
20use fedimint_wallet_common::WalletInput;
21use fedimint_wallet_common::txoproof::PegInProof;
22use futures::StreamExt as _;
23use secp256k1::Keypair;
24use tokio::sync::watch;
25use tracing::{debug, instrument, trace, warn};
26
27use crate::api::WalletFederationApi as _;
28use crate::client_db::{
29    ClaimedPegInData, ClaimedPegInKey, PegInTweakIndexData, PegInTweakIndexKey,
30    PegInTweakIndexPrefix, TweakIdx,
31};
32use crate::events::DepositConfirmed;
33use crate::{WalletClientModule, WalletClientModuleData};
34
35/// A helper struct meant to combined data from all addresses/records
36/// into a single struct with all actionable data.
37#[derive(Debug, Clone)]
38struct NextActions {
39    /// Current time
40    now: SystemTime,
41    /// Index keys due for a check
42    due: Vec<(PegInTweakIndexKey, PegInTweakIndexData)>,
43    /// Nearest key that is not due yet
44    next: Option<SystemTime>,
45}
46
47impl NextActions {
48    pub fn new() -> Self {
49        Self {
50            now: time::now(),
51            due: vec![],
52            next: None,
53        }
54    }
55}
56
57impl NextActions {
58    /// Calculate next actions from the database
59    async fn from_db_state(db: &Database) -> Self {
60        db.begin_transaction_nc()
61            .await
62            .find_by_prefix(&PegInTweakIndexPrefix)
63            .await
64            .fold(NextActions::new(), |state, (key, val)| async {
65                state.fold(key, val)
66            })
67            .await
68    }
69
70    /// Combine current state with another record
71    pub fn fold(mut self, key: PegInTweakIndexKey, val: PegInTweakIndexData) -> Self {
72        if let Some(next_check_time) = val.next_check_time {
73            if next_check_time < self.now {
74                self.due.push((key, val));
75            }
76
77            self.next = match self.next {
78                Some(existing) => Some(existing.min(next_check_time)),
79                None => Some(next_check_time),
80            };
81        }
82        self
83    }
84}
85
86/// A deposit monitoring task
87///
88/// On the high level it maintains a list of derived addresses with some info
89/// like when is the next time to check for deposits on them.
90#[allow(clippy::too_many_lines)]
91pub(crate) async fn run_peg_in_monitor(
92    client_ctx: ClientContext<WalletClientModule>,
93    db: Database,
94    btc_rpc: DynBitcoindRpc,
95    module_api: DynModuleApi,
96    data: WalletClientModuleData,
97    pegin_claimed_sender: watch::Sender<()>,
98    mut wakeup_receiver: watch::Receiver<()>,
99) {
100    let min_sleep: Duration = if is_running_in_test_env() {
101        Duration::from_millis(100)
102    } else {
103        Duration::from_secs(30)
104    };
105
106    loop {
107        if let Err(err) = check_for_deposits(
108            &db,
109            &data,
110            &btc_rpc,
111            &module_api,
112            &client_ctx,
113            &pegin_claimed_sender,
114        )
115        .await
116        {
117            warn!(target: LOG_CLIENT_MODULE_WALLET, error = %err.fmt_compact_anyhow(), "Error checking for deposits");
118            continue;
119        }
120
121        let now = time::now();
122        let next_wakeup = NextActions::from_db_state(&db).await.next.unwrap_or_else(||
123            /* for simplicity just wake up every hour, even when there's no need */
124              now + Duration::from_secs(60 * 60));
125        let next_wakeup_duration = next_wakeup
126            .duration_since(now)
127            .unwrap_or_default()
128            .max(min_sleep);
129        debug!(target: LOG_CLIENT_MODULE_WALLET, sleep_msecs=%next_wakeup_duration.as_millis(), "Sleep after completing due checks");
130        tokio::select! {
131            () = sleep(next_wakeup_duration) => {
132                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a scheduled wakeup");
133            },
134            res = wakeup_receiver.changed() => {
135                debug!(target: LOG_CLIENT_MODULE_WALLET, "Woken up by a signal");
136                if res.is_err() {
137                    debug!(target: LOG_CLIENT_MODULE_WALLET,  "Terminating peg-in monitor");
138                    return;
139                }
140            }
141        }
142    }
143}
144
145async fn check_for_deposits(
146    db: &Database,
147    data: &WalletClientModuleData,
148    btc_rpc: &DynBitcoindRpc,
149    module_api: &DynModuleApi,
150    client_ctx: &ClientContext<WalletClientModule>,
151    pengin_claimed_sender: &watch::Sender<()>,
152) -> Result<(), anyhow::Error> {
153    let due = NextActions::from_db_state(db).await.due;
154    trace!(target: LOG_CLIENT_MODULE_WALLET, ?due, "Checking for deposists");
155    for (due_key, due_val) in due {
156        check_and_claim_idx_pegins(
157            data,
158            due_key,
159            btc_rpc,
160            module_api,
161            db,
162            client_ctx,
163            due_val,
164            pengin_claimed_sender,
165        )
166        .await?;
167    }
168
169    Ok(())
170}
171
172#[allow(clippy::too_many_arguments)]
173async fn check_and_claim_idx_pegins(
174    data: &WalletClientModuleData,
175    due_key: PegInTweakIndexKey,
176    btc_rpc: &DynBitcoindRpc,
177    module_api: &DynModuleApi,
178    db: &Database,
179    client_ctx: &ClientContext<WalletClientModule>,
180    due_val: PegInTweakIndexData,
181    pengin_claimed_sender: &watch::Sender<()>,
182) -> Result<(), anyhow::Error> {
183    let now = time::now();
184    match check_idx_pegins(data, due_key.0, btc_rpc, module_api, db, client_ctx).await {
185        Ok(outcomes) => {
186            let next_check_time = CheckOutcome::retry_delay_vec(&outcomes, due_val.creation_time)
187                .map(|duration| now + duration);
188            db
189                .autocommit(
190                    |dbtx, _| {
191                        Box::pin(async {
192                            let claimed_now = CheckOutcome::get_claimed_now_outpoints(&outcomes);
193
194                            let claimed_sender = pengin_claimed_sender.clone();
195                            dbtx.on_commit(move || {
196                                claimed_sender.send_replace(());
197                            });
198
199                            let peg_in_tweak_index_data = PegInTweakIndexData {
200                                next_check_time,
201                                last_check_time: Some(now),
202                                claimed: [due_val.claimed.clone(), claimed_now].concat(),
203                                ..due_val
204                            };
205                            trace!(
206                                target: LOG_CLIENT_MODULE_WALLET,
207                                tweak_idx=%due_key.0,
208                                due_in_secs=?next_check_time.map(|next_check_time| next_check_time.duration_since(now).unwrap_or_default().as_secs()),
209                                data=?peg_in_tweak_index_data,
210                                "Updating"
211                            );
212                            dbtx
213                                .insert_entry(&due_key, &peg_in_tweak_index_data)
214                                .await;
215
216                            Ok::<_, anyhow::Error>(())
217                        })
218                    },
219                    None,
220                )
221                .await?;
222        }
223        Err(err) => {
224            debug!(target: LOG_CLIENT_MODULE_WALLET, err = %err.fmt_compact_anyhow(), tweak_idx=%due_key.0, "Error checking tweak_idx");
225        }
226    }
227    Ok(())
228}
229
230/// Outcome of checking a single deposit Bitcoin transaction output
231///
232/// For every address there can be multiple outcomes (`Vec<Self>`).
233#[derive(Copy, Clone, Debug)]
234enum CheckOutcome {
235    /// There's a tx pending (needs more confirmation)
236    Pending { num_blocks_needed: u64 },
237    /// A state machine was created to claim the peg-in
238    Claimed { outpoint: bitcoin::OutPoint },
239
240    /// A peg-in transaction was already claimed (state machine created) in the
241    /// past
242    AlreadyClaimed,
243}
244
245impl CheckOutcome {
246    /// Desired retry delay for a single outcome
247    ///
248    /// None means "no need to check anymore".
249    fn retry_delay(self) -> Option<Duration> {
250        match self {
251            // Check again in time proportional to the expected block confirmation time
252            CheckOutcome::Pending { num_blocks_needed } => {
253                if is_running_in_test_env() {
254                    // In tests, we basically mine all blocks right away
255                    Some(Duration::from_millis(1))
256                } else {
257                    Some(Duration::from_secs(60 * num_blocks_needed))
258                }
259            }
260            // Once anything has been claimed, there's no reason to claim again automatically,
261            // and it's undesirable due to privacy reasons.
262            // Users can possibly update the underlying record via other means to force a check on
263            // demand.
264            CheckOutcome::Claimed { .. } | CheckOutcome::AlreadyClaimed => None,
265        }
266    }
267
268    /// Desired retry delay for a bunch of outcomes.
269    ///
270    /// This time is intended to be persisted in the database.
271    ///
272    /// None means "no need to check anymore".
273    fn retry_delay_vec(outcomes: &[CheckOutcome], creation_time: SystemTime) -> Option<Duration> {
274        // If the address was allocated, but nothing was ever received or even detected
275        // on it yet, check again in time proportional to the age of the
276        // address.
277        if outcomes.is_empty() {
278            if is_running_in_test_env() {
279                // When testing we usually send deposits right away, so check more aggressively.
280                return Some(Duration::from_millis(100));
281            }
282            let now = time::now();
283            let age = now.duration_since(creation_time).unwrap_or_default();
284            return Some(age / 10);
285        }
286
287        // The delays is the minimum retry delay.
288        let mut min = None;
289
290        for outcome in outcomes {
291            min = match (min, outcome.retry_delay()) {
292                (None, time) => time,
293                (Some(min), None) => Some(min),
294                (Some(min), Some(time)) => Some(cmp::min(min, time)),
295            };
296        }
297
298        min
299    }
300
301    fn get_claimed_now_outpoints(outcomes: &[CheckOutcome]) -> Vec<bitcoin::OutPoint> {
302        let mut res = vec![];
303        for outcome in outcomes {
304            if let CheckOutcome::Claimed { outpoint } = outcome {
305                res.push(*outpoint);
306            }
307        }
308
309        res
310    }
311}
312
313/// Query via btc rpc for a history of an address derived with `tweak_idx` and
314/// claim any peg-ins that are ready.
315///
316/// Return a list of [`CheckOutcome`]s for each matching output.
317#[instrument(target = LOG_CLIENT_MODULE_WALLET, skip_all, fields(tweak_idx))]
318async fn check_idx_pegins(
319    data: &WalletClientModuleData,
320    tweak_idx: TweakIdx,
321    btc_rpc: &DynBitcoindRpc,
322    module_rpc: &DynModuleApi,
323    db: &Database,
324    client_ctx: &ClientContext<WalletClientModule>,
325) -> Result<Vec<CheckOutcome>, anyhow::Error> {
326    let current_consensus_block_count = module_rpc.fetch_consensus_block_count().await?;
327    let (script, address, tweak_key, operation_id) = data.derive_peg_in_script(tweak_idx);
328    btc_rpc.watch_script_history(&script).await?;
329
330    let history = btc_rpc.get_script_history(&script).await?;
331
332    debug!(target: LOG_CLIENT_MODULE_WALLET, %address, num_txes=history.len(), "Got history of a peg-in address");
333
334    let mut outcomes = vec![];
335
336    for (transaction, out_idx) in filter_onchain_deposit_outputs(history.into_iter(), &script) {
337        let txid = transaction.compute_txid();
338        let outpoint = bitcoin::OutPoint {
339            txid,
340            vout: out_idx,
341        };
342
343        let claimed_peg_in_key = ClaimedPegInKey {
344            peg_in_index: tweak_idx,
345            btc_out_point: outpoint,
346        };
347
348        if db
349            .begin_transaction_nc()
350            .await
351            .get_value(&claimed_peg_in_key)
352            .await
353            .is_some()
354        {
355            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, "Already claimed");
356            outcomes.push(CheckOutcome::AlreadyClaimed);
357            continue;
358        }
359        let finality_delay = u64::from(data.cfg.finality_delay);
360
361        let tx_block_count =
362            if let Some(tx_block_height) = btc_rpc.get_tx_block_height(&txid).await? {
363                tx_block_height.saturating_add(1)
364            } else {
365                outcomes.push(CheckOutcome::Pending {
366                    num_blocks_needed: finality_delay,
367                });
368                debug!(target:LOG_CLIENT_MODULE_WALLET, %txid, %out_idx,"In the mempool");
369                continue;
370            };
371
372        let num_blocks_needed = tx_block_count.saturating_sub(current_consensus_block_count);
373
374        if 0 < num_blocks_needed {
375            outcomes.push(CheckOutcome::Pending { num_blocks_needed });
376            debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %num_blocks_needed, %finality_delay, %tx_block_count, %current_consensus_block_count, "Needs more confirmations");
377            continue;
378        }
379
380        debug!(target: LOG_CLIENT_MODULE_WALLET, %txid, %out_idx, %finality_delay, %tx_block_count, %current_consensus_block_count, "Ready to claim");
381
382        let tx_out_proof = btc_rpc.get_txout_proof(txid).await?;
383        let federation_knows_utxo = module_rpc.is_utxo_confirmed(outpoint).await?;
384
385        claim_peg_in(
386            client_ctx,
387            tweak_idx,
388            tweak_key,
389            &transaction,
390            operation_id,
391            outpoint,
392            tx_out_proof,
393            federation_knows_utxo,
394        )
395        .await?;
396        outcomes.push(CheckOutcome::Claimed { outpoint });
397    }
398    Ok(outcomes)
399}
400
401#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
402async fn claim_peg_in(
403    client_ctx: &ClientContext<WalletClientModule>,
404    tweak_idx: TweakIdx,
405    tweak_key: Keypair,
406    transaction: &bitcoin::Transaction,
407    operation_id: OperationId,
408    out_point: bitcoin::OutPoint,
409    tx_out_proof: TxOutProof,
410    federation_knows_utxo: bool,
411) -> anyhow::Result<()> {
412    /// Returns the claim transactions output range if a claim happened or
413    /// `None` otherwise if the deposit was smaller than the deposit fee.
414    async fn claim_peg_in_inner(
415        client_ctx: &ClientContext<WalletClientModule>,
416        dbtx: &mut DatabaseTransaction<'_>,
417        btc_transaction: &bitcoin::Transaction,
418        out_idx: u32,
419        tweak_key: Keypair,
420        txout_proof: TxOutProof,
421        operation_id: OperationId,
422        federation_knows_utxo: bool,
423    ) -> Option<OutPointRange> {
424        let pegin_proof = PegInProof::new(
425            txout_proof,
426            btc_transaction.clone(),
427            out_idx,
428            tweak_key.public_key(),
429        )
430        .expect("TODO: handle API returning faulty proofs");
431
432        let amount = pegin_proof.tx_output().value.into();
433        let wallet_input = if federation_knows_utxo {
434            WalletInput::new_v1(&pegin_proof)
435        } else {
436            WalletInput::new_v0(pegin_proof)
437        };
438
439        let client_input = ClientInput::<WalletInput> {
440            input: wallet_input,
441            keys: vec![tweak_key],
442            amounts: Amounts::new_bitcoin(amount),
443        };
444
445        if amount <= client_ctx.self_ref().cfg().fee_consensus.peg_in_abs {
446            warn!(target: LOG_CLIENT_MODULE_WALLET, "We won't claim a deposit lower than the deposit fee");
447            return None;
448        }
449
450        client_ctx
451            .log_event(
452                dbtx,
453                DepositConfirmed {
454                    txid: btc_transaction.compute_txid(),
455                    out_idx,
456                    amount,
457                },
458            )
459            .await;
460
461        Some(
462            client_ctx
463                .claim_inputs(
464                    dbtx,
465                    ClientInputBundle::new_no_sm(vec![client_input]),
466                    operation_id,
467                )
468                .await
469                .expect("Cannot claim input, additional funding needed"),
470        )
471    }
472
473    let tx_out_proof = &tx_out_proof;
474
475    debug!(target: LOG_CLIENT_MODULE_WALLET, %out_point, "Claiming a peg-in");
476
477    client_ctx
478        .module_db()
479        .autocommit(
480            |dbtx, _| {
481                Box::pin(async {
482                    let maybe_change_range = claim_peg_in_inner(
483                        client_ctx,
484                        dbtx,
485                        transaction,
486                        out_point.vout,
487                        tweak_key,
488                        tx_out_proof.clone(),
489                        operation_id,
490                        federation_knows_utxo,
491                    )
492                    .await;
493
494                    let claimed_pegin_data = if let Some(change_range) = maybe_change_range {
495                        ClaimedPegInData {
496                            claim_txid: change_range.txid(),
497                            change: change_range.into_iter().collect(),
498                        }
499                    } else {
500                        ClaimedPegInData {
501                            claim_txid: TransactionId::from_byte_array([0; 32]),
502                            change: vec![],
503                        }
504                    };
505
506                    dbtx.insert_entry(
507                        &ClaimedPegInKey {
508                            peg_in_index: tweak_idx,
509                            btc_out_point: out_point,
510                        },
511                        &claimed_pegin_data,
512                    )
513                    .await;
514
515                    Ok(())
516                })
517            },
518            Some(100),
519        )
520        .await
521        .map_err(|e| match e {
522            AutocommitError::CommitFailed {
523                last_error,
524                attempts,
525            } => last_error.context(format!("Failed to commit after {attempts} attempts")),
526            AutocommitError::ClosureError { error, .. } => error,
527        })?;
528
529    Ok(())
530}
531
532pub(crate) fn filter_onchain_deposit_outputs<'a>(
533    tx_iter: impl Iterator<Item = bitcoin::Transaction> + 'a,
534    out_script: &'a ScriptBuf,
535) -> impl Iterator<Item = (bitcoin::Transaction, u32)> + 'a {
536    tx_iter.flat_map(move |tx| {
537        tx.output
538            .clone()
539            .into_iter()
540            .enumerate()
541            .filter_map(move |(out_idx, tx_out)| {
542                if &tx_out.script_pubkey == out_script {
543                    Some((tx.clone(), out_idx as u32))
544                } else {
545                    None
546                }
547            })
548    })
549}