fedimint_recurringd/
lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_api_client::api::net::Connector;
7use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
8use fedimint_core::config::FederationId;
9use fedimint_core::core::{ModuleKind, OperationId};
10use fedimint_core::db::{
11    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
12    IRawDatabase,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::secp256k1::SECP256K1;
17use fedimint_core::secp256k1::hashes::sha256;
18use fedimint_core::task::timeout;
19use fedimint_core::util::SafeUrl;
20use fedimint_core::{Amount, BitcoinHash};
21use fedimint_derive_secret::DerivableSecret;
22use fedimint_ln_client::recurring::{
23    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
24};
25use fedimint_ln_client::{
26    LightningClientInit, LightningClientModule, LightningOperationMeta,
27    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
28};
29use fedimint_mint_client::MintClientInit;
30use futures::StreamExt;
31use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
32use lnurl::Tag;
33use lnurl::lnurl::LnUrl;
34use lnurl::pay::PayResponse;
35use serde::{Deserialize, Serialize};
36use tokio::sync::{Notify, RwLock};
37use tracing::{info, warn};
38
39use crate::db::{
40    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
41    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
42    load_federation_client_databases, open_client_db, try_add_federation_database,
43};
44
45mod db;
46
47#[derive(Clone)]
48pub struct RecurringInvoiceServer {
49    db: Database,
50    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
51    invoice_generated: Arc<Notify>,
52    base_url: SafeUrl,
53}
54
55impl RecurringInvoiceServer {
56    pub async fn new(db: impl IRawDatabase + 'static, base_url: SafeUrl) -> anyhow::Result<Self> {
57        let db = Database::new(db, Default::default());
58
59        let mut clients = HashMap::<_, ClientHandleArc>::new();
60
61        for (federation_id, db) in load_federation_client_databases(&db).await {
62            let mut client_builder = Client::builder(db).await?;
63            client_builder.with_module(LightningClientInit::default());
64            client_builder.with_module(MintClientInit);
65            client_builder.with_primary_module_kind(ModuleKind::from_static_str("mint"));
66            let client = client_builder
67                .open(fedimint_client::RootSecret::StandardDoubleDerive(
68                    Self::default_secret(),
69                ))
70                .await?;
71            clients.insert(federation_id, Arc::new(client));
72        }
73
74        let slf = Self {
75            db: db.clone(),
76            clients: Arc::new(RwLock::new(clients)),
77            invoice_generated: Arc::new(Default::default()),
78            base_url,
79        };
80
81        slf.run_db_migrations().await;
82
83        Ok(slf)
84    }
85
86    /// We don't want to hold any money or sign anything ourselves, we only use
87    /// the client with externally supplied key material and to track
88    /// ongoing progress of other users' receives.
89    fn default_secret() -> DerivableSecret {
90        DerivableSecret::new_root(&[], &[])
91    }
92
93    pub async fn register_federation(
94        &self,
95        invite_code: &InviteCode,
96    ) -> Result<FederationId, RecurringPaymentError> {
97        let federation_id = invite_code.federation_id();
98        info!("Registering federation {}", federation_id);
99
100        // We lock to prevent parallel join attempts
101        // TODO: lock per federation
102        let mut clients = self.clients.write().await;
103        if clients.contains_key(&federation_id) {
104            return Err(RecurringPaymentError::FederationAlreadyRegistered(
105                federation_id,
106            ));
107        }
108
109        // We don't know if joining will succeed or be interrupted. We use a random DB
110        // prefix to initialize the client and only write the prefix to the DB if that
111        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
112        // becomes a problem we can clean it up later.
113        let client_db_prefix = FederationDbPrefix::random();
114        let client_db = open_client_db(&self.db, client_db_prefix);
115
116        match Self::join_federation_static(client_db, invite_code).await {
117            Ok(client) => {
118                try_add_federation_database(&self.db, federation_id, client_db_prefix)
119                    .await
120                    .expect("We hold a global lock, no parallel joining can happen");
121                clients.insert(federation_id, client);
122                Ok(federation_id)
123            }
124            Err(e) => {
125                // TODO: clean up DB?
126                Err(e)
127            }
128        }
129    }
130
131    async fn join_federation_static(
132        client_db: Database,
133        invite_code: &InviteCode,
134    ) -> Result<ClientHandleArc, RecurringPaymentError> {
135        let mut client_builder = Client::builder(client_db)
136            .await
137            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
138
139        client_builder.with_connector(Connector::default());
140        client_builder.with_module(LightningClientInit::default());
141        client_builder.with_module(MintClientInit);
142        client_builder.with_primary_module_kind(ModuleKind::from_static_str("mint"));
143
144        let client = client_builder
145            .preview(invite_code)
146            .await?
147            .join(fedimint_client::RootSecret::StandardDoubleDerive(
148                Self::default_secret(),
149            ))
150            .await
151            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
152        Ok(Arc::new(client))
153    }
154
155    pub async fn register_recurring_payment_code(
156        &self,
157        federation_id: FederationId,
158        payment_code_root_key: PaymentCodeRootKey,
159        protocol: RecurringPaymentProtocol,
160        meta: &str,
161    ) -> Result<String, RecurringPaymentError> {
162        // TODO: support BOLT12
163        if protocol != RecurringPaymentProtocol::LNURL {
164            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
165        }
166
167        // Ensure the federation is supported
168        self.get_federation_client(federation_id).await?;
169
170        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
171        let payment_code_entry = PaymentCodeEntry {
172            root_key: payment_code_root_key,
173            federation_id,
174            protocol,
175            payment_code: payment_code.clone(),
176            variant: PaymentCodeVariant::Lnurl {
177                meta: meta.to_owned(),
178            },
179        };
180
181        let mut dbtx = self.db.begin_transaction().await;
182        if let Some(existing_code) = dbtx
183            .insert_entry(
184                &PaymentCodeKey {
185                    payment_code_id: payment_code_root_key.to_payment_code_id(),
186                },
187                &payment_code_entry,
188            )
189            .await
190        {
191            if existing_code != payment_code_entry {
192                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
193                    payment_code_root_key,
194                ));
195            }
196
197            dbtx.ignore_uncommitted();
198            return Ok(payment_code);
199        }
200
201        dbtx.insert_new_entry(
202            &PaymentCodeNextInvoiceIndexKey {
203                payment_code_id: payment_code_root_key.to_payment_code_id(),
204            },
205            &0,
206        )
207        .await;
208        dbtx.commit_tx_result().await?;
209
210        Ok(payment_code)
211    }
212
213    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
214        let lnurl = LnUrl::from_url(format!(
215            "{}lnv1/paycodes/{}",
216            self.base_url, payment_code_id
217        ));
218        lnurl.encode()
219    }
220
221    pub async fn lnurl_pay(
222        &self,
223        payment_code_id: PaymentCodeId,
224    ) -> Result<PayResponse, RecurringPaymentError> {
225        let payment_code = self.get_payment_code(payment_code_id).await?;
226        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
227
228        Ok(PayResponse {
229            callback: format!("{}lnv1/paycodes/{}/invoice", self.base_url, payment_code_id),
230            max_sendable: 100000000000,
231            min_sendable: 1,
232            tag: Tag::PayRequest,
233            metadata: meta,
234            comment_allowed: None,
235            allows_nostr: None,
236            nostr_pubkey: None,
237        })
238    }
239
240    pub async fn lnurl_invoice(
241        &self,
242        payment_code_id: PaymentCodeId,
243        amount: Amount,
244    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
245        let (operation_id, federation_id, invoice) =
246            self.create_bolt11_invoice(payment_code_id, amount).await?;
247        Ok(LNURLPayInvoice {
248            pr: invoice.to_string(),
249            verify: format!(
250                "{}lnv1/verify/{}/{}",
251                self.base_url,
252                federation_id,
253                operation_id.fmt_full()
254            ),
255        })
256    }
257
258    async fn create_bolt11_invoice(
259        &self,
260        payment_code_id: PaymentCodeId,
261        amount: Amount,
262    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
263        // Invoices are valid for one day by default, might become dynamic with BOLT12
264        // support
265        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
266
267        let payment_code = self.get_payment_code(payment_code_id).await?;
268
269        let federation_client = self
270            .get_federation_client(payment_code.federation_id)
271            .await?;
272
273        let (operation_id, invoice) = self
274            .db
275            .autocommit(
276                |dbtx, _| {
277                    let federation_client = federation_client.clone();
278                    let payment_code = payment_code.clone();
279                    Box::pin(async move {
280                        let invoice_index = self
281                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
282                            .await;
283
284                        // Check if the invoice index was already used in an aborted call to this
285                        // fn. If so:
286                        //   1. Save the previously generated invoice. We don't want to reuse it
287                        //      since it may be expired and in the future may contain call-specific
288                        //      data, but also want to allow the client to sync past it.
289                        //   2. Increment the invoice index to generate a new invoice since re-using
290                        //      the same index wouldn't work (operation id reuse is forbidden).
291                        let initial_operation_id =
292                            operation_id_from_user_key(payment_code.root_key, invoice_index);
293                        let invoice_index = if let Some(invoice) =
294                            Self::check_if_invoice_exists(&federation_client, initial_operation_id)
295                                .await
296                        {
297                            self.save_bolt11_invoice(
298                                dbtx,
299                                initial_operation_id,
300                                payment_code_id,
301                                invoice_index,
302                                invoice,
303                            )
304                            .await;
305                            self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
306                                .await
307                        } else {
308                            invoice_index
309                        };
310
311                        // This is where the main part starts: generate the invoice and save it to
312                        // the DB
313                        let federation_client_ln_module = federation_client.get_ln_module()?;
314                        let gateway = federation_client_ln_module
315                            .get_gateway(None, false)
316                            .await?
317                            .ok_or(RecurringPaymentError::NoGatewayFound)?;
318
319                        let lnurl_meta = match payment_code.variant {
320                            PaymentCodeVariant::Lnurl { meta } => meta,
321                        };
322                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
323                        let description = Bolt11InvoiceDescription::Hash(meta_hash);
324
325                        // TODO: ideally creating the invoice would take a dbtx as argument so we
326                        // don't have to do the "check if invoice already exists" dance
327                        let (operation_id, invoice, _preimage) = federation_client_ln_module
328                            .create_bolt11_invoice_for_user_tweaked(
329                                amount,
330                                description,
331                                Some(DEFAULT_EXPIRY_TIME),
332                                payment_code.root_key.0,
333                                invoice_index,
334                                serde_json::Value::Null,
335                                Some(gateway),
336                            )
337                            .await?;
338
339                        self.save_bolt11_invoice(
340                            dbtx,
341                            operation_id,
342                            payment_code_id,
343                            invoice_index,
344                            invoice.clone(),
345                        )
346                        .await;
347
348                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
349                    })
350                },
351                None,
352            )
353            .await
354            .unwrap_autocommit()?;
355
356        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
357
358        Ok((operation_id, federation_client.federation_id(), invoice))
359    }
360
361    async fn save_bolt11_invoice(
362        &self,
363        dbtx: &mut DatabaseTransaction<'_>,
364        operation_id: OperationId,
365        payment_code_id: PaymentCodeId,
366        invoice_index: u64,
367        invoice: Bolt11Invoice,
368    ) {
369        dbtx.insert_new_entry(
370            &PaymentCodeInvoiceKey {
371                payment_code_id,
372                index: invoice_index,
373            },
374            &PaymentCodeInvoiceEntry {
375                operation_id,
376                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
377            },
378        )
379        .await;
380
381        let invoice_generated_notifier = self.invoice_generated.clone();
382        dbtx.on_commit(move || {
383            invoice_generated_notifier.notify_waiters();
384        });
385    }
386
387    async fn check_if_invoice_exists(
388        federation_client: &ClientHandleArc,
389        operation_id: OperationId,
390    ) -> Option<Bolt11Invoice> {
391        let operation = federation_client
392            .operation_log()
393            .get_operation(operation_id)
394            .await?;
395
396        assert_eq!(
397            operation.operation_module_kind(),
398            LightningClientModule::kind().as_str()
399        );
400
401        let LightningOperationMetaVariant::Receive { invoice, .. } =
402            operation.meta::<LightningOperationMeta>().variant
403        else {
404            panic!(
405                "Unexpected operation meta variant: {:?}",
406                operation.meta::<LightningOperationMeta>().variant
407            );
408        };
409
410        Some(invoice)
411    }
412
413    async fn get_federation_client(
414        &self,
415        federation_id: FederationId,
416    ) -> Result<ClientHandleArc, RecurringPaymentError> {
417        self.clients
418            .read()
419            .await
420            .get(&federation_id)
421            .cloned()
422            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
423    }
424
425    pub async fn await_invoice_index_generated(
426        &self,
427        payment_code_id: PaymentCodeId,
428        invoice_index: u64,
429    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
430        self.get_payment_code(payment_code_id).await?;
431
432        let mut notified = self.invoice_generated.notified();
433        loop {
434            let mut dbtx = self.db.begin_transaction_nc().await;
435            if let Some(invoice_entry) = dbtx
436                .get_value(&PaymentCodeInvoiceKey {
437                    payment_code_id,
438                    index: invoice_index,
439                })
440                .await
441            {
442                break Ok(invoice_entry);
443            };
444
445            notified.await;
446            notified = self.invoice_generated.notified();
447        }
448    }
449
450    async fn get_next_invoice_index(
451        &self,
452        dbtx: &mut DatabaseTransaction<'_>,
453        payment_code_id: PaymentCodeId,
454    ) -> u64 {
455        let next_index = dbtx
456            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
457            .await
458            .map(|index| index + 1)
459            .unwrap_or(0);
460        dbtx.insert_entry(
461            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
462            &next_index,
463        )
464        .await;
465
466        next_index
467    }
468
469    pub async fn list_federations(&self) -> Vec<FederationId> {
470        self.clients.read().await.keys().cloned().collect()
471    }
472
473    async fn get_payment_code(
474        &self,
475        payment_code_id: PaymentCodeId,
476    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
477        self.db
478            .begin_transaction_nc()
479            .await
480            .get_value(&PaymentCodeKey { payment_code_id })
481            .await
482            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
483    }
484
485    /// Returns if an invoice has been paid yet. To avoid DB indirection and
486    /// since the URLs would be similarly long either way we identify
487    /// invoices by federation id and operation id instead of the payment
488    /// code. This function is the basis of `recurringd`'s [LUD-21]
489    /// implementation that allows clients to verify if a given invoice they
490    /// generated using the LNURL has been paid yet.
491    ///
492    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
493    pub async fn verify_invoice_paid(
494        &self,
495        federation_id: FederationId,
496        operation_id: OperationId,
497    ) -> Result<InvoiceStatus, RecurringPaymentError> {
498        let federation_client = self.get_federation_client(federation_id).await?;
499
500        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
501        // fetch it from the operation meta.
502        let invoice = {
503            let operation = federation_client
504                .operation_log()
505                .get_operation(operation_id)
506                .await
507                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
508
509            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
510                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
511            }
512
513            let LightningOperationMetaVariant::Receive { invoice, .. } =
514                operation.meta::<LightningOperationMeta>().variant
515            else {
516                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
517            };
518
519            invoice
520        };
521
522        let ln_module = federation_client
523            .get_first_module::<LightningClientModule>()
524            .map_err(|e| {
525                warn!("No compatible lightning module found {e}");
526                RecurringPaymentError::NoLightningModuleFound
527            })?;
528
529        let mut stream = ln_module
530            .subscribe_ln_receive(operation_id)
531            .await
532            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
533            .into_stream();
534        let status = loop {
535            // Unfortunately the fedimint client doesn't track payment status internally
536            // yet, but relies on integrators to consume the update streams belonging to
537            // operations to figure out their state. Since the verify endpoint is meant to
538            // be non-blocking, we need to find a way to consume the stream until we think
539            // no immediate progress will be made anymore. That's why we limit each update
540            // step to 100ms, far more than a DB read should ever take, and abort if we'd
541            // block to wait for further progress to be made.
542            let update = timeout(Duration::from_millis(100), stream.next()).await;
543            match update {
544                // For some reason recurringd jumps right to claimed without going over funded … but
545                // either is fine to conclude the user will receive their money once they come
546                // online.
547                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
548                    break PaymentStatus::Paid;
549                }
550                // Keep looking for a state update indicating the invoice having been paid
551                Ok(Some(_)) => {
552                    continue;
553                }
554                // If we reach the end of the update stream without observing a state indicating the
555                // invoice having been paid there was likely some error or the invoice timed out.
556                // Either way we just show the invoice as unpaid.
557                Ok(None) | Err(_) => {
558                    break PaymentStatus::Pending;
559                }
560            }
561        };
562
563        Ok(InvoiceStatus { invoice, status })
564    }
565
566    async fn run_db_migrations(&self) {
567        let migrations = Self::migrations();
568        let schema_version: u64 = self
569            .db
570            .begin_transaction_nc()
571            .await
572            .get_value(&SchemaVersionKey)
573            .await
574            .unwrap_or_default();
575
576        for (target_schema, migration_fn) in migrations
577            .into_iter()
578            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
579        {
580            let mut dbtx = self.db.begin_transaction().await;
581            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
582
583            migration_fn(self, dbtx.to_ref_nc()).await;
584
585            dbtx.commit_tx().await;
586        }
587    }
588}
589
590async fn await_invoice_confirmed(
591    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
592    operation_id: OperationId,
593) -> Result<(), RecurringPaymentError> {
594    let mut operation_updated = ln_module
595        .subscribe_ln_receive(operation_id)
596        .await?
597        .into_stream();
598
599    while let Some(update) = operation_updated.next().await {
600        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
601            return Ok(());
602        }
603    }
604
605    Err(RecurringPaymentError::Other(anyhow!(
606        "BOLT11 invoice not confirmed"
607    )))
608}
609
610#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
611pub enum PaymentCodeInvoice {
612    Bolt11(Bolt11Invoice),
613}
614
615/// Helper struct indicating if an invoice was paid. In the future it may also
616/// contain the preimage to be fully LUD-21 compliant.
617pub struct InvoiceStatus {
618    pub invoice: Bolt11Invoice,
619    pub status: PaymentStatus,
620}
621
/// Payment state of an invoice as reported by invoice verification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// A state indicating that the invoice was paid has been observed.
    Paid,
    /// No such state has been observed (yet).
    Pending,
}

impl PaymentStatus {
    /// Returns `true` if the status is [`PaymentStatus::Paid`].
    #[must_use]
    pub fn is_paid(&self) -> bool {
        matches!(self, PaymentStatus::Paid)
    }
}
632
633/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
634/// use any of the other fields right now. Once we upstream the verify field
635/// this struct can be removed.
636#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
637pub struct LNURLPayInvoice {
638    pub pr: String,
639    pub verify: String,
640}
641
642fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
643    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
644    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
645    let payment_hash = sha256::Hash::hash(&preimage[..]);
646
647    OperationId(payment_hash.to_byte_array())
648}
649
650trait LnClientContextExt {
651    fn get_ln_module(
652        &'_ self,
653    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
654}
655
656impl LnClientContextExt for ClientHandleArc {
657    fn get_ln_module(
658        &'_ self,
659    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
660        self.get_first_module::<LightningClientModule>()
661            .map_err(|e| {
662                warn!("No compatible lightning module found {e}");
663                RecurringPaymentError::NoLightningModuleFound
664            })
665    }
666}