fedimint_recurringd/
lib.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_api_client::api::net::ConnectorType;
7use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
8use fedimint_connectors::ConnectorRegistry;
9use fedimint_core::config::FederationId;
10use fedimint_core::core::OperationId;
11use fedimint_core::db::{
12    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
13    IRawDatabase,
14};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::secp256k1::SECP256K1;
18use fedimint_core::secp256k1::hashes::sha256;
19use fedimint_core::task::timeout;
20use fedimint_core::util::SafeUrl;
21use fedimint_core::{Amount, BitcoinHash};
22use fedimint_derive_secret::DerivableSecret;
23use fedimint_ln_client::recurring::{
24    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
25};
26use fedimint_ln_client::{
27    LightningClientInit, LightningClientModule, LightningOperationMeta,
28    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
29};
30use fedimint_mint_client::MintClientInit;
31use futures::StreamExt;
32use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
33use lnurl::Tag;
34use lnurl::lnurl::LnUrl;
35use lnurl::pay::PayResponse;
36use serde::{Deserialize, Serialize};
37use tokio::sync::{Notify, RwLock};
38use tracing::{info, warn};
39
40use crate::db::{
41    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
42    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
43    load_federation_client_databases, open_client_db, try_add_federation_database,
44};
45
46mod db;
47
/// State of the recurring payment server.
///
/// The client map and the invoice notifier are wrapped in [`Arc`]s, so they
/// are shared between all clones of the server.
#[derive(Clone)]
pub struct RecurringInvoiceServer {
    /// Database holding payment codes, invoice entries, invoice indices and
    /// the schema version.
    db: Database,
    /// Connector registry handed to every federation client that gets opened
    /// or joined.
    connectors: ConnectorRegistry,
    /// Clients of all registered federations, keyed by federation id.
    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
    /// Notified whenever a DB transaction that saved an invoice commits.
    invoice_generated: Arc<Notify>,
    /// Prefix of every URL the server hands out (LNURL, invoice callback and
    /// verify URLs).
    base_url: SafeUrl,
}
56
57impl RecurringInvoiceServer {
58    pub async fn new(
59        connectors: ConnectorRegistry,
60        db: impl IRawDatabase + 'static,
61        base_url: SafeUrl,
62    ) -> anyhow::Result<Self> {
63        let db = Database::new(db, Default::default());
64
65        let mut clients = HashMap::<_, ClientHandleArc>::new();
66
67        for (federation_id, db) in load_federation_client_databases(&db).await {
68            let mut client_builder = Client::builder().await?;
69            client_builder.with_module(LightningClientInit::default());
70            client_builder.with_module(MintClientInit);
71            let client = client_builder
72                .open(
73                    connectors.clone(),
74                    db,
75                    fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
76                )
77                .await?;
78            clients.insert(federation_id, Arc::new(client));
79        }
80
81        let slf = Self {
82            db: db.clone(),
83            clients: Arc::new(RwLock::new(clients)),
84            invoice_generated: Arc::new(Default::default()),
85            base_url,
86            connectors,
87        };
88
89        slf.run_db_migrations().await;
90
91        Ok(slf)
92    }
93
94    /// We don't want to hold any money or sign anything ourselves, we only use
95    /// the client with externally supplied key material and to track
96    /// ongoing progress of other users' receives.
97    fn default_secret() -> DerivableSecret {
98        DerivableSecret::new_root(&[], &[])
99    }
100
101    pub async fn register_federation(
102        &self,
103        invite_code: &InviteCode,
104    ) -> Result<FederationId, RecurringPaymentError> {
105        let federation_id = invite_code.federation_id();
106        info!("Registering federation {}", federation_id);
107
108        // We lock to prevent parallel join attempts
109        // TODO: lock per federation
110        let mut clients = self.clients.write().await;
111        if clients.contains_key(&federation_id) {
112            return Err(RecurringPaymentError::FederationAlreadyRegistered(
113                federation_id,
114            ));
115        }
116
117        // We don't know if joining will succeed or be interrupted. We use a random DB
118        // prefix to initialize the client and only write the prefix to the DB if that
119        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
120        // becomes a problem we can clean it up later.
121        let client_db_prefix = FederationDbPrefix::random();
122        let client_db = open_client_db(&self.db, client_db_prefix);
123
124        match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
125            Ok(client) => {
126                try_add_federation_database(&self.db, federation_id, client_db_prefix)
127                    .await
128                    .expect("We hold a global lock, no parallel joining can happen");
129                clients.insert(federation_id, client);
130                Ok(federation_id)
131            }
132            Err(e) => {
133                // TODO: clean up DB?
134                Err(e)
135            }
136        }
137    }
138
139    async fn join_federation_static(
140        connectors: ConnectorRegistry,
141        client_db: Database,
142        invite_code: &InviteCode,
143    ) -> Result<ClientHandleArc, RecurringPaymentError> {
144        let mut client_builder = Client::builder()
145            .await
146            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
147
148        client_builder.with_connector(ConnectorType::default());
149        client_builder.with_module(LightningClientInit::default());
150        client_builder.with_module(MintClientInit);
151
152        let client = client_builder
153            .preview(connectors, invite_code)
154            .await?
155            .join(
156                client_db,
157                fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
158            )
159            .await
160            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
161        Ok(Arc::new(client))
162    }
163
164    pub async fn register_recurring_payment_code(
165        &self,
166        federation_id: FederationId,
167        payment_code_root_key: PaymentCodeRootKey,
168        protocol: RecurringPaymentProtocol,
169        meta: &str,
170    ) -> Result<String, RecurringPaymentError> {
171        // TODO: support BOLT12
172        if protocol != RecurringPaymentProtocol::LNURL {
173            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
174        }
175
176        // Ensure the federation is supported
177        self.get_federation_client(federation_id).await?;
178
179        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
180        let payment_code_entry = PaymentCodeEntry {
181            root_key: payment_code_root_key,
182            federation_id,
183            protocol,
184            payment_code: payment_code.clone(),
185            variant: PaymentCodeVariant::Lnurl {
186                meta: meta.to_owned(),
187            },
188        };
189
190        let mut dbtx = self.db.begin_transaction().await;
191        if let Some(existing_code) = dbtx
192            .insert_entry(
193                &PaymentCodeKey {
194                    payment_code_id: payment_code_root_key.to_payment_code_id(),
195                },
196                &payment_code_entry,
197            )
198            .await
199        {
200            if existing_code != payment_code_entry {
201                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
202                    payment_code_root_key,
203                ));
204            }
205
206            dbtx.ignore_uncommitted();
207            return Ok(payment_code);
208        }
209
210        dbtx.insert_new_entry(
211            &PaymentCodeNextInvoiceIndexKey {
212                payment_code_id: payment_code_root_key.to_payment_code_id(),
213            },
214            &0,
215        )
216        .await;
217        dbtx.commit_tx_result().await?;
218
219        Ok(payment_code)
220    }
221
222    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
223        let lnurl = LnUrl::from_url(format!(
224            "{}lnv1/paycodes/{}",
225            self.base_url, payment_code_id
226        ));
227        lnurl.encode()
228    }
229
230    pub async fn lnurl_pay(
231        &self,
232        payment_code_id: PaymentCodeId,
233    ) -> Result<PayResponse, RecurringPaymentError> {
234        let payment_code = self.get_payment_code(payment_code_id).await?;
235        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
236
237        Ok(PayResponse {
238            callback: format!("{}lnv1/paycodes/{}/invoice", self.base_url, payment_code_id),
239            max_sendable: 100000000000,
240            min_sendable: 1,
241            tag: Tag::PayRequest,
242            metadata: meta,
243            comment_allowed: None,
244            allows_nostr: None,
245            nostr_pubkey: None,
246        })
247    }
248
249    pub async fn lnurl_invoice(
250        &self,
251        payment_code_id: PaymentCodeId,
252        amount: Amount,
253    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
254        let (operation_id, federation_id, invoice) =
255            self.create_bolt11_invoice(payment_code_id, amount).await?;
256        Ok(LNURLPayInvoice {
257            pr: invoice.to_string(),
258            verify: format!(
259                "{}lnv1/verify/{}/{}",
260                self.base_url,
261                federation_id,
262                operation_id.fmt_full()
263            ),
264        })
265    }
266
267    async fn create_bolt11_invoice(
268        &self,
269        payment_code_id: PaymentCodeId,
270        amount: Amount,
271    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
272        // Invoices are valid for one day by default, might become dynamic with BOLT12
273        // support
274        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
275
276        let payment_code = self.get_payment_code(payment_code_id).await?;
277
278        let federation_client = self
279            .get_federation_client(payment_code.federation_id)
280            .await?;
281
282        let (operation_id, invoice) = self
283            .db
284            .autocommit(
285                |dbtx, _| {
286                    let federation_client = federation_client.clone();
287                    let payment_code = payment_code.clone();
288                    Box::pin(async move {
289                        let invoice_index = self
290                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
291                            .await;
292
293                        // Check if the invoice index was already used in an aborted call to this
294                        // fn. If so:
295                        //   1. Save the previously generated invoice. We don't want to reuse it
296                        //      since it may be expired and in the future may contain call-specific
297                        //      data, but also want to allow the client to sync past it.
298                        //   2. Increment the invoice index to generate a new invoice since re-using
299                        //      the same index wouldn't work (operation id reuse is forbidden).
300                        let initial_operation_id =
301                            operation_id_from_user_key(payment_code.root_key, invoice_index);
302                        let invoice_index = if let Some(invoice) =
303                            Self::check_if_invoice_exists(&federation_client, initial_operation_id)
304                                .await
305                        {
306                            self.save_bolt11_invoice(
307                                dbtx,
308                                initial_operation_id,
309                                payment_code_id,
310                                invoice_index,
311                                invoice,
312                            )
313                            .await;
314                            self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
315                                .await
316                        } else {
317                            invoice_index
318                        };
319
320                        // This is where the main part starts: generate the invoice and save it to
321                        // the DB
322                        let federation_client_ln_module = federation_client.get_ln_module()?;
323                        let gateway = federation_client_ln_module
324                            .get_gateway(None, false)
325                            .await?
326                            .ok_or(RecurringPaymentError::NoGatewayFound)?;
327
328                        let lnurl_meta = match payment_code.variant {
329                            PaymentCodeVariant::Lnurl { meta } => meta,
330                        };
331                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
332                        let description = Bolt11InvoiceDescription::Hash(meta_hash);
333
334                        // TODO: ideally creating the invoice would take a dbtx as argument so we
335                        // don't have to do the "check if invoice already exists" dance
336                        let (operation_id, invoice, _preimage) = federation_client_ln_module
337                            .create_bolt11_invoice_for_user_tweaked(
338                                amount,
339                                description,
340                                Some(DEFAULT_EXPIRY_TIME),
341                                payment_code.root_key.0,
342                                invoice_index,
343                                serde_json::Value::Null,
344                                Some(gateway),
345                            )
346                            .await?;
347
348                        self.save_bolt11_invoice(
349                            dbtx,
350                            operation_id,
351                            payment_code_id,
352                            invoice_index,
353                            invoice.clone(),
354                        )
355                        .await;
356
357                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
358                    })
359                },
360                None,
361            )
362            .await
363            .unwrap_autocommit()?;
364
365        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
366
367        Ok((operation_id, federation_client.federation_id(), invoice))
368    }
369
370    async fn save_bolt11_invoice(
371        &self,
372        dbtx: &mut DatabaseTransaction<'_>,
373        operation_id: OperationId,
374        payment_code_id: PaymentCodeId,
375        invoice_index: u64,
376        invoice: Bolt11Invoice,
377    ) {
378        dbtx.insert_new_entry(
379            &PaymentCodeInvoiceKey {
380                payment_code_id,
381                index: invoice_index,
382            },
383            &PaymentCodeInvoiceEntry {
384                operation_id,
385                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
386            },
387        )
388        .await;
389
390        let invoice_generated_notifier = self.invoice_generated.clone();
391        dbtx.on_commit(move || {
392            invoice_generated_notifier.notify_waiters();
393        });
394    }
395
396    async fn check_if_invoice_exists(
397        federation_client: &ClientHandleArc,
398        operation_id: OperationId,
399    ) -> Option<Bolt11Invoice> {
400        let operation = federation_client
401            .operation_log()
402            .get_operation(operation_id)
403            .await?;
404
405        assert_eq!(
406            operation.operation_module_kind(),
407            LightningClientModule::kind().as_str()
408        );
409
410        let LightningOperationMetaVariant::Receive { invoice, .. } =
411            operation.meta::<LightningOperationMeta>().variant
412        else {
413            panic!(
414                "Unexpected operation meta variant: {:?}",
415                operation.meta::<LightningOperationMeta>().variant
416            );
417        };
418
419        Some(invoice)
420    }
421
422    async fn get_federation_client(
423        &self,
424        federation_id: FederationId,
425    ) -> Result<ClientHandleArc, RecurringPaymentError> {
426        self.clients
427            .read()
428            .await
429            .get(&federation_id)
430            .cloned()
431            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
432    }
433
434    pub async fn await_invoice_index_generated(
435        &self,
436        payment_code_id: PaymentCodeId,
437        invoice_index: u64,
438    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
439        self.get_payment_code(payment_code_id).await?;
440
441        let mut notified = self.invoice_generated.notified();
442        loop {
443            let mut dbtx = self.db.begin_transaction_nc().await;
444            if let Some(invoice_entry) = dbtx
445                .get_value(&PaymentCodeInvoiceKey {
446                    payment_code_id,
447                    index: invoice_index,
448                })
449                .await
450            {
451                break Ok(invoice_entry);
452            };
453
454            notified.await;
455            notified = self.invoice_generated.notified();
456        }
457    }
458
459    async fn get_next_invoice_index(
460        &self,
461        dbtx: &mut DatabaseTransaction<'_>,
462        payment_code_id: PaymentCodeId,
463    ) -> u64 {
464        let next_index = dbtx
465            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
466            .await
467            .map(|index| index + 1)
468            .unwrap_or(0);
469        dbtx.insert_entry(
470            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
471            &next_index,
472        )
473        .await;
474
475        next_index
476    }
477
478    pub async fn list_federations(&self) -> Vec<FederationId> {
479        self.clients.read().await.keys().cloned().collect()
480    }
481
482    async fn get_payment_code(
483        &self,
484        payment_code_id: PaymentCodeId,
485    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
486        self.db
487            .begin_transaction_nc()
488            .await
489            .get_value(&PaymentCodeKey { payment_code_id })
490            .await
491            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
492    }
493
494    /// Returns if an invoice has been paid yet. To avoid DB indirection and
495    /// since the URLs would be similarly long either way we identify
496    /// invoices by federation id and operation id instead of the payment
497    /// code. This function is the basis of `recurringd`'s [LUD-21]
498    /// implementation that allows clients to verify if a given invoice they
499    /// generated using the LNURL has been paid yet.
500    ///
501    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
502    pub async fn verify_invoice_paid(
503        &self,
504        federation_id: FederationId,
505        operation_id: OperationId,
506    ) -> Result<InvoiceStatus, RecurringPaymentError> {
507        let federation_client = self.get_federation_client(federation_id).await?;
508
509        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
510        // fetch it from the operation meta.
511        let invoice = {
512            let operation = federation_client
513                .operation_log()
514                .get_operation(operation_id)
515                .await
516                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
517
518            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
519                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
520            }
521
522            let LightningOperationMetaVariant::Receive { invoice, .. } =
523                operation.meta::<LightningOperationMeta>().variant
524            else {
525                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
526            };
527
528            invoice
529        };
530
531        let ln_module = federation_client
532            .get_first_module::<LightningClientModule>()
533            .map_err(|e| {
534                warn!("No compatible lightning module found {e}");
535                RecurringPaymentError::NoLightningModuleFound
536            })?;
537
538        let mut stream = ln_module
539            .subscribe_ln_receive(operation_id)
540            .await
541            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
542            .into_stream();
543        let status = loop {
544            // Unfortunately the fedimint client doesn't track payment status internally
545            // yet, but relies on integrators to consume the update streams belonging to
546            // operations to figure out their state. Since the verify endpoint is meant to
547            // be non-blocking, we need to find a way to consume the stream until we think
548            // no immediate progress will be made anymore. That's why we limit each update
549            // step to 100ms, far more than a DB read should ever take, and abort if we'd
550            // block to wait for further progress to be made.
551            let update = timeout(Duration::from_millis(100), stream.next()).await;
552            match update {
553                // For some reason recurringd jumps right to claimed without going over funded … but
554                // either is fine to conclude the user will receive their money once they come
555                // online.
556                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
557                    break PaymentStatus::Paid;
558                }
559                // Keep looking for a state update indicating the invoice having been paid
560                Ok(Some(_)) => {
561                    continue;
562                }
563                // If we reach the end of the update stream without observing a state indicating the
564                // invoice having been paid there was likely some error or the invoice timed out.
565                // Either way we just show the invoice as unpaid.
566                Ok(None) | Err(_) => {
567                    break PaymentStatus::Pending;
568                }
569            }
570        };
571
572        Ok(InvoiceStatus { invoice, status })
573    }
574
575    async fn run_db_migrations(&self) {
576        let migrations = Self::migrations();
577        let schema_version: u64 = self
578            .db
579            .begin_transaction_nc()
580            .await
581            .get_value(&SchemaVersionKey)
582            .await
583            .unwrap_or_default();
584
585        for (target_schema, migration_fn) in migrations
586            .into_iter()
587            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
588        {
589            let mut dbtx = self.db.begin_transaction().await;
590            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
591
592            migration_fn(self, dbtx.to_ref_nc()).await;
593
594            dbtx.commit_tx().await;
595        }
596    }
597}
598
599async fn await_invoice_confirmed(
600    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
601    operation_id: OperationId,
602) -> Result<(), RecurringPaymentError> {
603    let mut operation_updated = ln_module
604        .subscribe_ln_receive(operation_id)
605        .await?
606        .into_stream();
607
608    while let Some(update) = operation_updated.next().await {
609        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
610            return Ok(());
611        }
612    }
613
614    Err(RecurringPaymentError::Other(anyhow!(
615        "BOLT11 invoice not confirmed"
616    )))
617}
618
/// Invoice generated for a payment code, as stored in a
/// `PaymentCodeInvoiceEntry`.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
pub enum PaymentCodeInvoice {
    /// A BOLT11 invoice.
    Bolt11(Bolt11Invoice),
}
623
/// Helper struct indicating if an invoice was paid. In the future it may also
/// contain the preimage to be fully LUD-21 compliant.
pub struct InvoiceStatus {
    /// The invoice whose status was queried.
    pub invoice: Bolt11Invoice,
    /// Whether the invoice has been observed as paid.
    pub status: PaymentStatus,
}
630
/// Payment state of an invoice as reported by the verify endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// A state indicating that the invoice has been paid was observed.
    Paid,
    /// No such state has been observed (yet).
    Pending,
}

impl PaymentStatus {
    /// Returns `true` if the status is [`PaymentStatus::Paid`].
    pub fn is_paid(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            PaymentStatus::Paid => true,
            PaymentStatus::Pending => false,
        }
    }
}
641
/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
/// use any of the other fields right now. Once we upstream the verify field
/// this struct can be removed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LNURLPayInvoice {
    /// The generated invoice in its string encoding.
    pub pr: String,
    /// URL under which the payment status of the invoice can be queried.
    pub verify: String,
}
650
651fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
652    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
653    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
654    let payment_hash = sha256::Hash::hash(&preimage[..]);
655
656    OperationId(payment_hash.to_byte_array())
657}
658
/// Extension trait giving access to a client's lightning module with errors
/// expressed as [`RecurringPaymentError`].
trait LnClientContextExt {
    /// Returns the lightning module instance, or an error if it cannot be
    /// obtained.
    fn get_ln_module(
        &'_ self,
    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
}
664
665impl LnClientContextExt for ClientHandleArc {
666    fn get_ln_module(
667        &'_ self,
668    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
669        self.get_first_module::<LightningClientModule>()
670            .map_err(|e| {
671                warn!("No compatible lightning module found {e}");
672                RecurringPaymentError::NoLightningModuleFound
673            })
674    }
675}