Skip to main content

fedimint_recurringd/
lib.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_client::meta::MetaService;
7use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
8use fedimint_client_module::meta::LegacyMetaSource;
9use fedimint_connectors::ConnectorRegistry;
10use fedimint_core::config::FederationId;
11use fedimint_core::core::OperationId;
12use fedimint_core::db::{
13    AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
14    IRawDatabase,
15};
16use fedimint_core::encoding::{Decodable, Encodable};
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::secp256k1::hashes::sha256;
19use fedimint_core::secp256k1::{PublicKey, SECP256K1};
20use fedimint_core::task::timeout;
21use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
22use fedimint_core::{Amount, BitcoinHash, runtime};
23use fedimint_derive_secret::DerivableSecret;
24use fedimint_ln_client::common::{LightningGateway, LightningGatewayAnnouncement};
25use fedimint_ln_client::recurring::{
26    PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
27};
28use fedimint_ln_client::{
29    LightningClientInit, LightningClientModule, LightningOperationMeta,
30    LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
31};
32use fedimint_lnurl::{PayResponse, encode_lnurl, pay_request_tag};
33use fedimint_meta_client::MetaModuleMetaSourceWithFallback;
34use fedimint_mint_client::MintClientInit;
35use futures::StreamExt;
36use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
37use serde::{Deserialize, Serialize};
38use tokio::sync::{Notify, RwLock, watch};
39use tracing::{debug, info, warn};
40
41use crate::db::{
42    FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
43    PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
44    load_federation_client_databases, open_client_db, try_add_federation_database,
45};
46
47mod db;
48
/// Shared state of the recurring invoice server.
///
/// The struct is `Clone`; the client map, the gateway cache and the invoice
/// notifier are wrapped in `Arc`s, so clones share them.
#[derive(Clone)]
pub struct RecurringInvoiceServer {
    /// Database holding the payment code, invoice and schema version entries;
    /// the per-federation client databases are opened on top of it.
    db: Database,
    /// Handed to the client builder whenever a federation client is opened or
    /// joined.
    connectors: ConnectorRegistry,
    /// One client handle per registered federation.
    clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
    /// Per-federation receiver of the gateway list published by the task
    /// started in `spawn_gateway_cache_refresh`.
    gateway_cache: Arc<RwLock<HashMap<FederationId, watch::Receiver<Vec<CachedGateway>>>>>,
    /// Waiters are notified once a transaction that saved an invoice entry
    /// commits.
    invoice_generated: Arc<Notify>,
    /// Base URL that the LNURL, invoice callback and verify URLs are joined
    /// onto.
    base_url: SafeUrl,
}
58
/// A gateway that passed the availability check during the last successful
/// gateway cache refresh.
#[derive(Clone)]
struct CachedGateway {
    /// The gateway as returned by the lightning module's availability check.
    gateway: LightningGateway,
    /// `true` if the gateway announcement was flagged as vetted or its ID is
    /// listed in the federation's `vetted_gateways` metadata.
    vetted: bool,
}
64
65impl RecurringInvoiceServer {
66    pub async fn new(
67        connectors: ConnectorRegistry,
68        db: impl IRawDatabase + 'static,
69        base_url: SafeUrl,
70    ) -> anyhow::Result<Self> {
71        let db = Database::new(db, Default::default());
72
73        let mut clients = HashMap::<_, ClientHandleArc>::new();
74        let mut gateway_cache = HashMap::<FederationId, watch::Receiver<Vec<CachedGateway>>>::new();
75
76        for (federation_id, db) in load_federation_client_databases(&db).await {
77            let mut client_builder = Client::builder().await?;
78            client_builder.with_meta_service(recurringd_meta_service());
79            client_builder.with_module(LightningClientInit::default());
80            client_builder.with_module(MintClientInit);
81            let client = client_builder
82                .open(
83                    connectors.clone(),
84                    db,
85                    fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
86                )
87                .await?;
88            let client = Arc::new(client);
89            gateway_cache.insert(
90                federation_id,
91                spawn_gateway_cache_refresh(federation_id, &client),
92            );
93            clients.insert(federation_id, client);
94        }
95
96        let slf = Self {
97            db: db.clone(),
98            clients: Arc::new(RwLock::new(clients)),
99            gateway_cache: Arc::new(RwLock::new(gateway_cache)),
100            invoice_generated: Arc::new(Default::default()),
101            base_url,
102            connectors,
103        };
104
105        slf.run_db_migrations().await;
106
107        Ok(slf)
108    }
109
110    /// We don't want to hold any money or sign anything ourselves, we only use
111    /// the client with externally supplied key material and to track
112    /// ongoing progress of other users' receives.
113    fn default_secret() -> DerivableSecret {
114        DerivableSecret::new_root(&[], &[])
115    }
116
117    pub async fn register_federation(
118        &self,
119        invite_code: &InviteCode,
120    ) -> Result<FederationId, RecurringPaymentError> {
121        let federation_id = invite_code.federation_id();
122        info!("Registering federation {}", federation_id);
123
124        // We lock to prevent parallel join attempts
125        // TODO: lock per federation
126        let mut clients = self.clients.write().await;
127        if clients.contains_key(&federation_id) {
128            return Err(RecurringPaymentError::FederationAlreadyRegistered(
129                federation_id,
130            ));
131        }
132
133        // We don't know if joining will succeed or be interrupted. We use a random DB
134        // prefix to initialize the client and only write the prefix to the DB if that
135        // succeeds. If it fails we end up with some orphaned data in the DB, if it ever
136        // becomes a problem we can clean it up later.
137        let client_db_prefix = FederationDbPrefix::random();
138        let client_db = open_client_db(&self.db, client_db_prefix);
139
140        match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
141            Ok(client) => {
142                try_add_federation_database(&self.db, federation_id, client_db_prefix)
143                    .await
144                    .expect("We hold a global lock, no parallel joining can happen");
145                self.gateway_cache.write().await.insert(
146                    federation_id,
147                    spawn_gateway_cache_refresh(federation_id, &client),
148                );
149                clients.insert(federation_id, client);
150                Ok(federation_id)
151            }
152            Err(e) => {
153                // TODO: clean up DB?
154                Err(e)
155            }
156        }
157    }
158
159    async fn join_federation_static(
160        connectors: ConnectorRegistry,
161        client_db: Database,
162        invite_code: &InviteCode,
163    ) -> Result<ClientHandleArc, RecurringPaymentError> {
164        let mut client_builder = Client::builder()
165            .await
166            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
167
168        client_builder.with_meta_service(recurringd_meta_service());
169        client_builder.with_module(LightningClientInit::default());
170        client_builder.with_module(MintClientInit);
171
172        let client = client_builder
173            .preview(connectors, invite_code)
174            .await?
175            .join(
176                client_db,
177                fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
178            )
179            .await
180            .map_err(RecurringPaymentError::JoiningFederationFailed)?;
181        Ok(Arc::new(client))
182    }
183
184    pub async fn register_recurring_payment_code(
185        &self,
186        federation_id: FederationId,
187        payment_code_root_key: PaymentCodeRootKey,
188        protocol: RecurringPaymentProtocol,
189        meta: &str,
190    ) -> Result<String, RecurringPaymentError> {
191        // TODO: support BOLT12
192        if protocol != RecurringPaymentProtocol::LNURL {
193            return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
194        }
195
196        // Ensure the federation is supported
197        self.get_federation_client(federation_id).await?;
198
199        let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
200        let payment_code_entry = PaymentCodeEntry {
201            root_key: payment_code_root_key,
202            federation_id,
203            protocol,
204            payment_code: payment_code.clone(),
205            variant: PaymentCodeVariant::Lnurl {
206                meta: meta.to_owned(),
207            },
208        };
209
210        let mut dbtx = self.db.begin_transaction().await;
211        if let Some(existing_code) = dbtx
212            .insert_entry(
213                &PaymentCodeKey {
214                    payment_code_id: payment_code_root_key.to_payment_code_id(),
215                },
216                &payment_code_entry,
217            )
218            .await
219        {
220            if existing_code != payment_code_entry {
221                return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
222                    payment_code_root_key,
223                ));
224            }
225
226            dbtx.ignore_uncommitted();
227            return Ok(payment_code);
228        }
229
230        dbtx.insert_new_entry(
231            &PaymentCodeNextInvoiceIndexKey {
232                payment_code_id: payment_code_root_key.to_payment_code_id(),
233            },
234            &0,
235        )
236        .await;
237        dbtx.commit_tx_result().await.map_err(anyhow::Error::from)?;
238
239        Ok(payment_code)
240    }
241
242    fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
243        encode_lnurl(
244            &self
245                .base_url
246                .join_path(&format!("lnv1/paycodes/{payment_code_id}"))
247                .to_string(),
248        )
249    }
250
251    pub async fn lnurl_pay(
252        &self,
253        payment_code_id: PaymentCodeId,
254    ) -> Result<PayResponse, RecurringPaymentError> {
255        let payment_code = self.get_payment_code(payment_code_id).await?;
256        let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
257
258        Ok(PayResponse {
259            callback: self
260                .base_url
261                .join_path(&format!("lnv1/paycodes/{payment_code_id}/invoice"))
262                .to_string(),
263            max_sendable: 100000000000,
264            min_sendable: 1,
265            tag: pay_request_tag(),
266            metadata: meta,
267        })
268    }
269
270    pub async fn lnurl_invoice(
271        &self,
272        payment_code_id: PaymentCodeId,
273        amount: Amount,
274    ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
275        let (operation_id, federation_id, invoice) =
276            self.create_bolt11_invoice(payment_code_id, amount).await?;
277        Ok(LNURLPayInvoice {
278            pr: invoice.to_string(),
279            verify: self
280                .base_url
281                .join_path(&format!(
282                    "lnv1/verify/{federation_id}/{}",
283                    operation_id.fmt_full()
284                ))
285                .to_string(),
286        })
287    }
288
    /// Creates the next BOLT11 invoice for `payment_code_id` over `amount`.
    ///
    /// Steps:
    ///   1. Look up the payment code, its federation client and a cached
    ///      gateway for the amount.
    ///   2. Inside one autocommit transaction: reserve an invoice index, skip
    ///      (and record) indices whose operation already exists in the
    ///      client's operation log, create the invoice through the lightning
    ///      module and save it.
    ///   3. After the transaction went through, wait until the receive
    ///      operation reports `WaitingForPayment`.
    ///
    /// Returns the operation id, the federation id of the client and the
    /// invoice.
    ///
    /// # Errors
    ///
    /// Propagates lookup failures (unknown payment code, unknown federation,
    /// no gateway), errors from inside the autocommit closure, and the error
    /// from `await_invoice_confirmed`.
    async fn create_bolt11_invoice(
        &self,
        payment_code_id: PaymentCodeId,
        amount: Amount,
    ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
        // Invoices are valid for one day by default, might become dynamic with BOLT12
        // support
        const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;

        let payment_code = self.get_payment_code(payment_code_id).await?;

        let federation_client = self
            .get_federation_client(payment_code.federation_id)
            .await?;

        let gateway = self
            .get_cached_gateway(payment_code.federation_id, amount)
            .await?;

        let (operation_id, invoice) = self
            .db
            .autocommit(
                |dbtx, _| {
                    // The closure may be invoked more than once, so every
                    // invocation works on its own clones of the captured values.
                    let federation_client = federation_client.clone();
                    let payment_code = payment_code.clone();
                    let gateway = gateway.clone();
                    Box::pin(async move {
                        let mut invoice_index = self
                            .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
                            .await;

                        // Check if any invoice indices were already used in aborted calls to this
                        // fn. If so:
                        //   1. Save each previously generated invoice. We don't want to reuse it
                        //      since it may be expired and in the future may contain call-specific
                        //      data, but also want to allow the client to sync past it.
                        //   2. Increment the invoice index until we find an unused one, since
                        //      re-using an index would re-use an operation id, which is forbidden.
                        //
                        // A single request can only create one orphaned operation, but multiple
                        // cancelled/restarted requests in a row can leave multiple consecutive
                        // orphaned operations before recurringd commits its own DB state.
                        let invoice_index = loop {
                            let operation_id =
                                operation_id_from_user_key(payment_code.root_key, invoice_index);

                            let Some(invoice) =
                                Self::check_if_invoice_exists(&federation_client, operation_id)
                                    .await
                            else {
                                break invoice_index;
                            };

                            self.save_bolt11_invoice(
                                dbtx,
                                operation_id,
                                payment_code_id,
                                invoice_index,
                                invoice,
                            )
                            .await;

                            invoice_index = self
                                .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
                                .await;
                        };

                        // This is where the main part starts: generate the invoice and save it to
                        // the DB
                        let federation_client_ln_module = federation_client.get_ln_module()?;

                        let lnurl_meta = match payment_code.variant {
                            PaymentCodeVariant::Lnurl { meta } => meta,
                        };
                        // The invoice commits to the LNURL metadata through a
                        // description hash instead of embedding the text.
                        let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
                        let description = Bolt11InvoiceDescription::Hash(meta_hash);

                        // TODO: ideally creating the invoice would take a dbtx as argument so we
                        // don't have to do the "check if invoice already exists" dance
                        let (operation_id, invoice, _preimage) = federation_client_ln_module
                            .create_bolt11_invoice_for_user_tweaked(
                                amount,
                                description,
                                Some(DEFAULT_EXPIRY_TIME),
                                payment_code.root_key.0,
                                invoice_index,
                                serde_json::Value::Null,
                                Some(gateway),
                            )
                            .await?;

                        self.save_bolt11_invoice(
                            dbtx,
                            operation_id,
                            payment_code_id,
                            invoice_index,
                            invoice.clone(),
                        )
                        .await;

                        Result::<_, anyhow::Error>::Ok((operation_id, invoice))
                    })
                },
                // NOTE(review): presumably "no limit on commit attempts" —
                // confirm against `Database::autocommit`.
                None,
            )
            .await
            .unwrap_autocommit()?;

        // Only hand the invoice out once the receive operation is waiting for
        // the payment.
        await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;

        Ok((operation_id, federation_client.federation_id(), invoice))
    }
401
402    async fn save_bolt11_invoice(
403        &self,
404        dbtx: &mut DatabaseTransaction<'_>,
405        operation_id: OperationId,
406        payment_code_id: PaymentCodeId,
407        invoice_index: u64,
408        invoice: Bolt11Invoice,
409    ) {
410        dbtx.insert_new_entry(
411            &PaymentCodeInvoiceKey {
412                payment_code_id,
413                index: invoice_index,
414            },
415            &PaymentCodeInvoiceEntry {
416                operation_id,
417                invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
418            },
419        )
420        .await;
421
422        let invoice_generated_notifier = self.invoice_generated.clone();
423        dbtx.on_commit(move || {
424            invoice_generated_notifier.notify_waiters();
425        });
426    }
427
428    async fn check_if_invoice_exists(
429        federation_client: &ClientHandleArc,
430        operation_id: OperationId,
431    ) -> Option<Bolt11Invoice> {
432        let operation = federation_client
433            .operation_log()
434            .get_operation(operation_id)
435            .await?;
436
437        assert_eq!(
438            operation.operation_module_kind(),
439            LightningClientModule::kind().as_str()
440        );
441
442        let LightningOperationMetaVariant::Receive { invoice, .. } =
443            operation.meta::<LightningOperationMeta>().variant
444        else {
445            panic!(
446                "Unexpected operation meta variant: {:?}",
447                operation.meta::<LightningOperationMeta>().variant
448            );
449        };
450
451        Some(invoice)
452    }
453
454    async fn get_federation_client(
455        &self,
456        federation_id: FederationId,
457    ) -> Result<ClientHandleArc, RecurringPaymentError> {
458        self.clients
459            .read()
460            .await
461            .get(&federation_id)
462            .cloned()
463            .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
464    }
465
466    async fn get_cached_gateway(
467        &self,
468        federation_id: FederationId,
469        amount: Amount,
470    ) -> Result<LightningGateway, RecurringPaymentError> {
471        const EMPTY_GATEWAY_CACHE_WAIT: Duration = Duration::from_secs(60);
472
473        let mut gateway_cache = self
474            .gateway_cache
475            .read()
476            .await
477            .get(&federation_id)
478            .cloned()
479            .ok_or(RecurringPaymentError::NoGatewayFound)?;
480
481        if let Some(gateway) = select_preferred_gateway(&gateway_cache.borrow(), amount) {
482            return Ok(gateway);
483        }
484
485        timeout(EMPTY_GATEWAY_CACHE_WAIT, async {
486            loop {
487                gateway_cache
488                    .changed()
489                    .await
490                    .map_err(|_| RecurringPaymentError::NoGatewayFound)?;
491
492                if let Some(gateway) =
493                    select_preferred_gateway(&gateway_cache.borrow_and_update(), amount)
494                {
495                    break Ok(gateway);
496                }
497            }
498        })
499        .await
500        .map_err(|_| RecurringPaymentError::NoGatewayFound)?
501    }
502
503    pub async fn await_invoice_index_generated(
504        &self,
505        payment_code_id: PaymentCodeId,
506        invoice_index: u64,
507    ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
508        self.get_payment_code(payment_code_id).await?;
509
510        let mut notified = self.invoice_generated.notified();
511        loop {
512            let mut dbtx = self.db.begin_transaction_nc().await;
513            if let Some(invoice_entry) = dbtx
514                .get_value(&PaymentCodeInvoiceKey {
515                    payment_code_id,
516                    index: invoice_index,
517                })
518                .await
519            {
520                break Ok(invoice_entry);
521            };
522
523            notified.await;
524            notified = self.invoice_generated.notified();
525        }
526    }
527
528    async fn get_next_invoice_index(
529        &self,
530        dbtx: &mut DatabaseTransaction<'_>,
531        payment_code_id: PaymentCodeId,
532    ) -> u64 {
533        let next_index = dbtx
534            .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
535            .await
536            .map(|index| index + 1)
537            .unwrap_or(0);
538        dbtx.insert_entry(
539            &PaymentCodeNextInvoiceIndexKey { payment_code_id },
540            &next_index,
541        )
542        .await;
543
544        next_index
545    }
546
547    pub async fn list_federations(&self) -> Vec<FederationId> {
548        self.clients.read().await.keys().cloned().collect()
549    }
550
551    async fn get_payment_code(
552        &self,
553        payment_code_id: PaymentCodeId,
554    ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
555        self.db
556            .begin_transaction_nc()
557            .await
558            .get_value(&PaymentCodeKey { payment_code_id })
559            .await
560            .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
561    }
562
563    /// Returns if an invoice has been paid yet. To avoid DB indirection and
564    /// since the URLs would be similarly long either way we identify
565    /// invoices by federation id and operation id instead of the payment
566    /// code. This function is the basis of `recurringd`'s [LUD-21]
567    /// implementation that allows clients to verify if a given invoice they
568    /// generated using the LNURL has been paid yet.
569    ///
570    /// [LUD-21]: https://github.com/lnurl/luds/blob/luds/21.md
571    pub async fn verify_invoice_paid(
572        &self,
573        federation_id: FederationId,
574        operation_id: OperationId,
575    ) -> Result<InvoiceStatus, RecurringPaymentError> {
576        let federation_client = self.get_federation_client(federation_id).await?;
577
578        // Unfortunately LUD-21 wants us to return the invoice again, so we have to
579        // fetch it from the operation meta.
580        let invoice = {
581            let operation = federation_client
582                .operation_log()
583                .get_operation(operation_id)
584                .await
585                .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
586
587            if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
588                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
589            }
590
591            let LightningOperationMetaVariant::Receive { invoice, .. } =
592                operation.meta::<LightningOperationMeta>().variant
593            else {
594                return Err(RecurringPaymentError::UnknownInvoice(operation_id));
595            };
596
597            invoice
598        };
599
600        let ln_module = federation_client
601            .get_first_module::<LightningClientModule>()
602            .map_err(|e| {
603                warn!("No compatible lightning module found {e}");
604                RecurringPaymentError::NoLightningModuleFound
605            })?;
606
607        let mut stream = ln_module
608            .subscribe_ln_receive(operation_id)
609            .await
610            .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
611            .into_stream();
612        let status = loop {
613            // Unfortunately the fedimint client doesn't track payment status internally
614            // yet, but relies on integrators to consume the update streams belonging to
615            // operations to figure out their state. Since the verify endpoint is meant to
616            // be non-blocking, we need to find a way to consume the stream until we think
617            // no immediate progress will be made anymore. That's why we limit each update
618            // step to 100ms, far more than a DB read should ever take, and abort if we'd
619            // block to wait for further progress to be made.
620            let update = timeout(Duration::from_millis(100), stream.next()).await;
621            match update {
622                // For some reason recurringd jumps right to claimed without going over funded … but
623                // either is fine to conclude the user will receive their money once they come
624                // online.
625                Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
626                    break PaymentStatus::Paid;
627                }
628                // Keep looking for a state update indicating the invoice having been paid
629                Ok(Some(_)) => {
630                    continue;
631                }
632                // If we reach the end of the update stream without observing a state indicating the
633                // invoice having been paid there was likely some error or the invoice timed out.
634                // Either way we just show the invoice as unpaid.
635                Ok(None) | Err(_) => {
636                    break PaymentStatus::Pending;
637                }
638            }
639        };
640
641        Ok(InvoiceStatus { invoice, status })
642    }
643
644    async fn run_db_migrations(&self) {
645        let migrations = Self::migrations();
646        let schema_version: u64 = self
647            .db
648            .begin_transaction_nc()
649            .await
650            .get_value(&SchemaVersionKey)
651            .await
652            .unwrap_or_default();
653
654        for (target_schema, migration_fn) in migrations
655            .into_iter()
656            .skip_while(|(target_schema, _)| *target_schema <= schema_version)
657        {
658            let mut dbtx = self.db.begin_transaction().await;
659            dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
660
661            migration_fn(self, dbtx.to_ref_nc()).await;
662
663            dbtx.commit_tx().await;
664        }
665    }
666}
667
668async fn await_invoice_confirmed(
669    ln_module: &ClientModuleInstance<'_, LightningClientModule>,
670    operation_id: OperationId,
671) -> Result<(), RecurringPaymentError> {
672    let mut operation_updated = ln_module
673        .subscribe_ln_receive(operation_id)
674        .await?
675        .into_stream();
676
677    while let Some(update) = operation_updated.next().await {
678        if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
679            return Ok(());
680        }
681    }
682
683    Err(RecurringPaymentError::Other(anyhow!(
684        "BOLT11 invoice not confirmed"
685    )))
686}
687
/// Invoice generated for a payment code, as stored in
/// `PaymentCodeInvoiceEntry`. Only BOLT11 invoices exist so far.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
pub enum PaymentCodeInvoice {
    /// A BOLT11 lightning invoice.
    Bolt11(Bolt11Invoice),
}
692
/// Helper struct indicating if an invoice was paid. In the future it may also
/// contain the preimage to be fully LUD-21 compliant.
pub struct InvoiceStatus {
    /// The invoice the status refers to, read back from the operation meta.
    pub invoice: Bolt11Invoice,
    /// Whether a payment of the invoice has been observed.
    pub status: PaymentStatus,
}
699
/// Payment state of an invoice as observed when it was verified.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// A state update indicating that the invoice was paid has been observed.
    Paid,
    /// No such update has been observed (yet).
    Pending,
}

impl PaymentStatus {
    /// Returns `true` for [`PaymentStatus::Paid`] and `false` otherwise.
    #[must_use]
    pub fn is_paid(&self) -> bool {
        matches!(self, PaymentStatus::Paid)
    }
}
710
/// Response of the LNURL invoice endpoint.
///
/// The lnurl-rs crate doesn't have the `verify` field in this type and we don't
/// use any of the other fields right now. Once we upstream the verify field
/// this struct can be removed.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LNURLPayInvoice {
    /// The BOLT11 invoice in its string encoding.
    pub pr: String,
    /// URL under which the payment status of the invoice can be queried.
    pub verify: String,
}
719
720fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
721    let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
722    let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
723    let payment_hash = sha256::Hash::hash(&preimage[..]);
724
725    OperationId(payment_hash.to_byte_array())
726}
727
/// Extension trait giving short-hand access to a client's lightning module.
trait LnClientContextExt {
    /// Returns the client's lightning module instance, or
    /// `NoLightningModuleFound` if there is none.
    fn get_ln_module(
        &'_ self,
    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
}
733
734impl LnClientContextExt for ClientHandleArc {
735    fn get_ln_module(
736        &'_ self,
737    ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
738        self.get_first_module::<LightningClientModule>()
739            .map_err(|e| {
740                warn!("No compatible lightning module found {e}");
741                RecurringPaymentError::NoLightningModuleFound
742            })
743    }
744}
745
746fn recurringd_meta_service() -> Arc<MetaService> {
747    MetaService::new(MetaModuleMetaSourceWithFallback::<LegacyMetaSource>::default())
748}
749
750fn spawn_gateway_cache_refresh(
751    federation_id: FederationId,
752    client: &ClientHandleArc,
753) -> watch::Receiver<Vec<CachedGateway>> {
754    const REFRESH_INTERVAL: Duration = Duration::from_secs(10);
755
756    let (gateway_cache_sender, gateway_cache_receiver) = watch::channel(Vec::new());
757    let task_group = client.task_group().clone();
758    let client = client.clone();
759    task_group.spawn_cancellable("recurringd-gateway-cache-refresh", async move {
760        loop {
761            match select_available_gateways(&client).await {
762                Ok(gateways) => {
763                    gateway_cache_sender.send_replace(gateways);
764                }
765                Err(err) => {
766                    warn!(
767                        federation_id = %federation_id,
768                        err = %err.fmt_compact(),
769                        "Failed to refresh recurringd gateway cache"
770                    );
771                }
772            }
773
774            runtime::sleep(REFRESH_INTERVAL).await;
775        }
776    });
777
778    gateway_cache_receiver
779}
780
781async fn select_available_gateways(
782    client: &ClientHandleArc,
783) -> Result<Vec<CachedGateway>, RecurringPaymentError> {
784    let ln_module = client.get_ln_module()?;
785    ln_module.update_gateway_cache().await.map_err(|err| {
786        warn!(
787            err = %err.fmt_compact_anyhow(),
788            "Failed to refresh gateway announcements"
789        );
790        RecurringPaymentError::NoGatewayFound
791    })?;
792
793    let mut gateways = ln_module.list_gateways().await;
794    if gateways.is_empty() {
795        return Err(RecurringPaymentError::NoGatewayFound);
796    }
797
798    let vetted_gateway_ids = fetch_vetted_gateway_ids(client).await;
799    sort_gateways_by_preference(&mut gateways, &vetted_gateway_ids);
800
801    let mut available_gateways = Vec::new();
802    for gateway in gateways {
803        let gateway_id = gateway.info.gateway_id;
804        let vetted = gateway.vetted || vetted_gateway_ids.contains(&gateway_id);
805        match ln_module
806            .select_available_gateway(Some(gateway.info), None)
807            .await
808        {
809            Ok(gateway) => available_gateways.push(CachedGateway { gateway, vetted }),
810            Err(err) => {
811                debug!(
812                    gateway_id = %gateway_id,
813                    err = %err.fmt_compact_anyhow(),
814                    "Gateway failed availability check"
815                );
816            }
817        }
818    }
819
820    if available_gateways.is_empty() {
821        return Err(RecurringPaymentError::NoGatewayFound);
822    }
823
824    Ok(available_gateways)
825}
826
827fn select_preferred_gateway(
828    gateways: &[CachedGateway],
829    amount: Amount,
830) -> Option<LightningGateway> {
831    gateways
832        .iter()
833        .min_by_key(|gateway| {
834            (
835                !gateway.vetted,
836                gateway_fee_msat(&gateway.gateway, amount),
837                gateway.gateway.gateway_id.serialize(),
838            )
839        })
840        .map(|gateway| gateway.gateway.clone())
841}
842
843fn gateway_fee_msat(gateway: &LightningGateway, amount: Amount) -> u64 {
844    let proportional_fee =
845        (u128::from(amount.msats) * u128::from(gateway.fees.proportional_millionths)) / 1_000_000;
846
847    u64::from(gateway.fees.base_msat)
848        .saturating_add(u64::try_from(proportional_fee).unwrap_or(u64::MAX))
849}
850
851fn sort_gateways_by_preference(
852    gateways: &mut [LightningGatewayAnnouncement],
853    vetted_gateway_ids: &HashSet<PublicKey>,
854) {
855    gateways.sort_by_cached_key(|gateway| {
856        let vetted = gateway.vetted || vetted_gateway_ids.contains(&gateway.info.gateway_id);
857        (
858            !vetted,
859            u64::from(gateway.info.fees.base_msat),
860            gateway.info.gateway_id.serialize(),
861        )
862    });
863}
864
865async fn fetch_vetted_gateway_ids(client: &ClientHandleArc) -> HashSet<PublicKey> {
866    let Some(vetted_gateways) = client
867        .meta_service()
868        .entries(client.db())
869        .await
870        .and_then(|entries| entries.get("vetted_gateways").cloned())
871        .and_then(|value| parse_vetted_gateway_ids(&value))
872    else {
873        debug!("No vetted gateways configured in federation metadata");
874        return HashSet::new();
875    };
876
877    vetted_gateways
878        .into_iter()
879        .filter_map(|gateway_id| match gateway_id.parse::<PublicKey>() {
880            Ok(gateway_id) => Some(gateway_id),
881            Err(err) => {
882                warn!(
883                    %gateway_id,
884                    err = %err.fmt_compact(),
885                    "Failed to parse vetted gateway ID"
886                );
887                None
888            }
889        })
890        .collect()
891}
892
893fn parse_vetted_gateway_ids(value: &serde_json::Value) -> Option<Vec<String>> {
894    if let Ok(gateway_ids) = serde_json::from_value::<Vec<String>>(value.clone()) {
895        return Some(gateway_ids);
896    }
897
898    let value = value.as_str()?;
899
900    // The canonical metadata format is a JSON array of gateway ID strings. Older
901    // configs may have stored that JSON array as a string.
902    match serde_json::from_str::<Vec<String>>(value) {
903        Ok(gateway_ids) => {
904            warn!("vetted_gateways metadata should be configured as a JSON array, not a string");
905            Some(gateway_ids)
906        }
907        Err(_) => None,
908    }
909}