1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_client::meta::MetaService;
7use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
8use fedimint_client_module::meta::LegacyMetaSource;
9use fedimint_connectors::ConnectorRegistry;
10use fedimint_core::config::FederationId;
11use fedimint_core::core::OperationId;
12use fedimint_core::db::{
13 AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
14 IRawDatabase,
15};
16use fedimint_core::encoding::{Decodable, Encodable};
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::secp256k1::hashes::sha256;
19use fedimint_core::secp256k1::{PublicKey, SECP256K1};
20use fedimint_core::task::timeout;
21use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
22use fedimint_core::{Amount, BitcoinHash, runtime};
23use fedimint_derive_secret::DerivableSecret;
24use fedimint_ln_client::common::{LightningGateway, LightningGatewayAnnouncement};
25use fedimint_ln_client::recurring::{
26 PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
27};
28use fedimint_ln_client::{
29 LightningClientInit, LightningClientModule, LightningOperationMeta,
30 LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
31};
32use fedimint_lnurl::{PayResponse, encode_lnurl, pay_request_tag};
33use fedimint_meta_client::MetaModuleMetaSourceWithFallback;
34use fedimint_mint_client::MintClientInit;
35use futures::StreamExt;
36use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
37use serde::{Deserialize, Serialize};
38use tokio::sync::{Notify, RwLock, watch};
39use tracing::{debug, info, warn};
40
41use crate::db::{
42 FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
43 PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
44 load_federation_client_databases, open_client_db, try_add_federation_database,
45};
46
47mod db;
48
49#[derive(Clone)]
50pub struct RecurringInvoiceServer {
51 db: Database,
52 connectors: ConnectorRegistry,
53 clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
54 gateway_cache: Arc<RwLock<HashMap<FederationId, watch::Receiver<Vec<CachedGateway>>>>>,
55 invoice_generated: Arc<Notify>,
56 base_url: SafeUrl,
57}
58
59#[derive(Clone)]
60struct CachedGateway {
61 gateway: LightningGateway,
62 vetted: bool,
63}
64
65impl RecurringInvoiceServer {
66 pub async fn new(
67 connectors: ConnectorRegistry,
68 db: impl IRawDatabase + 'static,
69 base_url: SafeUrl,
70 ) -> anyhow::Result<Self> {
71 let db = Database::new(db, Default::default());
72
73 let mut clients = HashMap::<_, ClientHandleArc>::new();
74 let mut gateway_cache = HashMap::<FederationId, watch::Receiver<Vec<CachedGateway>>>::new();
75
76 for (federation_id, db) in load_federation_client_databases(&db).await {
77 let mut client_builder = Client::builder().await?;
78 client_builder.with_meta_service(recurringd_meta_service());
79 client_builder.with_module(LightningClientInit::default());
80 client_builder.with_module(MintClientInit);
81 let client = client_builder
82 .open(
83 connectors.clone(),
84 db,
85 fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
86 )
87 .await?;
88 let client = Arc::new(client);
89 gateway_cache.insert(
90 federation_id,
91 spawn_gateway_cache_refresh(federation_id, &client),
92 );
93 clients.insert(federation_id, client);
94 }
95
96 let slf = Self {
97 db: db.clone(),
98 clients: Arc::new(RwLock::new(clients)),
99 gateway_cache: Arc::new(RwLock::new(gateway_cache)),
100 invoice_generated: Arc::new(Default::default()),
101 base_url,
102 connectors,
103 };
104
105 slf.run_db_migrations().await;
106
107 Ok(slf)
108 }
109
110 fn default_secret() -> DerivableSecret {
114 DerivableSecret::new_root(&[], &[])
115 }
116
117 pub async fn register_federation(
118 &self,
119 invite_code: &InviteCode,
120 ) -> Result<FederationId, RecurringPaymentError> {
121 let federation_id = invite_code.federation_id();
122 info!("Registering federation {}", federation_id);
123
124 let mut clients = self.clients.write().await;
127 if clients.contains_key(&federation_id) {
128 return Err(RecurringPaymentError::FederationAlreadyRegistered(
129 federation_id,
130 ));
131 }
132
133 let client_db_prefix = FederationDbPrefix::random();
138 let client_db = open_client_db(&self.db, client_db_prefix);
139
140 match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
141 Ok(client) => {
142 try_add_federation_database(&self.db, federation_id, client_db_prefix)
143 .await
144 .expect("We hold a global lock, no parallel joining can happen");
145 self.gateway_cache.write().await.insert(
146 federation_id,
147 spawn_gateway_cache_refresh(federation_id, &client),
148 );
149 clients.insert(federation_id, client);
150 Ok(federation_id)
151 }
152 Err(e) => {
153 Err(e)
155 }
156 }
157 }
158
159 async fn join_federation_static(
160 connectors: ConnectorRegistry,
161 client_db: Database,
162 invite_code: &InviteCode,
163 ) -> Result<ClientHandleArc, RecurringPaymentError> {
164 let mut client_builder = Client::builder()
165 .await
166 .map_err(RecurringPaymentError::JoiningFederationFailed)?;
167
168 client_builder.with_meta_service(recurringd_meta_service());
169 client_builder.with_module(LightningClientInit::default());
170 client_builder.with_module(MintClientInit);
171
172 let client = client_builder
173 .preview(connectors, invite_code)
174 .await?
175 .join(
176 client_db,
177 fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
178 )
179 .await
180 .map_err(RecurringPaymentError::JoiningFederationFailed)?;
181 Ok(Arc::new(client))
182 }
183
184 pub async fn register_recurring_payment_code(
185 &self,
186 federation_id: FederationId,
187 payment_code_root_key: PaymentCodeRootKey,
188 protocol: RecurringPaymentProtocol,
189 meta: &str,
190 ) -> Result<String, RecurringPaymentError> {
191 if protocol != RecurringPaymentProtocol::LNURL {
193 return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
194 }
195
196 self.get_federation_client(federation_id).await?;
198
199 let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
200 let payment_code_entry = PaymentCodeEntry {
201 root_key: payment_code_root_key,
202 federation_id,
203 protocol,
204 payment_code: payment_code.clone(),
205 variant: PaymentCodeVariant::Lnurl {
206 meta: meta.to_owned(),
207 },
208 };
209
210 let mut dbtx = self.db.begin_transaction().await;
211 if let Some(existing_code) = dbtx
212 .insert_entry(
213 &PaymentCodeKey {
214 payment_code_id: payment_code_root_key.to_payment_code_id(),
215 },
216 &payment_code_entry,
217 )
218 .await
219 {
220 if existing_code != payment_code_entry {
221 return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
222 payment_code_root_key,
223 ));
224 }
225
226 dbtx.ignore_uncommitted();
227 return Ok(payment_code);
228 }
229
230 dbtx.insert_new_entry(
231 &PaymentCodeNextInvoiceIndexKey {
232 payment_code_id: payment_code_root_key.to_payment_code_id(),
233 },
234 &0,
235 )
236 .await;
237 dbtx.commit_tx_result().await.map_err(anyhow::Error::from)?;
238
239 Ok(payment_code)
240 }
241
242 fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
243 encode_lnurl(
244 &self
245 .base_url
246 .join_path(&format!("lnv1/paycodes/{payment_code_id}"))
247 .to_string(),
248 )
249 }
250
251 pub async fn lnurl_pay(
252 &self,
253 payment_code_id: PaymentCodeId,
254 ) -> Result<PayResponse, RecurringPaymentError> {
255 let payment_code = self.get_payment_code(payment_code_id).await?;
256 let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
257
258 Ok(PayResponse {
259 callback: self
260 .base_url
261 .join_path(&format!("lnv1/paycodes/{payment_code_id}/invoice"))
262 .to_string(),
263 max_sendable: 100000000000,
264 min_sendable: 1,
265 tag: pay_request_tag(),
266 metadata: meta,
267 })
268 }
269
270 pub async fn lnurl_invoice(
271 &self,
272 payment_code_id: PaymentCodeId,
273 amount: Amount,
274 ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
275 let (operation_id, federation_id, invoice) =
276 self.create_bolt11_invoice(payment_code_id, amount).await?;
277 Ok(LNURLPayInvoice {
278 pr: invoice.to_string(),
279 verify: self
280 .base_url
281 .join_path(&format!(
282 "lnv1/verify/{federation_id}/{}",
283 operation_id.fmt_full()
284 ))
285 .to_string(),
286 })
287 }
288
289 async fn create_bolt11_invoice(
290 &self,
291 payment_code_id: PaymentCodeId,
292 amount: Amount,
293 ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
294 const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
297
298 let payment_code = self.get_payment_code(payment_code_id).await?;
299
300 let federation_client = self
301 .get_federation_client(payment_code.federation_id)
302 .await?;
303
304 let gateway = self
305 .get_cached_gateway(payment_code.federation_id, amount)
306 .await?;
307
308 let (operation_id, invoice) = self
309 .db
310 .autocommit(
311 |dbtx, _| {
312 let federation_client = federation_client.clone();
313 let payment_code = payment_code.clone();
314 let gateway = gateway.clone();
315 Box::pin(async move {
316 let mut invoice_index = self
317 .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
318 .await;
319
320 let invoice_index = loop {
332 let operation_id =
333 operation_id_from_user_key(payment_code.root_key, invoice_index);
334
335 let Some(invoice) =
336 Self::check_if_invoice_exists(&federation_client, operation_id)
337 .await
338 else {
339 break invoice_index;
340 };
341
342 self.save_bolt11_invoice(
343 dbtx,
344 operation_id,
345 payment_code_id,
346 invoice_index,
347 invoice,
348 )
349 .await;
350
351 invoice_index = self
352 .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
353 .await;
354 };
355
356 let federation_client_ln_module = federation_client.get_ln_module()?;
359
360 let lnurl_meta = match payment_code.variant {
361 PaymentCodeVariant::Lnurl { meta } => meta,
362 };
363 let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
364 let description = Bolt11InvoiceDescription::Hash(meta_hash);
365
366 let (operation_id, invoice, _preimage) = federation_client_ln_module
369 .create_bolt11_invoice_for_user_tweaked(
370 amount,
371 description,
372 Some(DEFAULT_EXPIRY_TIME),
373 payment_code.root_key.0,
374 invoice_index,
375 serde_json::Value::Null,
376 Some(gateway),
377 )
378 .await?;
379
380 self.save_bolt11_invoice(
381 dbtx,
382 operation_id,
383 payment_code_id,
384 invoice_index,
385 invoice.clone(),
386 )
387 .await;
388
389 Result::<_, anyhow::Error>::Ok((operation_id, invoice))
390 })
391 },
392 None,
393 )
394 .await
395 .unwrap_autocommit()?;
396
397 await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
398
399 Ok((operation_id, federation_client.federation_id(), invoice))
400 }
401
402 async fn save_bolt11_invoice(
403 &self,
404 dbtx: &mut DatabaseTransaction<'_>,
405 operation_id: OperationId,
406 payment_code_id: PaymentCodeId,
407 invoice_index: u64,
408 invoice: Bolt11Invoice,
409 ) {
410 dbtx.insert_new_entry(
411 &PaymentCodeInvoiceKey {
412 payment_code_id,
413 index: invoice_index,
414 },
415 &PaymentCodeInvoiceEntry {
416 operation_id,
417 invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
418 },
419 )
420 .await;
421
422 let invoice_generated_notifier = self.invoice_generated.clone();
423 dbtx.on_commit(move || {
424 invoice_generated_notifier.notify_waiters();
425 });
426 }
427
428 async fn check_if_invoice_exists(
429 federation_client: &ClientHandleArc,
430 operation_id: OperationId,
431 ) -> Option<Bolt11Invoice> {
432 let operation = federation_client
433 .operation_log()
434 .get_operation(operation_id)
435 .await?;
436
437 assert_eq!(
438 operation.operation_module_kind(),
439 LightningClientModule::kind().as_str()
440 );
441
442 let LightningOperationMetaVariant::Receive { invoice, .. } =
443 operation.meta::<LightningOperationMeta>().variant
444 else {
445 panic!(
446 "Unexpected operation meta variant: {:?}",
447 operation.meta::<LightningOperationMeta>().variant
448 );
449 };
450
451 Some(invoice)
452 }
453
454 async fn get_federation_client(
455 &self,
456 federation_id: FederationId,
457 ) -> Result<ClientHandleArc, RecurringPaymentError> {
458 self.clients
459 .read()
460 .await
461 .get(&federation_id)
462 .cloned()
463 .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
464 }
465
466 async fn get_cached_gateway(
467 &self,
468 federation_id: FederationId,
469 amount: Amount,
470 ) -> Result<LightningGateway, RecurringPaymentError> {
471 const EMPTY_GATEWAY_CACHE_WAIT: Duration = Duration::from_secs(60);
472
473 let mut gateway_cache = self
474 .gateway_cache
475 .read()
476 .await
477 .get(&federation_id)
478 .cloned()
479 .ok_or(RecurringPaymentError::NoGatewayFound)?;
480
481 if let Some(gateway) = select_preferred_gateway(&gateway_cache.borrow(), amount) {
482 return Ok(gateway);
483 }
484
485 timeout(EMPTY_GATEWAY_CACHE_WAIT, async {
486 loop {
487 gateway_cache
488 .changed()
489 .await
490 .map_err(|_| RecurringPaymentError::NoGatewayFound)?;
491
492 if let Some(gateway) =
493 select_preferred_gateway(&gateway_cache.borrow_and_update(), amount)
494 {
495 break Ok(gateway);
496 }
497 }
498 })
499 .await
500 .map_err(|_| RecurringPaymentError::NoGatewayFound)?
501 }
502
503 pub async fn await_invoice_index_generated(
504 &self,
505 payment_code_id: PaymentCodeId,
506 invoice_index: u64,
507 ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
508 self.get_payment_code(payment_code_id).await?;
509
510 let mut notified = self.invoice_generated.notified();
511 loop {
512 let mut dbtx = self.db.begin_transaction_nc().await;
513 if let Some(invoice_entry) = dbtx
514 .get_value(&PaymentCodeInvoiceKey {
515 payment_code_id,
516 index: invoice_index,
517 })
518 .await
519 {
520 break Ok(invoice_entry);
521 };
522
523 notified.await;
524 notified = self.invoice_generated.notified();
525 }
526 }
527
528 async fn get_next_invoice_index(
529 &self,
530 dbtx: &mut DatabaseTransaction<'_>,
531 payment_code_id: PaymentCodeId,
532 ) -> u64 {
533 let next_index = dbtx
534 .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
535 .await
536 .map(|index| index + 1)
537 .unwrap_or(0);
538 dbtx.insert_entry(
539 &PaymentCodeNextInvoiceIndexKey { payment_code_id },
540 &next_index,
541 )
542 .await;
543
544 next_index
545 }
546
547 pub async fn list_federations(&self) -> Vec<FederationId> {
548 self.clients.read().await.keys().cloned().collect()
549 }
550
551 async fn get_payment_code(
552 &self,
553 payment_code_id: PaymentCodeId,
554 ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
555 self.db
556 .begin_transaction_nc()
557 .await
558 .get_value(&PaymentCodeKey { payment_code_id })
559 .await
560 .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
561 }
562
563 pub async fn verify_invoice_paid(
572 &self,
573 federation_id: FederationId,
574 operation_id: OperationId,
575 ) -> Result<InvoiceStatus, RecurringPaymentError> {
576 let federation_client = self.get_federation_client(federation_id).await?;
577
578 let invoice = {
581 let operation = federation_client
582 .operation_log()
583 .get_operation(operation_id)
584 .await
585 .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
586
587 if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
588 return Err(RecurringPaymentError::UnknownInvoice(operation_id));
589 }
590
591 let LightningOperationMetaVariant::Receive { invoice, .. } =
592 operation.meta::<LightningOperationMeta>().variant
593 else {
594 return Err(RecurringPaymentError::UnknownInvoice(operation_id));
595 };
596
597 invoice
598 };
599
600 let ln_module = federation_client
601 .get_first_module::<LightningClientModule>()
602 .map_err(|e| {
603 warn!("No compatible lightning module found {e}");
604 RecurringPaymentError::NoLightningModuleFound
605 })?;
606
607 let mut stream = ln_module
608 .subscribe_ln_receive(operation_id)
609 .await
610 .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
611 .into_stream();
612 let status = loop {
613 let update = timeout(Duration::from_millis(100), stream.next()).await;
621 match update {
622 Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
626 break PaymentStatus::Paid;
627 }
628 Ok(Some(_)) => {
630 continue;
631 }
632 Ok(None) | Err(_) => {
636 break PaymentStatus::Pending;
637 }
638 }
639 };
640
641 Ok(InvoiceStatus { invoice, status })
642 }
643
644 async fn run_db_migrations(&self) {
645 let migrations = Self::migrations();
646 let schema_version: u64 = self
647 .db
648 .begin_transaction_nc()
649 .await
650 .get_value(&SchemaVersionKey)
651 .await
652 .unwrap_or_default();
653
654 for (target_schema, migration_fn) in migrations
655 .into_iter()
656 .skip_while(|(target_schema, _)| *target_schema <= schema_version)
657 {
658 let mut dbtx = self.db.begin_transaction().await;
659 dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
660
661 migration_fn(self, dbtx.to_ref_nc()).await;
662
663 dbtx.commit_tx().await;
664 }
665 }
666}
667
668async fn await_invoice_confirmed(
669 ln_module: &ClientModuleInstance<'_, LightningClientModule>,
670 operation_id: OperationId,
671) -> Result<(), RecurringPaymentError> {
672 let mut operation_updated = ln_module
673 .subscribe_ln_receive(operation_id)
674 .await?
675 .into_stream();
676
677 while let Some(update) = operation_updated.next().await {
678 if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
679 return Ok(());
680 }
681 }
682
683 Err(RecurringPaymentError::Other(anyhow!(
684 "BOLT11 invoice not confirmed"
685 )))
686}
687
688#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
689pub enum PaymentCodeInvoice {
690 Bolt11(Bolt11Invoice),
691}
692
693pub struct InvoiceStatus {
696 pub invoice: Bolt11Invoice,
697 pub status: PaymentStatus,
698}
699
700pub enum PaymentStatus {
701 Paid,
702 Pending,
703}
704
705impl PaymentStatus {
706 pub fn is_paid(&self) -> bool {
707 matches!(self, PaymentStatus::Paid)
708 }
709}
710
711#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
715pub struct LNURLPayInvoice {
716 pub pr: String,
717 pub verify: String,
718}
719
720fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
721 let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
722 let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
723 let payment_hash = sha256::Hash::hash(&preimage[..]);
724
725 OperationId(payment_hash.to_byte_array())
726}
727
728trait LnClientContextExt {
729 fn get_ln_module(
730 &'_ self,
731 ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
732}
733
734impl LnClientContextExt for ClientHandleArc {
735 fn get_ln_module(
736 &'_ self,
737 ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
738 self.get_first_module::<LightningClientModule>()
739 .map_err(|e| {
740 warn!("No compatible lightning module found {e}");
741 RecurringPaymentError::NoLightningModuleFound
742 })
743 }
744}
745
746fn recurringd_meta_service() -> Arc<MetaService> {
747 MetaService::new(MetaModuleMetaSourceWithFallback::<LegacyMetaSource>::default())
748}
749
750fn spawn_gateway_cache_refresh(
751 federation_id: FederationId,
752 client: &ClientHandleArc,
753) -> watch::Receiver<Vec<CachedGateway>> {
754 const REFRESH_INTERVAL: Duration = Duration::from_secs(10);
755
756 let (gateway_cache_sender, gateway_cache_receiver) = watch::channel(Vec::new());
757 let task_group = client.task_group().clone();
758 let client = client.clone();
759 task_group.spawn_cancellable("recurringd-gateway-cache-refresh", async move {
760 loop {
761 match select_available_gateways(&client).await {
762 Ok(gateways) => {
763 gateway_cache_sender.send_replace(gateways);
764 }
765 Err(err) => {
766 warn!(
767 federation_id = %federation_id,
768 err = %err.fmt_compact(),
769 "Failed to refresh recurringd gateway cache"
770 );
771 }
772 }
773
774 runtime::sleep(REFRESH_INTERVAL).await;
775 }
776 });
777
778 gateway_cache_receiver
779}
780
781async fn select_available_gateways(
782 client: &ClientHandleArc,
783) -> Result<Vec<CachedGateway>, RecurringPaymentError> {
784 let ln_module = client.get_ln_module()?;
785 ln_module.update_gateway_cache().await.map_err(|err| {
786 warn!(
787 err = %err.fmt_compact_anyhow(),
788 "Failed to refresh gateway announcements"
789 );
790 RecurringPaymentError::NoGatewayFound
791 })?;
792
793 let mut gateways = ln_module.list_gateways().await;
794 if gateways.is_empty() {
795 return Err(RecurringPaymentError::NoGatewayFound);
796 }
797
798 let vetted_gateway_ids = fetch_vetted_gateway_ids(client).await;
799 sort_gateways_by_preference(&mut gateways, &vetted_gateway_ids);
800
801 let mut available_gateways = Vec::new();
802 for gateway in gateways {
803 let gateway_id = gateway.info.gateway_id;
804 let vetted = gateway.vetted || vetted_gateway_ids.contains(&gateway_id);
805 match ln_module
806 .select_available_gateway(Some(gateway.info), None)
807 .await
808 {
809 Ok(gateway) => available_gateways.push(CachedGateway { gateway, vetted }),
810 Err(err) => {
811 debug!(
812 gateway_id = %gateway_id,
813 err = %err.fmt_compact_anyhow(),
814 "Gateway failed availability check"
815 );
816 }
817 }
818 }
819
820 if available_gateways.is_empty() {
821 return Err(RecurringPaymentError::NoGatewayFound);
822 }
823
824 Ok(available_gateways)
825}
826
827fn select_preferred_gateway(
828 gateways: &[CachedGateway],
829 amount: Amount,
830) -> Option<LightningGateway> {
831 gateways
832 .iter()
833 .min_by_key(|gateway| {
834 (
835 !gateway.vetted,
836 gateway_fee_msat(&gateway.gateway, amount),
837 gateway.gateway.gateway_id.serialize(),
838 )
839 })
840 .map(|gateway| gateway.gateway.clone())
841}
842
843fn gateway_fee_msat(gateway: &LightningGateway, amount: Amount) -> u64 {
844 let proportional_fee =
845 (u128::from(amount.msats) * u128::from(gateway.fees.proportional_millionths)) / 1_000_000;
846
847 u64::from(gateway.fees.base_msat)
848 .saturating_add(u64::try_from(proportional_fee).unwrap_or(u64::MAX))
849}
850
851fn sort_gateways_by_preference(
852 gateways: &mut [LightningGatewayAnnouncement],
853 vetted_gateway_ids: &HashSet<PublicKey>,
854) {
855 gateways.sort_by_cached_key(|gateway| {
856 let vetted = gateway.vetted || vetted_gateway_ids.contains(&gateway.info.gateway_id);
857 (
858 !vetted,
859 u64::from(gateway.info.fees.base_msat),
860 gateway.info.gateway_id.serialize(),
861 )
862 });
863}
864
865async fn fetch_vetted_gateway_ids(client: &ClientHandleArc) -> HashSet<PublicKey> {
866 let Some(vetted_gateways) = client
867 .meta_service()
868 .entries(client.db())
869 .await
870 .and_then(|entries| entries.get("vetted_gateways").cloned())
871 .and_then(|value| parse_vetted_gateway_ids(&value))
872 else {
873 debug!("No vetted gateways configured in federation metadata");
874 return HashSet::new();
875 };
876
877 vetted_gateways
878 .into_iter()
879 .filter_map(|gateway_id| match gateway_id.parse::<PublicKey>() {
880 Ok(gateway_id) => Some(gateway_id),
881 Err(err) => {
882 warn!(
883 %gateway_id,
884 err = %err.fmt_compact(),
885 "Failed to parse vetted gateway ID"
886 );
887 None
888 }
889 })
890 .collect()
891}
892
893fn parse_vetted_gateway_ids(value: &serde_json::Value) -> Option<Vec<String>> {
894 if let Ok(gateway_ids) = serde_json::from_value::<Vec<String>>(value.clone()) {
895 return Some(gateway_ids);
896 }
897
898 let value = value.as_str()?;
899
900 match serde_json::from_str::<Vec<String>>(value) {
903 Ok(gateway_ids) => {
904 warn!("vetted_gateways metadata should be configured as a JSON array, not a string");
905 Some(gateway_ids)
906 }
907 Err(_) => None,
908 }
909}