fedimint_ln_client/recurring/
mod.rs

1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::secp256k1::{Keypair, PublicKey};
23use fedimint_core::task::sleep;
24use fedimint_core::util::{BoxFuture, FmtCompact, FmtCompactAnyhow, SafeUrl};
25use fedimint_derive_secret::ChildId;
26use futures::StreamExt;
27use futures::future::select_all;
28use lightning_invoice::Bolt11Invoice;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31use tokio::select;
32use tokio::sync::Notify;
33use tracing::{debug, trace, warn};
34
35use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
36use crate::receive::LightningReceiveError;
37use crate::{
38    LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
39    LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
40};
41
/// Tracing target attached to every log statement emitted by this module.
const LOG_CLIENT_RECURRING: &str = "fm::client::ln::recurring";
43
44impl LightningClientModule {
45    pub async fn register_recurring_payment_code(
46        &self,
47        protocol: RecurringPaymentProtocol,
48        recurringd_api: SafeUrl,
49        meta: &str,
50    ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
51        self.client_ctx
52            .module_db()
53            .autocommit(
54                |dbtx, _| {
55                    let recurringd_api_inner = recurringd_api.clone();
56                    let new_recurring_payment_code = self.new_recurring_payment_code.clone();
57                    Box::pin(async move {
58                        let next_idx = dbtx
59                            .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
60                            .await
61                            .map(|(k, _)| k.derivation_idx)
62                            .next()
63                            .await
64                            .map_or(0, |last_idx| last_idx + 1);
65
66                        let payment_code_root_key = self.get_payment_code_root_key(next_idx);
67
68                        let recurringd_client =
69                            RecurringdClient::new(&recurringd_api_inner.clone());
70                        let register_response = recurringd_client
71                            .register_recurring_payment_code(
72                                self.client_ctx
73                                    .get_config()
74                                    .await
75                                    .global
76                                    .calculate_federation_id(),
77                                protocol,
78                                crate::recurring::PaymentCodeRootKey(
79                                    payment_code_root_key.public_key(),
80                                ),
81                                meta,
82                            )
83                            .await?;
84
85                        debug!(
86                            target: LOG_CLIENT_RECURRING,
87                            ?register_response,
88                            "Registered recurring payment code"
89                        );
90
91                        let payment_code_entry = RecurringPaymentCodeEntry {
92                            protocol,
93                            root_keypair: payment_code_root_key,
94                            code: register_response.recurring_payment_code,
95                            recurringd_api: recurringd_api_inner,
96                            last_derivation_index: 0,
97                            creation_time: fedimint_core::time::now(),
98                            meta: meta.to_owned(),
99                        };
100                        dbtx.insert_new_entry(
101                            &crate::db::RecurringPaymentCodeKey {
102                                derivation_idx: next_idx,
103                            },
104                            &payment_code_entry,
105                        )
106                        .await;
107                        dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
108
109                        Ok(payment_code_entry)
110                    })
111                },
112                None,
113            )
114            .await
115            .map_err(|e| match e {
116                fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
117                fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
118                    panic!("Commit failed: {last_error}")
119                }
120            })
121    }
122
123    pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
124        Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
125    }
126
127    pub async fn get_recurring_payment_codes_static(
128        db: &fedimint_core::db::Database,
129    ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
130        assert!(!db.is_global(), "Needs to run in module context");
131        db.begin_transaction_nc()
132            .await
133            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
134            .await
135            .map(|(idx, entry)| (idx.derivation_idx, entry))
136            .collect()
137            .await
138    }
139
140    fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
141        self.recurring_payment_code_secret
142            .child_key(ChildId(payment_code_registration_idx))
143            .to_secp_key(&self.secp)
144    }
145
146    pub async fn scan_recurring_payment_code_invoices(
147        client: ClientContext<Self>,
148        new_code_registered: Arc<Notify>,
149    ) {
150        const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
151
152        loop {
153            // We have to register the waiter before querying the DB for recurring payment
154            // code registrations so we don't miss any notification between querying the DB
155            // and registering the notifier.
156            let new_code_registered_future = new_code_registered.notified();
157
158            // We wait for all recurring payment codes to have an invoice in parallel
159            let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
160                .await
161                .into_iter()
162                .map(|(payment_code_idx, payment_code)| Box::pin(async move {
163                    let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
164                    let invoice_index = payment_code.last_derivation_index + 1;
165
166                    trace!(
167                        target: LOG_CLIENT_RECURRING,
168                        root_key=?payment_code.root_keypair.public_key(),
169                        %invoice_index,
170                        server=%payment_code.recurringd_api,
171                        "Waiting for new invoice from recurringd"
172                    );
173
174                    match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
175                        Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
176                        Err(err) => {
177                            debug!(
178                                target: LOG_CLIENT_RECURRING,
179                                err=%err.fmt_compact(),
180                                root_key=?payment_code.root_keypair.public_key(),
181                                invoice_index=%invoice_index,
182                                server=%payment_code.recurringd_api,
183                                "Failed querying recurring payment code invoice, will retry in {:?}",
184                                QUERY_RETRY_DELAY,
185                            );
186                            sleep(QUERY_RETRY_DELAY).await;
187                            Err(err)
188                        }
189                    }
190                }))
191                .collect::<Vec<_>>();
192
193            // TODO: isn't there some shorthand for this
194            let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
195                Box::pin(pending())
196            } else {
197                Box::pin(select_all(all_recurring_invoice_futures))
198            };
199
200            let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
201                (ret, _, _) = await_any_invoice => match ret {
202                    Ok(ret) => ret,
203                    Err(_) => {
204                        continue;
205                    }
206                },
207                () = new_code_registered_future => {
208                    continue;
209                }
210            };
211
212            Self::process_recurring_payment_code_invoice(
213                &client,
214                payment_code_idx,
215                invoice_idx,
216                invoice,
217            )
218            .await;
219
220            // Just in case something goes wrong, we don't want to burn too much CPU
221            sleep(Duration::from_secs(1)).await;
222        }
223    }
224
225    async fn process_recurring_payment_code_invoice(
226        client: &ClientContext<Self>,
227        payment_code_idx: u64,
228        invoice_idx: u64,
229        invoice: lightning_invoice::Bolt11Invoice,
230    ) {
231        // TODO: validate invoice hash etc.
232        let mut dbtx = client.module_db().begin_transaction().await;
233        let old_payment_code_entry = dbtx
234            .get_value(&crate::db::RecurringPaymentCodeKey {
235                derivation_idx: payment_code_idx,
236            })
237            .await
238            .expect("We queried it, so it exists in our DB");
239
240        let new_payment_code_entry = RecurringPaymentCodeEntry {
241            last_derivation_index: invoice_idx,
242            ..old_payment_code_entry.clone()
243        };
244        dbtx.insert_entry(
245            &crate::db::RecurringPaymentCodeKey {
246                derivation_idx: payment_code_idx,
247            },
248            &new_payment_code_entry,
249        )
250        .await;
251
252        Self::create_recurring_receive_operation(
253            client,
254            &mut dbtx.to_ref_nc(),
255            &old_payment_code_entry,
256            invoice_idx,
257            invoice,
258        )
259        .await;
260
261        dbtx.commit_tx().await;
262    }
263
264    #[allow(clippy::pedantic)]
265    async fn create_recurring_receive_operation(
266        client: &ClientContext<Self>,
267        dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
268        payment_code: &RecurringPaymentCodeEntry,
269        invoice_index: u64,
270        invoice: lightning_invoice::Bolt11Invoice,
271    ) {
272        // TODO: pipe secure secp context to here
273        let invoice_key =
274            tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
275
276        let operation_id = OperationId(*invoice.payment_hash().as_ref());
277        debug!(
278            target: LOG_CLIENT_RECURRING,
279            ?operation_id,
280            payment_code_key=?payment_code.root_keypair.public_key(),
281            invoice_index=%invoice_index,
282            "Creating recurring receive operation"
283        );
284        let ln_state =
285            LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
286                operation_id,
287                // TODO: technically we want a state that doesn't assume the offer was accepted
288                // since we haven't checked, but for an MVP this is good enough
289                state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
290                    crate::receive::LightningReceiveConfirmedInvoice {
291                        invoice: invoice.clone(),
292                        receiving_key: crate::ReceivingKey::Personal(invoice_key),
293                    },
294                ),
295            });
296
297        if let Err(e) = client
298            .manual_operation_start_dbtx(
299                dbtx,
300                operation_id,
301                "ln",
302                LightningOperationMeta {
303                    variant: LightningOperationMetaVariant::RecurringPaymentReceive(
304                        ReurringPaymentReceiveMeta {
305                            payment_code_id: PaymentCodeRootKey(
306                                payment_code.root_keypair.public_key(),
307                            )
308                            .to_payment_code_id(),
309                            invoice,
310                        },
311                    ),
312                    extra_meta: serde_json::Value::Null,
313                },
314                vec![client.make_dyn_state(ln_state)],
315            )
316            .await
317        {
318            warn!(
319                target: LOG_CLIENT_RECURRING,
320                ?operation_id,
321                payment_code_key=?payment_code.root_keypair.public_key(),
322                invoice_index=%invoice_index,
323                err = %e.fmt_compact_anyhow(),
324                "Failed to create recurring receive operation"
325            )
326        }
327    }
328
329    pub async fn subscribe_ln_recurring_receive(
330        &self,
331        operation_id: OperationId,
332    ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
333        let operation = self.client_ctx.get_operation(operation_id).await?;
334        let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
335            invoice,
336            ..
337        }) = operation.meta::<LightningOperationMeta>().variant
338        else {
339            bail!("Operation is not a recurring lightning receive")
340        };
341
342        let client_ctx = self.client_ctx.clone();
343
344        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
345            stream! {
346                let self_ref = client_ctx.self_ref();
347
348                yield LnReceiveState::Created;
349                yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
350
351                match self_ref.await_receive_success(operation_id).await {
352                    Ok(_) => {
353                        yield LnReceiveState::Funded;
354
355                        if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
356                            yield LnReceiveState::AwaitingFunds;
357
358                            if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
359                                yield LnReceiveState::Claimed;
360                                return;
361                            }
362                        }
363
364                        yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
365                    }
366                    Err(e) => {
367                        yield LnReceiveState::Canceled { reason: e };
368                    }
369                }
370            }
371        }))
372    }
373
374    pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
375        self.client_ctx
376            .module_db()
377            .begin_transaction_nc()
378            .await
379            .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
380            .await
381            .map(|(idx, entry)| (idx.derivation_idx, entry))
382            .collect()
383            .await
384    }
385
386    pub async fn get_recurring_payment_code(
387        &self,
388        payment_code_idx: u64,
389    ) -> Option<RecurringPaymentCodeEntry> {
390        self.client_ctx
391            .module_db()
392            .begin_transaction_nc()
393            .await
394            .get_value(&RecurringPaymentCodeKey {
395                derivation_idx: payment_code_idx,
396            })
397            .await
398    }
399
400    pub async fn list_recurring_payment_code_invoices(
401        &self,
402        payment_code_idx: u64,
403    ) -> Option<BTreeMap<u64, OperationId>> {
404        let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
405
406        let operations = (1..=payment_code.last_derivation_index)
407            .map(|invoice_idx: u64| {
408                let invoice_key = tweak_user_key(
409                    SECP256K1,
410                    payment_code.root_keypair.public_key(),
411                    invoice_idx,
412                );
413                let payment_hash =
414                    sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
415                let operation_id = OperationId(*payment_hash.as_ref());
416
417                (invoice_idx, operation_id)
418            })
419            .collect();
420
421        Some(operations)
422    }
423}
424
425#[derive(
426    Debug,
427    Clone,
428    Copy,
429    PartialOrd,
430    Eq,
431    PartialEq,
432    Hash,
433    Encodable,
434    Decodable,
435    Serialize,
436    Deserialize,
437)]
438pub struct PaymentCodeRootKey(pub PublicKey);
439
440#[derive(
441    Debug,
442    Clone,
443    Copy,
444    PartialOrd,
445    Eq,
446    PartialEq,
447    Hash,
448    Encodable,
449    Decodable,
450    Serialize,
451    Deserialize,
452)]
453pub struct PaymentCodeId(sha256::Hash);
454
455impl PaymentCodeRootKey {
456    pub fn to_payment_code_id(&self) -> PaymentCodeId {
457        PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
458    }
459}
460
461impl Display for PaymentCodeId {
462    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
463        write!(f, "{}", self.0)
464    }
465}
466
467impl FromStr for PaymentCodeId {
468    type Err = anyhow::Error;
469
470    fn from_str(s: &str) -> Result<Self, Self::Err> {
471        Ok(Self(sha256::Hash::from_str(s)?))
472    }
473}
474
475impl Display for PaymentCodeRootKey {
476    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
477        write!(f, "{}", self.0)
478    }
479}
480
481impl FromStr for PaymentCodeRootKey {
482    type Err = anyhow::Error;
483
484    fn from_str(s: &str) -> Result<Self, Self::Err> {
485        Ok(Self(PublicKey::from_str(s)?))
486    }
487}
488
489#[derive(
490    Debug,
491    Clone,
492    Copy,
493    Eq,
494    PartialEq,
495    PartialOrd,
496    Hash,
497    Encodable,
498    Decodable,
499    Serialize,
500    Deserialize,
501)]
502pub enum RecurringPaymentProtocol {
503    LNURL,
504    BOLT12,
505}
506
507#[derive(Debug, Clone, Serialize, Deserialize)]
508pub struct ReurringPaymentReceiveMeta {
509    pub payment_code_id: PaymentCodeId,
510    pub invoice: Bolt11Invoice,
511}
512
513#[derive(Debug, Error)]
514pub enum RecurringPaymentError {
515    #[error("Unsupported protocol: {0:?}")]
516    UnsupportedProtocol(RecurringPaymentProtocol),
517    #[error("Unknown federation ID: {0}")]
518    UnknownFederationId(FederationId),
519    #[error("Unknown payment code: {0:?}")]
520    UnknownPaymentCode(PaymentCodeId),
521    #[error("Unknown lightning receive operation: {0:?}")]
522    UnknownInvoice(OperationId),
523    #[error("No compatible lightning module found")]
524    NoLightningModuleFound,
525    #[error("No gateway found")]
526    NoGatewayFound,
527    #[error("Payment code already exists with different settings: {0:?}")]
528    PaymentCodeAlreadyExists(PaymentCodeRootKey),
529    #[error("Federation already registered: {0}")]
530    FederationAlreadyRegistered(FederationId),
531    #[error("Error joining federation: {0}")]
532    JoiningFederationFailed(anyhow::Error),
533    #[error("Error registering with recurring payment service: {0}")]
534    Other(#[from] anyhow::Error),
535}
536
537#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
538pub struct RecurringPaymentCodeEntry {
539    pub protocol: RecurringPaymentProtocol,
540    pub root_keypair: Keypair,
541    pub code: String,
542    pub recurringd_api: SafeUrl,
543    pub last_derivation_index: u64,
544    pub creation_time: SystemTime,
545    pub meta: String,
546}