// fedimint_ln_client/recurring/mod.rs

1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::secp256k1::{Keypair, PublicKey};
23use fedimint_core::task::sleep;
24use fedimint_core::util::{BoxFuture, FmtCompact, SafeUrl};
25use fedimint_derive_secret::ChildId;
26use futures::StreamExt;
27use futures::future::select_all;
28use lightning_invoice::Bolt11Invoice;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31use tokio::select;
32use tokio::sync::Notify;
33use tracing::{debug, trace};
34
35use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
36use crate::receive::LightningReceiveError;
37use crate::{
38    LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
39    LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
40};
41
42impl LightningClientModule {
43    pub async fn register_recurring_payment_code(
44        &self,
45        protocol: RecurringPaymentProtocol,
46        recurringd_api: SafeUrl,
47        meta: &str,
48    ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
49        self.client_ctx
50            .module_db()
51            .autocommit(
52                |dbtx, _| {
53                    let recurringd_api_inner = recurringd_api.clone();
54                    let new_recurring_payment_code = self.new_recurring_payment_code.clone();
55                    Box::pin(async move {
56                        let next_idx = dbtx
57                            .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
58                            .await
59                            .map(|(k, _)| k.derivation_idx)
60                            .next()
61                            .await
62                            .map_or(0, |last_idx| last_idx + 1);
63
64                        let payment_code_root_key = self.get_payment_code_root_key(next_idx);
65
66                        let recurringd_client =
67                            RecurringdClient::new(&recurringd_api_inner.clone());
68                        let register_response = recurringd_client
69                            .register_recurring_payment_code(
70                                self.client_ctx
71                                    .get_config()
72                                    .await
73                                    .global
74                                    .calculate_federation_id(),
75                                protocol,
76                                crate::recurring::PaymentCodeRootKey(
77                                    payment_code_root_key.public_key(),
78                                ),
79                                meta,
80                            )
81                            .await?;
82
83                        let payment_code_entry = RecurringPaymentCodeEntry {
84                            protocol,
85                            root_keypair: payment_code_root_key,
86                            code: register_response.recurring_payment_code,
87                            recurringd_api: recurringd_api_inner,
88                            last_derivation_index: 0,
89                            creation_time: fedimint_core::time::now(),
90                            meta: meta.to_owned(),
91                        };
92                        dbtx.insert_new_entry(
93                            &crate::db::RecurringPaymentCodeKey {
94                                derivation_idx: next_idx,
95                            },
96                            &payment_code_entry,
97                        )
98                        .await;
99                        dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
100
101                        Ok(payment_code_entry)
102                    })
103                },
104                None,
105            )
106            .await
107            .map_err(|e| match e {
108                fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
109                fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
110                    panic!("Commit failed: {last_error}")
111                }
112            })
113    }
114
115    pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
116        Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
117    }
118
119    pub async fn get_recurring_payment_codes_static(
120        db: &fedimint_core::db::Database,
121    ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
122        assert!(!db.is_global(), "Needs to run in module context");
123        db.begin_transaction_nc()
124            .await
125            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
126            .await
127            .map(|(idx, entry)| (idx.derivation_idx, entry))
128            .collect()
129            .await
130    }
131
132    fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
133        self.recurring_payment_code_secret
134            .child_key(ChildId(payment_code_registration_idx))
135            .to_secp_key(&self.secp)
136    }
137
138    pub async fn scan_recurring_payment_code_invoices(
139        client: ClientContext<Self>,
140        new_code_registered: Arc<Notify>,
141    ) {
142        const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
143
144        loop {
145            // We have to register the waiter before querying the DB for recurring payment
146            // code registrations so we don't miss any notification between querying the DB
147            // and registering the notifier.
148            let new_code_registered_future = new_code_registered.notified();
149
150            // We wait for all recurring payment codes to have an invoice in parallel
151            let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
152                .await
153                .into_iter()
154                .map(|(payment_code_idx, payment_code)| Box::pin(async move {
155                    let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
156                    let invoice_index = payment_code.last_derivation_index + 1;
157
158                    trace!(
159                        root_key=?payment_code.root_keypair.public_key(),
160                        %invoice_index,
161                        server=%payment_code.recurringd_api,
162                        "Waiting for new invoice from recurringd"
163                    );
164
165                    match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
166                        Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
167                        Err(err) => {
168                            debug!(
169                                err=%err.fmt_compact(),
170                                root_key=?payment_code.root_keypair.public_key(),
171                                invoice_index=%invoice_index,
172                                server=%payment_code.recurringd_api,
173                                "Failed querying recurring payment code invoice, will retry in {:?}",
174                                QUERY_RETRY_DELAY,
175                            );
176                            sleep(QUERY_RETRY_DELAY).await;
177                            Err(err)
178                        }
179                    }
180                }))
181                .collect::<Vec<_>>();
182
183            // TODO: isn't there some shorthand for this
184            let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
185                Box::pin(pending())
186            } else {
187                Box::pin(select_all(all_recurring_invoice_futures))
188            };
189
190            let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
191                (ret, _, _) = await_any_invoice => match ret {
192                    Ok(ret) => ret,
193                    Err(_) => {
194                        continue;
195                    }
196                },
197                () = new_code_registered_future => {
198                    continue;
199                }
200            };
201
202            Self::process_recurring_payment_code_invoice(
203                &client,
204                payment_code_idx,
205                invoice_idx,
206                invoice,
207            )
208            .await;
209
210            // Just in case something goes wrong, we don't want to burn too much CPU
211            sleep(Duration::from_secs(1)).await;
212        }
213    }
214
215    async fn process_recurring_payment_code_invoice(
216        client: &ClientContext<Self>,
217        payment_code_idx: u64,
218        invoice_idx: u64,
219        invoice: lightning_invoice::Bolt11Invoice,
220    ) {
221        // TODO: validate invoice hash etc.
222        let mut dbtx = client.module_db().begin_transaction().await;
223        let old_payment_code_entry = dbtx
224            .get_value(&crate::db::RecurringPaymentCodeKey {
225                derivation_idx: payment_code_idx,
226            })
227            .await
228            .expect("We queried it, so it exists in our DB");
229
230        let new_payment_code_entry = RecurringPaymentCodeEntry {
231            last_derivation_index: invoice_idx,
232            ..old_payment_code_entry.clone()
233        };
234        dbtx.insert_entry(
235            &crate::db::RecurringPaymentCodeKey {
236                derivation_idx: payment_code_idx,
237            },
238            &new_payment_code_entry,
239        )
240        .await;
241
242        Self::create_recurring_receive_operation(
243            client,
244            &mut dbtx.to_ref_nc(),
245            &old_payment_code_entry,
246            invoice_idx,
247            invoice,
248        )
249        .await;
250
251        dbtx.commit_tx().await;
252    }
253
254    #[allow(clippy::pedantic)]
255    async fn create_recurring_receive_operation(
256        client: &ClientContext<Self>,
257        dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
258        payment_code: &RecurringPaymentCodeEntry,
259        invoice_index: u64,
260        invoice: lightning_invoice::Bolt11Invoice,
261    ) {
262        // TODO: pipe secure secp context to here
263        let invoice_key =
264            tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
265
266        // TODO: use proper operation id, what do we use for LN normally? Preimage I
267        // guess?
268        let operation_id = OperationId(*invoice.payment_hash().as_ref());
269        debug!(
270            ?operation_id,
271            payment_code_key=?payment_code.root_keypair.public_key(),
272            invoice_index=%invoice_index,
273            "Creating recurring receive operation"
274        );
275        let ln_state =
276            LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
277                operation_id,
278                // TODO: technically we want a state that doesn't assume the offer was accepted
279                // since we haven't checked, but for an MVP this is good enough
280                state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
281                    crate::receive::LightningReceiveConfirmedInvoice {
282                        invoice: invoice.clone(),
283                        receiving_key: crate::ReceivingKey::Personal(invoice_key),
284                    },
285                ),
286            });
287
288        client
289            .manual_operation_start_dbtx(
290                dbtx,
291                operation_id,
292                "ln",
293                LightningOperationMeta {
294                    variant: LightningOperationMetaVariant::RecurringPaymentReceive(
295                        ReurringPaymentReceiveMeta {
296                            payment_code_id: PaymentCodeRootKey(
297                                payment_code.root_keypair.public_key(),
298                            )
299                            .to_payment_code_id(),
300                            invoice,
301                        },
302                    ),
303                    extra_meta: serde_json::Value::Null,
304                },
305                vec![client.make_dyn_state(ln_state)],
306            )
307            .await
308            .expect("OperationId is random");
309    }
310
311    pub async fn subscribe_ln_recurring_receive(
312        &self,
313        operation_id: OperationId,
314    ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
315        let operation = self.client_ctx.get_operation(operation_id).await?;
316        let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
317            invoice,
318            ..
319        }) = operation.meta::<LightningOperationMeta>().variant
320        else {
321            bail!("Operation is not a recurring lightning receive")
322        };
323
324        let client_ctx = self.client_ctx.clone();
325
326        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
327            stream! {
328                let self_ref = client_ctx.self_ref();
329
330                yield LnReceiveState::Created;
331                yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
332
333                match self_ref.await_receive_success(operation_id).await {
334                    Ok(_) => {
335                        yield LnReceiveState::Funded;
336
337                        if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
338                            yield LnReceiveState::AwaitingFunds;
339
340                            if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
341                                yield LnReceiveState::Claimed;
342                                return;
343                            }
344                        }
345
346                        yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
347                    }
348                    Err(e) => {
349                        yield LnReceiveState::Canceled { reason: e };
350                    }
351                }
352            }
353        }))
354    }
355
356    pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
357        self.client_ctx
358            .module_db()
359            .begin_transaction_nc()
360            .await
361            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
362            .await
363            .map(|(idx, entry)| (idx.derivation_idx, entry))
364            .collect()
365            .await
366    }
367
368    pub async fn get_recurring_payment_code(
369        &self,
370        payment_code_idx: u64,
371    ) -> Option<RecurringPaymentCodeEntry> {
372        self.client_ctx
373            .module_db()
374            .begin_transaction_nc()
375            .await
376            .get_value(&RecurringPaymentCodeKey {
377                derivation_idx: payment_code_idx,
378            })
379            .await
380    }
381
382    pub async fn list_recurring_payment_code_invoices(
383        &self,
384        payment_code_idx: u64,
385    ) -> Option<BTreeMap<u64, OperationId>> {
386        let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
387
388        let operations = (1..=payment_code.last_derivation_index)
389            .map(|invoice_idx: u64| {
390                let invoice_key = tweak_user_key(
391                    SECP256K1,
392                    payment_code.root_keypair.public_key(),
393                    invoice_idx,
394                );
395                let payment_hash =
396                    sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
397                let operation_id = OperationId(*payment_hash.as_ref());
398
399                (invoice_idx, operation_id)
400            })
401            .collect();
402
403        Some(operations)
404    }
405}
406
407#[derive(
408    Debug,
409    Clone,
410    Copy,
411    PartialOrd,
412    Eq,
413    PartialEq,
414    Hash,
415    Encodable,
416    Decodable,
417    Serialize,
418    Deserialize,
419)]
420pub struct PaymentCodeRootKey(pub PublicKey);
421
422#[derive(
423    Debug,
424    Clone,
425    Copy,
426    PartialOrd,
427    Eq,
428    PartialEq,
429    Hash,
430    Encodable,
431    Decodable,
432    Serialize,
433    Deserialize,
434)]
435pub struct PaymentCodeId(sha256::Hash);
436
437impl PaymentCodeRootKey {
438    pub fn to_payment_code_id(&self) -> PaymentCodeId {
439        PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
440    }
441}
442
443impl Display for PaymentCodeId {
444    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445        write!(f, "{}", self.0)
446    }
447}
448
449impl FromStr for PaymentCodeId {
450    type Err = anyhow::Error;
451
452    fn from_str(s: &str) -> Result<Self, Self::Err> {
453        Ok(Self(sha256::Hash::from_str(s)?))
454    }
455}
456
457impl Display for PaymentCodeRootKey {
458    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
459        write!(f, "{}", self.0)
460    }
461}
462
463impl FromStr for PaymentCodeRootKey {
464    type Err = anyhow::Error;
465
466    fn from_str(s: &str) -> Result<Self, Self::Err> {
467        Ok(Self(PublicKey::from_str(s)?))
468    }
469}
470
471#[derive(
472    Debug,
473    Clone,
474    Copy,
475    Eq,
476    PartialEq,
477    PartialOrd,
478    Hash,
479    Encodable,
480    Decodable,
481    Serialize,
482    Deserialize,
483)]
484pub enum RecurringPaymentProtocol {
485    LNURL,
486    BOLT12,
487}
488
489#[derive(Debug, Clone, Serialize, Deserialize)]
490pub struct ReurringPaymentReceiveMeta {
491    pub payment_code_id: PaymentCodeId,
492    pub invoice: Bolt11Invoice,
493}
494
495#[derive(Debug, Error)]
496pub enum RecurringPaymentError {
497    #[error("Unsupported protocol: {0:?}")]
498    UnsupportedProtocol(RecurringPaymentProtocol),
499    #[error("Unknown federation ID: {0}")]
500    UnknownFederationId(FederationId),
501    #[error("Unknown payment code: {0:?}")]
502    UnknownPaymentCode(PaymentCodeId),
503    #[error("No compatible lightning module found")]
504    NoLightningModuleFound,
505    #[error("No gateway found")]
506    NoGatewayFound,
507    #[error("Payment code already exists with different settings: {0:?}")]
508    PaymentCodeAlreadyExists(PaymentCodeRootKey),
509    #[error("Federation already registered: {0}")]
510    FederationAlreadyRegistered(FederationId),
511    #[error("Error joining federation: {0}")]
512    JoiningFederationFailed(anyhow::Error),
513    #[error("Error registering with recurring payment service: {0}")]
514    Other(#[from] anyhow::Error),
515}
516
517#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
518pub struct RecurringPaymentCodeEntry {
519    pub protocol: RecurringPaymentProtocol,
520    pub root_keypair: Keypair,
521    pub code: String,
522    pub recurringd_api: SafeUrl,
523    pub last_derivation_index: u64,
524    pub creation_time: SystemTime,
525    pub meta: String,
526}