Skip to main content

fedimint_recurringd/
lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
7use fedimint_connectors::ConnectorRegistry;
8use fedimint_core::config::FederationId;
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
12    IRawDatabase,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::secp256k1::SECP256K1;
17use fedimint_core::secp256k1::hashes::sha256;
18use fedimint_core::task::timeout;
19use fedimint_core::util::SafeUrl;
20use fedimint_core::{Amount, BitcoinHash};
21use fedimint_derive_secret::DerivableSecret;
22use fedimint_ln_client::recurring::{
23    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
24};
25use fedimint_ln_client::{
26    LightningClientInit, LightningClientModule, LightningOperationMeta,
27    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
28};
29use fedimint_lnurl::{PayResponse, encode_lnurl, pay_request_tag};
30use fedimint_mint_client::MintClientInit;
31use futures::StreamExt;
32use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
33use serde::{Deserialize, Serialize};
34use tokio::sync::{Notify, RwLock};
35use tracing::{info, warn};
36
37use crate::db::{
38    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
39    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
40    load_federation_client_databases, open_client_db, try_add_federation_database,
41};
42
43mod db;
44
45#[derive(Clone)]
46pub struct RecurringInvoiceServer {
47    db: Database,
48    connectors: ConnectorRegistry,
49    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
50    invoice_generated: Arc<Notify>,
51    base_url: SafeUrl,
52}
53
54impl RecurringInvoiceServer {
55    pub async fn new(
56        connectors: ConnectorRegistry,
57        db: impl IRawDatabase + 'static,
58        base_url: SafeUrl,
59    ) -> anyhow::Result<Self> {
60        let db = Database::new(db, Default::default());
61
62        let mut clients = HashMap::<_, ClientHandleArc>::new();
63
64        for (federation_id, db) in load_federation_client_databases(&db).await {
65            let mut client_builder = Client::builder().await?;
66            client_builder.with_module(LightningClientInit::default());
67            client_builder.with_module(MintClientInit);
68            let client = client_builder
69                .open(
70                    connectors.clone(),
71                    db,
72                    fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
73                )
74                .await?;
75            clients.insert(federation_id, Arc::new(client));
76        }
77
78        let slf = Self {
79            db: db.clone(),
80            clients: Arc::new(RwLock::new(clients)),
81            invoice_generated: Arc::new(Default::default()),
82            base_url,
83            connectors,
84        };
85
86        slf.run_db_migrations().await;
87
88        Ok(slf)
89    }
90
91    /// We don't want to hold any money or sign anything ourselves, we only use
92    /// the client with externally supplied key material and to track
93    /// ongoing progress of other users' receives.
94    fn default_secret() -> DerivableSecret {
95        DerivableSecret::new_root(&[], &[])
96    }
97
98    pub async fn register_federation(
99        &self,
100        invite_code: &InviteCode,
101    ) -> Result<FederationId, RecurringPaymentError> {
102        let federation_id = invite_code.federation_id();
103        info!("Registering federation {}", federation_id);
104
105        // We lock to prevent parallel join attempts
106        // TODO: lock per federation
107        let mut clients = self.clients.write().await;
108        if clients.contains_key(&federation_id) {
109            return Err(RecurringPaymentError::FederationAlreadyRegistered(
110                federation_id,
111            ));
112        }
113
114        // We don't know if joining will succeed or be interrupted. We use a random DB
115        // prefix to initialize the client and only write the prefix to the DB if that
116        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
117        // becomes a problem we can clean it up later.
118        let client_db_prefix = FederationDbPrefix::random();
119        let client_db = open_client_db(&self.db, client_db_prefix);
120
121        match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
122            Ok(client) => {
123                try_add_federation_database(&self.db, federation_id, client_db_prefix)
124                    .await
125                    .expect("We hold a global lock, no parallel joining can happen");
126                clients.insert(federation_id, client);
127                Ok(federation_id)
128            }
129            Err(e) => {
130                // TODO: clean up DB?
131                Err(e)
132            }
133        }
134    }
135
136    async fn join_federation_static(
137        connectors: ConnectorRegistry,
138        client_db: Database,
139        invite_code: &InviteCode,
140    ) -> Result<ClientHandleArc, RecurringPaymentError> {
141        let mut client_builder = Client::builder()
142            .await
143            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
144
145        client_builder.with_module(LightningClientInit::default());
146        client_builder.with_module(MintClientInit);
147
148        let client = client_builder
149            .preview(connectors, invite_code)
150            .await?
151            .join(
152                client_db,
153                fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
154            )
155            .await
156            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
157        Ok(Arc::new(client))
158    }
159
160    pub async fn register_recurring_payment_code(
161        &self,
162        federation_id: FederationId,
163        payment_code_root_key: PaymentCodeRootKey,
164        protocol: RecurringPaymentProtocol,
165        meta: &str,
166    ) -> Result<String, RecurringPaymentError> {
167        // TODO: support BOLT12
168        if protocol != RecurringPaymentProtocol::LNURL {
169            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
170        }
171
172        // Ensure the federation is supported
173        self.get_federation_client(federation_id).await?;
174
175        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
176        let payment_code_entry = PaymentCodeEntry {
177            root_key: payment_code_root_key,
178            federation_id,
179            protocol,
180            payment_code: payment_code.clone(),
181            variant: PaymentCodeVariant::Lnurl {
182                meta: meta.to_owned(),
183            },
184        };
185
186        let mut dbtx = self.db.begin_transaction().await;
187        if let Some(existing_code) = dbtx
188            .insert_entry(
189                &PaymentCodeKey {
190                    payment_code_id: payment_code_root_key.to_payment_code_id(),
191                },
192                &payment_code_entry,
193            )
194            .await
195        {
196            if existing_code != payment_code_entry {
197                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
198                    payment_code_root_key,
199                ));
200            }
201
202            dbtx.ignore_uncommitted();
203            return Ok(payment_code);
204        }
205
206        dbtx.insert_new_entry(
207            &PaymentCodeNextInvoiceIndexKey {
208                payment_code_id: payment_code_root_key.to_payment_code_id(),
209            },
210            &0,
211        )
212        .await;
213        dbtx.commit_tx_result().await.map_err(anyhow::Error::from)?;
214
215        Ok(payment_code)
216    }
217
218    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
219        encode_lnurl(&format!(
220            "{}lnv1/paycodes/{}",
221            self.base_url, payment_code_id
222        ))
223    }
224
225    pub async fn lnurl_pay(
226        &self,
227        payment_code_id: PaymentCodeId,
228    ) -> Result<PayResponse, RecurringPaymentError> {
229        let payment_code = self.get_payment_code(payment_code_id).await?;
230        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
231
232        Ok(PayResponse {
233            callback: format!("{}lnv1/paycodes/{}/invoice", self.base_url, payment_code_id),
234            max_sendable: 100000000000,
235            min_sendable: 1,
236            tag: pay_request_tag(),
237            metadata: meta,
238        })
239    }
240
241    pub async fn lnurl_invoice(
242        &self,
243        payment_code_id: PaymentCodeId,
244        amount: Amount,
245    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
246        let (operation_id, federation_id, invoice) =
247            self.create_bolt11_invoice(payment_code_id, amount).await?;
248        Ok(LNURLPayInvoice {
249            pr: invoice.to_string(),
250            verify: format!(
251                "{}lnv1/verify/{}/{}",
252                self.base_url,
253                federation_id,
254                operation_id.fmt_full()
255            ),
256        })
257    }
258
259    async fn create_bolt11_invoice(
260        &self,
261        payment_code_id: PaymentCodeId,
262        amount: Amount,
263    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
264        // Invoices are valid for one day by default, might become dynamic with BOLT12
265        // support
266        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
267
268        let payment_code = self.get_payment_code(payment_code_id).await?;
269
270        let federation_client = self
271            .get_federation_client(payment_code.federation_id)
272            .await?;
273
274        let (operation_id, invoice) = self
275            .db
276            .autocommit(
277                |dbtx, _| {
278                    let federation_client = federation_client.clone();
279                    let payment_code = payment_code.clone();
280                    Box::pin(async move {
281                        let invoice_index = self
282                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
283                            .await;
284
285                        // Check if the invoice index was already used in an aborted call to this
286                        // fn. If so:
287                        //   1. Save the previously generated invoice. We don't want to reuse it
288                        //      since it may be expired and in the future may contain call-specific
289                        //      data, but also want to allow the client to sync past it.
290                        //   2. Increment the invoice index to generate a new invoice since re-using
291                        //      the same index wouldn't work (operation id reuse is forbidden).
292                        let initial_operation_id =
293                            operation_id_from_user_key(payment_code.root_key, invoice_index);
294                        let invoice_index = if let Some(invoice) =
295                            Self::check_if_invoice_exists(&federation_client, initial_operation_id)
296                                .await
297                        {
298                            self.save_bolt11_invoice(
299                                dbtx,
300                                initial_operation_id,
301                                payment_code_id,
302                                invoice_index,
303                                invoice,
304                            )
305                            .await;
306                            self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
307                                .await
308                        } else {
309                            invoice_index
310                        };
311
312                        // This is where the main part starts: generate the invoice and save it to
313                        // the DB
314                        let federation_client_ln_module = federation_client.get_ln_module()?;
315                        let gateway = federation_client_ln_module
316                            .get_gateway(None, false)
317                            .await?
318                            .ok_or(RecurringPaymentError::NoGatewayFound)?;
319
320                        let lnurl_meta = match payment_code.variant {
321                            PaymentCodeVariant::Lnurl { meta } => meta,
322                        };
323                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
324                        let description = Bolt11InvoiceDescription::Hash(meta_hash);
325
326                        // TODO: ideally creating the invoice would take a dbtx as argument so we
327                        // don't have to do the "check if invoice already exists" dance
328                        let (operation_id, invoice, _preimage) = federation_client_ln_module
329                            .create_bolt11_invoice_for_user_tweaked(
330                                amount,
331                                description,
332                                Some(DEFAULT_EXPIRY_TIME),
333                                payment_code.root_key.0,
334                                invoice_index,
335                                serde_json::Value::Null,
336                                Some(gateway),
337                            )
338                            .await?;
339
340                        self.save_bolt11_invoice(
341                            dbtx,
342                            operation_id,
343                            payment_code_id,
344                            invoice_index,
345                            invoice.clone(),
346                        )
347                        .await;
348
349                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
350                    })
351                },
352                None,
353            )
354            .await
355            .unwrap_autocommit()?;
356
357        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
358
359        Ok((operation_id, federation_client.federation_id(), invoice))
360    }
361
362    async fn save_bolt11_invoice(
363        &self,
364        dbtx: &mut DatabaseTransaction<'_>,
365        operation_id: OperationId,
366        payment_code_id: PaymentCodeId,
367        invoice_index: u64,
368        invoice: Bolt11Invoice,
369    ) {
370        dbtx.insert_new_entry(
371            &PaymentCodeInvoiceKey {
372                payment_code_id,
373                index: invoice_index,
374            },
375            &PaymentCodeInvoiceEntry {
376                operation_id,
377                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
378            },
379        )
380        .await;
381
382        let invoice_generated_notifier = self.invoice_generated.clone();
383        dbtx.on_commit(move || {
384            invoice_generated_notifier.notify_waiters();
385        });
386    }
387
388    async fn check_if_invoice_exists(
389        federation_client: &ClientHandleArc,
390        operation_id: OperationId,
391    ) -> Option<Bolt11Invoice> {
392        let operation = federation_client
393            .operation_log()
394            .get_operation(operation_id)
395            .await?;
396
397        assert_eq!(
398            operation.operation_module_kind(),
399            LightningClientModule::kind().as_str()
400        );
401
402        let LightningOperationMetaVariant::Receive { invoice, .. } =
403            operation.meta::<LightningOperationMeta>().variant
404        else {
405            panic!(
406                "Unexpected operation meta variant: {:?}",
407                operation.meta::<LightningOperationMeta>().variant
408            );
409        };
410
411        Some(invoice)
412    }
413
414    async fn get_federation_client(
415        &self,
416        federation_id: FederationId,
417    ) -> Result<ClientHandleArc, RecurringPaymentError> {
418        self.clients
419            .read()
420            .await
421            .get(&federation_id)
422            .cloned()
423            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
424    }
425
426    pub async fn await_invoice_index_generated(
427        &self,
428        payment_code_id: PaymentCodeId,
429        invoice_index: u64,
430    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
431        self.get_payment_code(payment_code_id).await?;
432
433        let mut notified = self.invoice_generated.notified();
434        loop {
435            let mut dbtx = self.db.begin_transaction_nc().await;
436            if let Some(invoice_entry) = dbtx
437                .get_value(&PaymentCodeInvoiceKey {
438                    payment_code_id,
439                    index: invoice_index,
440                })
441                .await
442            {
443                break Ok(invoice_entry);
444            };
445
446            notified.await;
447            notified = self.invoice_generated.notified();
448        }
449    }
450
451    async fn get_next_invoice_index(
452        &self,
453        dbtx: &mut DatabaseTransaction<'_>,
454        payment_code_id: PaymentCodeId,
455    ) -> u64 {
456        let next_index = dbtx
457            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
458            .await
459            .map(|index| index + 1)
460            .unwrap_or(0);
461        dbtx.insert_entry(
462            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
463            &next_index,
464        )
465        .await;
466
467        next_index
468    }
469
470    pub async fn list_federations(&self) -> Vec<FederationId> {
471        self.clients.read().await.keys().cloned().collect()
472    }
473
474    async fn get_payment_code(
475        &self,
476        payment_code_id: PaymentCodeId,
477    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
478        self.db
479            .begin_transaction_nc()
480            .await
481            .get_value(&PaymentCodeKey { payment_code_id })
482            .await
483            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
484    }
485
486    /// Returns if an invoice has been paid yet. To avoid DB indirection and
487    /// since the URLs would be similarly long either way we identify
488    /// invoices by federation id and operation id instead of the payment
489    /// code. This function is the basis of `recurringd`'s [LUD-21]
490    /// implementation that allows clients to verify if a given invoice they
491    /// generated using the LNURL has been paid yet.
492    ///
493    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
494    pub async fn verify_invoice_paid(
495        &self,
496        federation_id: FederationId,
497        operation_id: OperationId,
498    ) -> Result<InvoiceStatus, RecurringPaymentError> {
499        let federation_client = self.get_federation_client(federation_id).await?;
500
501        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
502        // fetch it from the operation meta.
503        let invoice = {
504            let operation = federation_client
505                .operation_log()
506                .get_operation(operation_id)
507                .await
508                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
509
510            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
511                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
512            }
513
514            let LightningOperationMetaVariant::Receive { invoice, .. } =
515                operation.meta::<LightningOperationMeta>().variant
516            else {
517                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
518            };
519
520            invoice
521        };
522
523        let ln_module = federation_client
524            .get_first_module::<LightningClientModule>()
525            .map_err(|e| {
526                warn!("No compatible lightning module found {e}");
527                RecurringPaymentError::NoLightningModuleFound
528            })?;
529
530        let mut stream = ln_module
531            .subscribe_ln_receive(operation_id)
532            .await
533            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
534            .into_stream();
535        let status = loop {
536            // Unfortunately the fedimint client doesn't track payment status internally
537            // yet, but relies on integrators to consume the update streams belonging to
538            // operations to figure out their state. Since the verify endpoint is meant to
539            // be non-blocking, we need to find a way to consume the stream until we think
540            // no immediate progress will be made anymore. That's why we limit each update
541            // step to 100ms, far more than a DB read should ever take, and abort if we'd
542            // block to wait for further progress to be made.
543            let update = timeout(Duration::from_millis(100), stream.next()).await;
544            match update {
545                // For some reason recurringd jumps right to claimed without going over funded … but
546                // either is fine to conclude the user will receive their money once they come
547                // online.
548                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
549                    break PaymentStatus::Paid;
550                }
551                // Keep looking for a state update indicating the invoice having been paid
552                Ok(Some(_)) => {
553                    continue;
554                }
555                // If we reach the end of the update stream without observing a state indicating the
556                // invoice having been paid there was likely some error or the invoice timed out.
557                // Either way we just show the invoice as unpaid.
558                Ok(None) | Err(_) => {
559                    break PaymentStatus::Pending;
560                }
561            }
562        };
563
564        Ok(InvoiceStatus { invoice, status })
565    }
566
567    async fn run_db_migrations(&self) {
568        let migrations = Self::migrations();
569        let schema_version: u64 = self
570            .db
571            .begin_transaction_nc()
572            .await
573            .get_value(&SchemaVersionKey)
574            .await
575            .unwrap_or_default();
576
577        for (target_schema, migration_fn) in migrations
578            .into_iter()
579            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
580        {
581            let mut dbtx = self.db.begin_transaction().await;
582            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
583
584            migration_fn(self, dbtx.to_ref_nc()).await;
585
586            dbtx.commit_tx().await;
587        }
588    }
589}
590
591async fn await_invoice_confirmed(
592    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
593    operation_id: OperationId,
594) -> Result<(), RecurringPaymentError> {
595    let mut operation_updated = ln_module
596        .subscribe_ln_receive(operation_id)
597        .await?
598        .into_stream();
599
600    while let Some(update) = operation_updated.next().await {
601        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
602            return Ok(());
603        }
604    }
605
606    Err(RecurringPaymentError::Other(anyhow!(
607        "BOLT11 invoice not confirmed"
608    )))
609}
610
611#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
612pub enum PaymentCodeInvoice {
613    Bolt11(Bolt11Invoice),
614}
615
616/// Helper struct indicating if an invoice was paid. In the future it may also
617/// contain the preimage to be fully LUD-21 compliant.
618pub struct InvoiceStatus {
619    pub invoice: Bolt11Invoice,
620    pub status: PaymentStatus,
621}
622
623pub enum PaymentStatus {
624    Paid,
625    Pending,
626}
627
628impl PaymentStatus {
629    pub fn is_paid(&self) -> bool {
630        matches!(self, PaymentStatus::Paid)
631    }
632}
633
634/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
635/// use any of the other fields right now. Once we upstream the verify field
636/// this struct can be removed.
637#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
638pub struct LNURLPayInvoice {
639    pub pr: String,
640    pub verify: String,
641}
642
643fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
644    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
645    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
646    let payment_hash = sha256::Hash::hash(&preimage[..]);
647
648    OperationId(payment_hash.to_byte_array())
649}
650
651trait LnClientContextExt {
652    fn get_ln_module(
653        &'_ self,
654    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
655}
656
657impl LnClientContextExt for ClientHandleArc {
658    fn get_ln_module(
659        &'_ self,
660    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
661        self.get_first_module::<LightningClientModule>()
662            .map_err(|e| {
663                warn!("No compatible lightning module found {e}");
664                RecurringPaymentError::NoLightningModuleFound
665            })
666    }
667}