fedimint_ln_client/recurring/
mod.rs

1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::core::ModuleKind;
21use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::secp256k1::{Keypair, PublicKey};
24use fedimint_core::task::sleep;
25use fedimint_core::util::{BoxFuture, FmtCompact, FmtCompactAnyhow, SafeUrl};
26use fedimint_derive_secret::ChildId;
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use futures::StreamExt;
29use futures::future::select_all;
30use lightning_invoice::Bolt11Invoice;
31use serde::{Deserialize, Serialize};
32use thiserror::Error;
33use tokio::select;
34use tokio::sync::Notify;
35use tracing::{debug, trace, warn};
36
37use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
38use crate::receive::LightningReceiveError;
39use crate::{
40    LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
41    LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
42};
43
44const LOG_CLIENT_RECURRING: &str = "fm::client::ln::recurring";
45
46impl LightningClientModule {
47    pub async fn register_recurring_payment_code(
48        &self,
49        protocol: RecurringPaymentProtocol,
50        recurringd_api: SafeUrl,
51        meta: &str,
52    ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
53        self.client_ctx
54            .module_db()
55            .autocommit(
56                |dbtx, _| {
57                    let recurringd_api_inner = recurringd_api.clone();
58                    let new_recurring_payment_code = self.new_recurring_payment_code.clone();
59                    Box::pin(async move {
60                        let next_idx = dbtx
61                            .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
62                            .await
63                            .map(|(k, _)| k.derivation_idx)
64                            .next()
65                            .await
66                            .map_or(0, |last_idx| last_idx + 1);
67
68                        let payment_code_root_key = self.get_payment_code_root_key(next_idx);
69
70                        let recurringd_client =
71                            RecurringdClient::new(&recurringd_api_inner.clone());
72                        let register_response = recurringd_client
73                            .register_recurring_payment_code(
74                                self.client_ctx
75                                    .get_config()
76                                    .await
77                                    .global
78                                    .calculate_federation_id(),
79                                protocol,
80                                crate::recurring::PaymentCodeRootKey(
81                                    payment_code_root_key.public_key(),
82                                ),
83                                meta,
84                            )
85                            .await?;
86
87                        debug!(
88                            target: LOG_CLIENT_RECURRING,
89                            ?register_response,
90                            "Registered recurring payment code"
91                        );
92
93                        let payment_code_entry = RecurringPaymentCodeEntry {
94                            protocol,
95                            root_keypair: payment_code_root_key,
96                            code: register_response.recurring_payment_code,
97                            recurringd_api: recurringd_api_inner,
98                            last_derivation_index: 0,
99                            creation_time: fedimint_core::time::now(),
100                            meta: meta.to_owned(),
101                        };
102                        dbtx.insert_new_entry(
103                            &crate::db::RecurringPaymentCodeKey {
104                                derivation_idx: next_idx,
105                            },
106                            &payment_code_entry,
107                        )
108                        .await;
109                        dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
110
111                        Ok(payment_code_entry)
112                    })
113                },
114                None,
115            )
116            .await
117            .map_err(|e| match e {
118                fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
119                fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
120                    panic!("Commit failed: {last_error}")
121                }
122            })
123    }
124
125    pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
126        Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
127    }
128
129    pub async fn get_recurring_payment_codes_static(
130        db: &fedimint_core::db::Database,
131    ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
132        assert!(!db.is_global(), "Needs to run in module context");
133        db.begin_transaction_nc()
134            .await
135            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
136            .await
137            .map(|(idx, entry)| (idx.derivation_idx, entry))
138            .collect()
139            .await
140    }
141
142    fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
143        self.recurring_payment_code_secret
144            .child_key(ChildId(payment_code_registration_idx))
145            .to_secp_key(&self.secp)
146    }
147
148    pub async fn scan_recurring_payment_code_invoices(
149        client: ClientContext<Self>,
150        new_code_registered: Arc<Notify>,
151    ) {
152        const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
153
154        loop {
155            // We have to register the waiter before querying the DB for recurring payment
156            // code registrations so we don't miss any notification between querying the DB
157            // and registering the notifier.
158            let new_code_registered_future = new_code_registered.notified();
159
160            // We wait for all recurring payment codes to have an invoice in parallel
161            let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
162                .await
163                .into_iter()
164                .map(|(payment_code_idx, payment_code)| Box::pin(async move {
165                    let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
166                    let invoice_index = payment_code.last_derivation_index + 1;
167
168                    trace!(
169                        target: LOG_CLIENT_RECURRING,
170                        root_key=?payment_code.root_keypair.public_key(),
171                        %invoice_index,
172                        server=%payment_code.recurringd_api,
173                        "Waiting for new invoice from recurringd"
174                    );
175
176                    match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
177                        Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
178                        Err(err) => {
179                            debug!(
180                                target: LOG_CLIENT_RECURRING,
181                                err=%err.fmt_compact(),
182                                root_key=?payment_code.root_keypair.public_key(),
183                                invoice_index=%invoice_index,
184                                server=%payment_code.recurringd_api,
185                                "Failed querying recurring payment code invoice, will retry in {:?}",
186                                QUERY_RETRY_DELAY,
187                            );
188                            sleep(QUERY_RETRY_DELAY).await;
189                            Err(err)
190                        }
191                    }
192                }))
193                .collect::<Vec<_>>();
194
195            // TODO: isn't there some shorthand for this
196            let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
197                Box::pin(pending())
198            } else {
199                Box::pin(select_all(all_recurring_invoice_futures))
200            };
201
202            let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
203                (ret, _, _) = await_any_invoice => match ret {
204                    Ok(ret) => ret,
205                    Err(_) => {
206                        continue;
207                    }
208                },
209                () = new_code_registered_future => {
210                    continue;
211                }
212            };
213
214            Self::process_recurring_payment_code_invoice(
215                &client,
216                payment_code_idx,
217                invoice_idx,
218                invoice,
219            )
220            .await;
221
222            // Just in case something goes wrong, we don't want to burn too much CPU
223            sleep(Duration::from_secs(1)).await;
224        }
225    }
226
227    async fn process_recurring_payment_code_invoice(
228        client: &ClientContext<Self>,
229        payment_code_idx: u64,
230        invoice_idx: u64,
231        invoice: lightning_invoice::Bolt11Invoice,
232    ) {
233        // TODO: validate invoice hash etc.
234        let mut dbtx = client.module_db().begin_transaction().await;
235        let old_payment_code_entry = dbtx
236            .get_value(&crate::db::RecurringPaymentCodeKey {
237                derivation_idx: payment_code_idx,
238            })
239            .await
240            .expect("We queried it, so it exists in our DB");
241
242        let new_payment_code_entry = RecurringPaymentCodeEntry {
243            last_derivation_index: invoice_idx,
244            ..old_payment_code_entry.clone()
245        };
246        dbtx.insert_entry(
247            &crate::db::RecurringPaymentCodeKey {
248                derivation_idx: payment_code_idx,
249            },
250            &new_payment_code_entry,
251        )
252        .await;
253
254        // We want to increment the invoice counter even if the operation creation
255        // fails. This should never happen and if it does, we'd rather miss an invoice
256        // than get stuck in an infinite loop.
257        let mut dbtx_nc = dbtx.to_ref_nc();
258        if let Ok(operation_id) = Self::create_recurring_receive_operation(
259            client,
260            &mut dbtx_nc,
261            &old_payment_code_entry,
262            invoice_idx,
263            invoice,
264        )
265        .await
266        {
267            client
268                .log_event(
269                    &mut dbtx_nc,
270                    RecurringInvoiceCreatedEvent {
271                        payment_code_idx,
272                        invoice_idx,
273                        operation_id,
274                    },
275                )
276                .await;
277        } else {
278            debug_assert!(
279                false,
280                "Recurring invoice operation creation failed, this should never happen"
281            );
282        }
283        drop(dbtx_nc);
284
285        dbtx.commit_tx().await;
286    }
287
288    #[allow(clippy::pedantic)]
289    async fn create_recurring_receive_operation(
290        client: &ClientContext<Self>,
291        dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
292        payment_code: &RecurringPaymentCodeEntry,
293        invoice_index: u64,
294        invoice: lightning_invoice::Bolt11Invoice,
295    ) -> anyhow::Result<OperationId> {
296        // TODO: pipe secure secp context to here
297        let invoice_key =
298            tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
299
300        let operation_id = OperationId(*invoice.payment_hash().as_ref());
301        debug!(
302            target: LOG_CLIENT_RECURRING,
303            ?operation_id,
304            payment_code_key=?payment_code.root_keypair.public_key(),
305            invoice_index=%invoice_index,
306            "Creating recurring receive operation"
307        );
308        let ln_state =
309            LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
310                operation_id,
311                // TODO: technically we want a state that doesn't assume the offer was accepted
312                // since we haven't checked, but for an MVP this is good enough
313                state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
314                    crate::receive::LightningReceiveConfirmedInvoice {
315                        invoice: invoice.clone(),
316                        receiving_key: crate::ReceivingKey::Personal(invoice_key),
317                    },
318                ),
319            });
320
321        if let Err(e) = client
322            .manual_operation_start_dbtx(
323                dbtx,
324                operation_id,
325                "ln",
326                LightningOperationMeta {
327                    variant: LightningOperationMetaVariant::RecurringPaymentReceive(
328                        ReurringPaymentReceiveMeta {
329                            payment_code_id: PaymentCodeRootKey(
330                                payment_code.root_keypair.public_key(),
331                            )
332                            .to_payment_code_id(),
333                            invoice,
334                        },
335                    ),
336                    extra_meta: serde_json::Value::Null,
337                },
338                vec![client.make_dyn_state(ln_state)],
339            )
340            .await
341        {
342            warn!(
343                target: LOG_CLIENT_RECURRING,
344                ?operation_id,
345                payment_code_key=?payment_code.root_keypair.public_key(),
346                invoice_index=%invoice_index,
347                err = %e.fmt_compact_anyhow(),
348                "Failed to create recurring receive operation"
349            );
350            Err(e)
351        } else {
352            Ok(operation_id)
353        }
354    }
355
356    pub async fn subscribe_ln_recurring_receive(
357        &self,
358        operation_id: OperationId,
359    ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
360        let operation = self.client_ctx.get_operation(operation_id).await?;
361        let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
362            invoice,
363            ..
364        }) = operation.meta::<LightningOperationMeta>().variant
365        else {
366            bail!("Operation is not a recurring lightning receive")
367        };
368
369        let client_ctx = self.client_ctx.clone();
370
371        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
372            stream! {
373                let self_ref = client_ctx.self_ref();
374
375                yield LnReceiveState::Created;
376                yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
377
378                match self_ref.await_receive_success(operation_id).await {
379                    Ok(_) => {
380                        yield LnReceiveState::Funded;
381
382                        if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
383                            yield LnReceiveState::AwaitingFunds;
384
385                            if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
386                                yield LnReceiveState::Claimed;
387                                return;
388                            }
389                        }
390
391                        yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
392                    }
393                    Err(e) => {
394                        yield LnReceiveState::Canceled { reason: e };
395                    }
396                }
397            }
398        }))
399    }
400
401    pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
402        self.client_ctx
403            .module_db()
404            .begin_transaction_nc()
405            .await
406            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
407            .await
408            .map(|(idx, entry)| (idx.derivation_idx, entry))
409            .collect()
410            .await
411    }
412
413    pub async fn get_recurring_payment_code(
414        &self,
415        payment_code_idx: u64,
416    ) -> Option<RecurringPaymentCodeEntry> {
417        self.client_ctx
418            .module_db()
419            .begin_transaction_nc()
420            .await
421            .get_value(&RecurringPaymentCodeKey {
422                derivation_idx: payment_code_idx,
423            })
424            .await
425    }
426
427    pub async fn list_recurring_payment_code_invoices(
428        &self,
429        payment_code_idx: u64,
430    ) -> Option<BTreeMap<u64, OperationId>> {
431        let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
432
433        let operations = (1..=payment_code.last_derivation_index)
434            .map(|invoice_idx: u64| {
435                let invoice_key = tweak_user_key(
436                    SECP256K1,
437                    payment_code.root_keypair.public_key(),
438                    invoice_idx,
439                );
440                let payment_hash =
441                    sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
442                let operation_id = OperationId(*payment_hash.as_ref());
443
444                (invoice_idx, operation_id)
445            })
446            .collect();
447
448        Some(operations)
449    }
450}
451
452#[derive(
453    Debug,
454    Clone,
455    Copy,
456    PartialOrd,
457    Eq,
458    PartialEq,
459    Hash,
460    Encodable,
461    Decodable,
462    Serialize,
463    Deserialize,
464)]
465pub struct PaymentCodeRootKey(pub PublicKey);
466
467#[derive(
468    Debug,
469    Clone,
470    Copy,
471    PartialOrd,
472    Eq,
473    PartialEq,
474    Hash,
475    Encodable,
476    Decodable,
477    Serialize,
478    Deserialize,
479)]
480pub struct PaymentCodeId(sha256::Hash);
481
482impl PaymentCodeRootKey {
483    pub fn to_payment_code_id(&self) -> PaymentCodeId {
484        PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
485    }
486}
487
488impl Display for PaymentCodeId {
489    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490        write!(f, "{}", self.0)
491    }
492}
493
494impl FromStr for PaymentCodeId {
495    type Err = anyhow::Error;
496
497    fn from_str(s: &str) -> Result<Self, Self::Err> {
498        Ok(Self(sha256::Hash::from_str(s)?))
499    }
500}
501
502impl Display for PaymentCodeRootKey {
503    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
504        write!(f, "{}", self.0)
505    }
506}
507
508impl FromStr for PaymentCodeRootKey {
509    type Err = anyhow::Error;
510
511    fn from_str(s: &str) -> Result<Self, Self::Err> {
512        Ok(Self(PublicKey::from_str(s)?))
513    }
514}
515
516#[derive(
517    Debug,
518    Clone,
519    Copy,
520    Eq,
521    PartialEq,
522    PartialOrd,
523    Hash,
524    Encodable,
525    Decodable,
526    Serialize,
527    Deserialize,
528)]
529pub enum RecurringPaymentProtocol {
530    LNURL,
531    BOLT12,
532}
533
534#[derive(Debug, Clone, Serialize, Deserialize)]
535pub struct ReurringPaymentReceiveMeta {
536    pub payment_code_id: PaymentCodeId,
537    pub invoice: Bolt11Invoice,
538}
539
540#[derive(Debug, Error)]
541pub enum RecurringPaymentError {
542    #[error("Unsupported protocol: {0:?}")]
543    UnsupportedProtocol(RecurringPaymentProtocol),
544    #[error("Unknown federation ID: {0}")]
545    UnknownFederationId(FederationId),
546    #[error("Unknown payment code: {0:?}")]
547    UnknownPaymentCode(PaymentCodeId),
548    #[error("Unknown lightning receive operation: {0:?}")]
549    UnknownInvoice(OperationId),
550    #[error("No compatible lightning module found")]
551    NoLightningModuleFound,
552    #[error("No gateway found")]
553    NoGatewayFound,
554    #[error("Payment code already exists with different settings: {0:?}")]
555    PaymentCodeAlreadyExists(PaymentCodeRootKey),
556    #[error("Federation already registered: {0}")]
557    FederationAlreadyRegistered(FederationId),
558    #[error("Error joining federation: {0}")]
559    JoiningFederationFailed(anyhow::Error),
560    #[error("Error registering with recurring payment service: {0}")]
561    Other(#[from] anyhow::Error),
562}
563
564#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
565pub struct RecurringPaymentCodeEntry {
566    pub protocol: RecurringPaymentProtocol,
567    pub root_keypair: Keypair,
568    pub code: String,
569    pub recurringd_api: SafeUrl,
570    pub last_derivation_index: u64,
571    pub creation_time: SystemTime,
572    pub meta: String,
573}
574
575/// Event that is fired when a recurring payment code (i.e. LNURL) had an
576/// invoice generated for it.
577///
578/// It only means we saw a new invoice, the payment status has to be tracked
579/// independently. To do so use the `operation_id` and subscribe to the update
580/// stream using [`LightningClientModule::subscribe_ln_recurring_receive`] with
581/// it.
582#[derive(Debug, Clone, Serialize, Deserialize)]
583pub struct RecurringInvoiceCreatedEvent {
584    pub payment_code_idx: u64,
585    pub invoice_idx: u64,
586    pub operation_id: OperationId,
587}
588
589impl Event for RecurringInvoiceCreatedEvent {
590    const MODULE: Option<ModuleKind> = Some(fedimint_ln_common::KIND);
591    const KIND: EventKind = EventKind::from_static("recurring_invoice_created");
592    const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
593}