fedimint_recurringd/
lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
7use fedimint_connectors::ConnectorRegistry;
8use fedimint_core::config::FederationId;
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
12    IRawDatabase,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::secp256k1::SECP256K1;
17use fedimint_core::secp256k1::hashes::sha256;
18use fedimint_core::task::timeout;
19use fedimint_core::util::SafeUrl;
20use fedimint_core::{Amount, BitcoinHash};
21use fedimint_derive_secret::DerivableSecret;
22use fedimint_ln_client::recurring::{
23    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
24};
25use fedimint_ln_client::{
26    LightningClientInit, LightningClientModule, LightningOperationMeta,
27    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
28};
29use fedimint_mint_client::MintClientInit;
30use futures::StreamExt;
31use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
32use lnurl::Tag;
33use lnurl::lnurl::LnUrl;
34use lnurl::pay::PayResponse;
35use serde::{Deserialize, Serialize};
36use tokio::sync::{Notify, RwLock};
37use tracing::{info, warn};
38
39use crate::db::{
40    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
41    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
42    load_federation_client_databases, open_client_db, try_add_federation_database,
43};
44
45mod db;
46
/// Server-side state used to register federations and payment codes and to
/// generate and track invoices for them.
///
/// The client map and the notifier are wrapped in `Arc`, so clones of this
/// struct share them.
#[derive(Clone)]
pub struct RecurringInvoiceServer {
    /// Database holding payment codes, generated invoices and the schema
    /// version; per-federation client databases are opened from it.
    db: Database,
    /// Connector registry handed to every federation client that is opened or
    /// joined.
    connectors: ConnectorRegistry,
    /// One client handle per registered federation, guarded by an async
    /// read/write lock.
    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
    /// Notified once a transaction that stored a new invoice has committed.
    invoice_generated: Arc<Notify>,
    /// Prefix used when formatting the LNURL, callback and verify URLs.
    base_url: SafeUrl,
}
55
56impl RecurringInvoiceServer {
57    pub async fn new(
58        connectors: ConnectorRegistry,
59        db: impl IRawDatabase + 'static,
60        base_url: SafeUrl,
61    ) -> anyhow::Result<Self> {
62        let db = Database::new(db, Default::default());
63
64        let mut clients = HashMap::<_, ClientHandleArc>::new();
65
66        for (federation_id, db) in load_federation_client_databases(&db).await {
67            let mut client_builder = Client::builder().await?;
68            client_builder.with_module(LightningClientInit::default());
69            client_builder.with_module(MintClientInit);
70            let client = client_builder
71                .open(
72                    connectors.clone(),
73                    db,
74                    fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
75                )
76                .await?;
77            clients.insert(federation_id, Arc::new(client));
78        }
79
80        let slf = Self {
81            db: db.clone(),
82            clients: Arc::new(RwLock::new(clients)),
83            invoice_generated: Arc::new(Default::default()),
84            base_url,
85            connectors,
86        };
87
88        slf.run_db_migrations().await;
89
90        Ok(slf)
91    }
92
93    /// We don't want to hold any money or sign anything ourselves, we only use
94    /// the client with externally supplied key material and to track
95    /// ongoing progress of other users' receives.
96    fn default_secret() -> DerivableSecret {
97        DerivableSecret::new_root(&[], &[])
98    }
99
100    pub async fn register_federation(
101        &self,
102        invite_code: &InviteCode,
103    ) -> Result<FederationId, RecurringPaymentError> {
104        let federation_id = invite_code.federation_id();
105        info!("Registering federation {}", federation_id);
106
107        // We lock to prevent parallel join attempts
108        // TODO: lock per federation
109        let mut clients = self.clients.write().await;
110        if clients.contains_key(&federation_id) {
111            return Err(RecurringPaymentError::FederationAlreadyRegistered(
112                federation_id,
113            ));
114        }
115
116        // We don't know if joining will succeed or be interrupted. We use a random DB
117        // prefix to initialize the client and only write the prefix to the DB if that
118        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
119        // becomes a problem we can clean it up later.
120        let client_db_prefix = FederationDbPrefix::random();
121        let client_db = open_client_db(&self.db, client_db_prefix);
122
123        match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
124            Ok(client) => {
125                try_add_federation_database(&self.db, federation_id, client_db_prefix)
126                    .await
127                    .expect("We hold a global lock, no parallel joining can happen");
128                clients.insert(federation_id, client);
129                Ok(federation_id)
130            }
131            Err(e) => {
132                // TODO: clean up DB?
133                Err(e)
134            }
135        }
136    }
137
138    async fn join_federation_static(
139        connectors: ConnectorRegistry,
140        client_db: Database,
141        invite_code: &InviteCode,
142    ) -> Result<ClientHandleArc, RecurringPaymentError> {
143        let mut client_builder = Client::builder()
144            .await
145            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
146
147        client_builder.with_module(LightningClientInit::default());
148        client_builder.with_module(MintClientInit);
149
150        let client = client_builder
151            .preview(connectors, invite_code)
152            .await?
153            .join(
154                client_db,
155                fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
156            )
157            .await
158            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
159        Ok(Arc::new(client))
160    }
161
162    pub async fn register_recurring_payment_code(
163        &self,
164        federation_id: FederationId,
165        payment_code_root_key: PaymentCodeRootKey,
166        protocol: RecurringPaymentProtocol,
167        meta: &str,
168    ) -> Result<String, RecurringPaymentError> {
169        // TODO: support BOLT12
170        if protocol != RecurringPaymentProtocol::LNURL {
171            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
172        }
173
174        // Ensure the federation is supported
175        self.get_federation_client(federation_id).await?;
176
177        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
178        let payment_code_entry = PaymentCodeEntry {
179            root_key: payment_code_root_key,
180            federation_id,
181            protocol,
182            payment_code: payment_code.clone(),
183            variant: PaymentCodeVariant::Lnurl {
184                meta: meta.to_owned(),
185            },
186        };
187
188        let mut dbtx = self.db.begin_transaction().await;
189        if let Some(existing_code) = dbtx
190            .insert_entry(
191                &PaymentCodeKey {
192                    payment_code_id: payment_code_root_key.to_payment_code_id(),
193                },
194                &payment_code_entry,
195            )
196            .await
197        {
198            if existing_code != payment_code_entry {
199                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
200                    payment_code_root_key,
201                ));
202            }
203
204            dbtx.ignore_uncommitted();
205            return Ok(payment_code);
206        }
207
208        dbtx.insert_new_entry(
209            &PaymentCodeNextInvoiceIndexKey {
210                payment_code_id: payment_code_root_key.to_payment_code_id(),
211            },
212            &0,
213        )
214        .await;
215        dbtx.commit_tx_result().await.map_err(anyhow::Error::from)?;
216
217        Ok(payment_code)
218    }
219
220    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
221        let lnurl = LnUrl::from_url(format!(
222            "{}lnv1/paycodes/{}",
223            self.base_url, payment_code_id
224        ));
225        lnurl.encode()
226    }
227
228    pub async fn lnurl_pay(
229        &self,
230        payment_code_id: PaymentCodeId,
231    ) -> Result<PayResponse, RecurringPaymentError> {
232        let payment_code = self.get_payment_code(payment_code_id).await?;
233        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
234
235        Ok(PayResponse {
236            callback: format!("{}lnv1/paycodes/{}/invoice", self.base_url, payment_code_id),
237            max_sendable: 100000000000,
238            min_sendable: 1,
239            tag: Tag::PayRequest,
240            metadata: meta,
241            comment_allowed: None,
242            allows_nostr: None,
243            nostr_pubkey: None,
244        })
245    }
246
247    pub async fn lnurl_invoice(
248        &self,
249        payment_code_id: PaymentCodeId,
250        amount: Amount,
251    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
252        let (operation_id, federation_id, invoice) =
253            self.create_bolt11_invoice(payment_code_id, amount).await?;
254        Ok(LNURLPayInvoice {
255            pr: invoice.to_string(),
256            verify: format!(
257                "{}lnv1/verify/{}/{}",
258                self.base_url,
259                federation_id,
260                operation_id.fmt_full()
261            ),
262        })
263    }
264
265    async fn create_bolt11_invoice(
266        &self,
267        payment_code_id: PaymentCodeId,
268        amount: Amount,
269    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
270        // Invoices are valid for one day by default, might become dynamic with BOLT12
271        // support
272        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
273
274        let payment_code = self.get_payment_code(payment_code_id).await?;
275
276        let federation_client = self
277            .get_federation_client(payment_code.federation_id)
278            .await?;
279
280        let (operation_id, invoice) = self
281            .db
282            .autocommit(
283                |dbtx, _| {
284                    let federation_client = federation_client.clone();
285                    let payment_code = payment_code.clone();
286                    Box::pin(async move {
287                        let invoice_index = self
288                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
289                            .await;
290
291                        // Check if the invoice index was already used in an aborted call to this
292                        // fn. If so:
293                        //   1. Save the previously generated invoice. We don't want to reuse it
294                        //      since it may be expired and in the future may contain call-specific
295                        //      data, but also want to allow the client to sync past it.
296                        //   2. Increment the invoice index to generate a new invoice since re-using
297                        //      the same index wouldn't work (operation id reuse is forbidden).
298                        let initial_operation_id =
299                            operation_id_from_user_key(payment_code.root_key, invoice_index);
300                        let invoice_index = if let Some(invoice) =
301                            Self::check_if_invoice_exists(&federation_client, initial_operation_id)
302                                .await
303                        {
304                            self.save_bolt11_invoice(
305                                dbtx,
306                                initial_operation_id,
307                                payment_code_id,
308                                invoice_index,
309                                invoice,
310                            )
311                            .await;
312                            self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
313                                .await
314                        } else {
315                            invoice_index
316                        };
317
318                        // This is where the main part starts: generate the invoice and save it to
319                        // the DB
320                        let federation_client_ln_module = federation_client.get_ln_module()?;
321                        let gateway = federation_client_ln_module
322                            .get_gateway(None, false)
323                            .await?
324                            .ok_or(RecurringPaymentError::NoGatewayFound)?;
325
326                        let lnurl_meta = match payment_code.variant {
327                            PaymentCodeVariant::Lnurl { meta } => meta,
328                        };
329                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
330                        let description = Bolt11InvoiceDescription::Hash(meta_hash);
331
332                        // TODO: ideally creating the invoice would take a dbtx as argument so we
333                        // don't have to do the "check if invoice already exists" dance
334                        let (operation_id, invoice, _preimage) = federation_client_ln_module
335                            .create_bolt11_invoice_for_user_tweaked(
336                                amount,
337                                description,
338                                Some(DEFAULT_EXPIRY_TIME),
339                                payment_code.root_key.0,
340                                invoice_index,
341                                serde_json::Value::Null,
342                                Some(gateway),
343                            )
344                            .await?;
345
346                        self.save_bolt11_invoice(
347                            dbtx,
348                            operation_id,
349                            payment_code_id,
350                            invoice_index,
351                            invoice.clone(),
352                        )
353                        .await;
354
355                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
356                    })
357                },
358                None,
359            )
360            .await
361            .unwrap_autocommit()?;
362
363        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
364
365        Ok((operation_id, federation_client.federation_id(), invoice))
366    }
367
368    async fn save_bolt11_invoice(
369        &self,
370        dbtx: &mut DatabaseTransaction<'_>,
371        operation_id: OperationId,
372        payment_code_id: PaymentCodeId,
373        invoice_index: u64,
374        invoice: Bolt11Invoice,
375    ) {
376        dbtx.insert_new_entry(
377            &PaymentCodeInvoiceKey {
378                payment_code_id,
379                index: invoice_index,
380            },
381            &PaymentCodeInvoiceEntry {
382                operation_id,
383                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
384            },
385        )
386        .await;
387
388        let invoice_generated_notifier = self.invoice_generated.clone();
389        dbtx.on_commit(move || {
390            invoice_generated_notifier.notify_waiters();
391        });
392    }
393
394    async fn check_if_invoice_exists(
395        federation_client: &ClientHandleArc,
396        operation_id: OperationId,
397    ) -> Option<Bolt11Invoice> {
398        let operation = federation_client
399            .operation_log()
400            .get_operation(operation_id)
401            .await?;
402
403        assert_eq!(
404            operation.operation_module_kind(),
405            LightningClientModule::kind().as_str()
406        );
407
408        let LightningOperationMetaVariant::Receive { invoice, .. } =
409            operation.meta::<LightningOperationMeta>().variant
410        else {
411            panic!(
412                "Unexpected operation meta variant: {:?}",
413                operation.meta::<LightningOperationMeta>().variant
414            );
415        };
416
417        Some(invoice)
418    }
419
420    async fn get_federation_client(
421        &self,
422        federation_id: FederationId,
423    ) -> Result<ClientHandleArc, RecurringPaymentError> {
424        self.clients
425            .read()
426            .await
427            .get(&federation_id)
428            .cloned()
429            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
430    }
431
432    pub async fn await_invoice_index_generated(
433        &self,
434        payment_code_id: PaymentCodeId,
435        invoice_index: u64,
436    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
437        self.get_payment_code(payment_code_id).await?;
438
439        let mut notified = self.invoice_generated.notified();
440        loop {
441            let mut dbtx = self.db.begin_transaction_nc().await;
442            if let Some(invoice_entry) = dbtx
443                .get_value(&PaymentCodeInvoiceKey {
444                    payment_code_id,
445                    index: invoice_index,
446                })
447                .await
448            {
449                break Ok(invoice_entry);
450            };
451
452            notified.await;
453            notified = self.invoice_generated.notified();
454        }
455    }
456
457    async fn get_next_invoice_index(
458        &self,
459        dbtx: &mut DatabaseTransaction<'_>,
460        payment_code_id: PaymentCodeId,
461    ) -> u64 {
462        let next_index = dbtx
463            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
464            .await
465            .map(|index| index + 1)
466            .unwrap_or(0);
467        dbtx.insert_entry(
468            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
469            &next_index,
470        )
471        .await;
472
473        next_index
474    }
475
476    pub async fn list_federations(&self) -> Vec<FederationId> {
477        self.clients.read().await.keys().cloned().collect()
478    }
479
480    async fn get_payment_code(
481        &self,
482        payment_code_id: PaymentCodeId,
483    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
484        self.db
485            .begin_transaction_nc()
486            .await
487            .get_value(&PaymentCodeKey { payment_code_id })
488            .await
489            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
490    }
491
492    /// Returns if an invoice has been paid yet. To avoid DB indirection and
493    /// since the URLs would be similarly long either way we identify
494    /// invoices by federation id and operation id instead of the payment
495    /// code. This function is the basis of `recurringd`'s [LUD-21]
496    /// implementation that allows clients to verify if a given invoice they
497    /// generated using the LNURL has been paid yet.
498    ///
499    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
500    pub async fn verify_invoice_paid(
501        &self,
502        federation_id: FederationId,
503        operation_id: OperationId,
504    ) -> Result<InvoiceStatus, RecurringPaymentError> {
505        let federation_client = self.get_federation_client(federation_id).await?;
506
507        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
508        // fetch it from the operation meta.
509        let invoice = {
510            let operation = federation_client
511                .operation_log()
512                .get_operation(operation_id)
513                .await
514                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
515
516            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
517                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
518            }
519
520            let LightningOperationMetaVariant::Receive { invoice, .. } =
521                operation.meta::<LightningOperationMeta>().variant
522            else {
523                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
524            };
525
526            invoice
527        };
528
529        let ln_module = federation_client
530            .get_first_module::<LightningClientModule>()
531            .map_err(|e| {
532                warn!("No compatible lightning module found {e}");
533                RecurringPaymentError::NoLightningModuleFound
534            })?;
535
536        let mut stream = ln_module
537            .subscribe_ln_receive(operation_id)
538            .await
539            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
540            .into_stream();
541        let status = loop {
542            // Unfortunately the fedimint client doesn't track payment status internally
543            // yet, but relies on integrators to consume the update streams belonging to
544            // operations to figure out their state. Since the verify endpoint is meant to
545            // be non-blocking, we need to find a way to consume the stream until we think
546            // no immediate progress will be made anymore. That's why we limit each update
547            // step to 100ms, far more than a DB read should ever take, and abort if we'd
548            // block to wait for further progress to be made.
549            let update = timeout(Duration::from_millis(100), stream.next()).await;
550            match update {
551                // For some reason recurringd jumps right to claimed without going over funded … but
552                // either is fine to conclude the user will receive their money once they come
553                // online.
554                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
555                    break PaymentStatus::Paid;
556                }
557                // Keep looking for a state update indicating the invoice having been paid
558                Ok(Some(_)) => {
559                    continue;
560                }
561                // If we reach the end of the update stream without observing a state indicating the
562                // invoice having been paid there was likely some error or the invoice timed out.
563                // Either way we just show the invoice as unpaid.
564                Ok(None) | Err(_) => {
565                    break PaymentStatus::Pending;
566                }
567            }
568        };
569
570        Ok(InvoiceStatus { invoice, status })
571    }
572
573    async fn run_db_migrations(&self) {
574        let migrations = Self::migrations();
575        let schema_version: u64 = self
576            .db
577            .begin_transaction_nc()
578            .await
579            .get_value(&SchemaVersionKey)
580            .await
581            .unwrap_or_default();
582
583        for (target_schema, migration_fn) in migrations
584            .into_iter()
585            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
586        {
587            let mut dbtx = self.db.begin_transaction().await;
588            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
589
590            migration_fn(self, dbtx.to_ref_nc()).await;
591
592            dbtx.commit_tx().await;
593        }
594    }
595}
596
597async fn await_invoice_confirmed(
598    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
599    operation_id: OperationId,
600) -> Result<(), RecurringPaymentError> {
601    let mut operation_updated = ln_module
602        .subscribe_ln_receive(operation_id)
603        .await?
604        .into_stream();
605
606    while let Some(update) = operation_updated.next().await {
607        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
608            return Ok(());
609        }
610    }
611
612    Err(RecurringPaymentError::Other(anyhow!(
613        "BOLT11 invoice not confirmed"
614    )))
615}
616
/// Invoice stored for a payment code. BOLT11 is the only variant at the
/// moment.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
pub enum PaymentCodeInvoice {
    /// A BOLT11 lightning invoice.
    Bolt11(Bolt11Invoice),
}
621
/// Helper struct indicating if an invoice was paid. In the future it may also
/// contain the preimage to be fully LUD-21 compliant.
pub struct InvoiceStatus {
    /// The invoice the status refers to.
    pub invoice: Bolt11Invoice,
    /// Whether the invoice was observed as paid.
    pub status: PaymentStatus,
}
628
/// Payment state of an invoice as reported by the verify endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// A state indicating payment was observed for the invoice.
    Paid,
    /// No state indicating payment has been observed (yet).
    Pending,
}

impl PaymentStatus {
    /// Returns `true` for [`PaymentStatus::Paid`] and `false` otherwise.
    pub fn is_paid(&self) -> bool {
        matches!(self, PaymentStatus::Paid)
    }
}
639
/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
/// use any of the other fields right now. Once we upstream the verify field
/// this struct can be removed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LNURLPayInvoice {
    /// The invoice in its string encoding.
    pub pr: String,
    /// URL under which the payment status of the invoice can be verified.
    pub verify: String,
}
648
649fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
650    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
651    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
652    let payment_hash = sha256::Hash::hash(&preimage[..]);
653
654    OperationId(payment_hash.to_byte_array())
655}
656
/// Extension trait for obtaining the lightning module of a client.
trait LnClientContextExt {
    /// Returns the client's lightning module instance, or a
    /// [`RecurringPaymentError`] if it cannot be obtained.
    fn get_ln_module(
        &'_ self,
    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
}
662
663impl LnClientContextExt for ClientHandleArc {
664    fn get_ln_module(
665        &'_ self,
666    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
667        self.get_first_module::<LightningClientModule>()
668            .map_err(|e| {
669                warn!("No compatible lightning module found {e}");
670                RecurringPaymentError::NoLightningModuleFound
671            })
672    }
673}