1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::anyhow;
6use fedimint_client::{Client, ClientHandleArc, ClientModule, ClientModuleInstance};
7use fedimint_connectors::ConnectorRegistry;
8use fedimint_core::config::FederationId;
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{
11 AutocommitResultExt, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
12 IRawDatabase,
13};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::invite_code::InviteCode;
16use fedimint_core::secp256k1::SECP256K1;
17use fedimint_core::secp256k1::hashes::sha256;
18use fedimint_core::task::timeout;
19use fedimint_core::util::{FmtCompactAnyhow, SafeUrl};
20use fedimint_core::{Amount, BitcoinHash};
21use fedimint_derive_secret::DerivableSecret;
22use fedimint_ln_client::recurring::{
23 PaymentCodeId, PaymentCodeRootKey, RecurringPaymentError, RecurringPaymentProtocol,
24};
25use fedimint_ln_client::{
26 LightningClientInit, LightningClientModule, LightningOperationMeta,
27 LightningOperationMetaVariant, LnReceiveState, tweak_user_key,
28};
29use fedimint_lnurl::{PayResponse, encode_lnurl, pay_request_tag};
30use fedimint_mint_client::MintClientInit;
31use futures::StreamExt;
32use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Sha256};
33use serde::{Deserialize, Serialize};
34use tokio::sync::{Notify, RwLock};
35use tracing::{info, warn};
36
37use crate::db::{
38 FederationDbPrefix, PaymentCodeEntry, PaymentCodeInvoiceEntry, PaymentCodeInvoiceKey,
39 PaymentCodeKey, PaymentCodeNextInvoiceIndexKey, PaymentCodeVariant, SchemaVersionKey,
40 load_federation_client_databases, open_client_db, try_add_federation_database,
41};
42
43mod db;
44
45#[derive(Clone)]
46pub struct RecurringInvoiceServer {
47 db: Database,
48 connectors: ConnectorRegistry,
49 clients: Arc<RwLock<HashMap<FederationId, ClientHandleArc>>>,
50 invoice_generated: Arc<Notify>,
51 base_url: SafeUrl,
52}
53
54impl RecurringInvoiceServer {
55 pub async fn new(
56 connectors: ConnectorRegistry,
57 db: impl IRawDatabase + 'static,
58 base_url: SafeUrl,
59 ) -> anyhow::Result<Self> {
60 let db = Database::new(db, Default::default());
61
62 let mut clients = HashMap::<_, ClientHandleArc>::new();
63
64 for (federation_id, db) in load_federation_client_databases(&db).await {
65 let mut client_builder = Client::builder().await?;
66 client_builder.with_module(LightningClientInit::default());
67 client_builder.with_module(MintClientInit);
68 let client = client_builder
69 .open(
70 connectors.clone(),
71 db,
72 fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
73 )
74 .await?;
75 let client = Arc::new(client);
76 spawn_gateway_cache_refresh(&client);
77 clients.insert(federation_id, client);
78 }
79
80 let slf = Self {
81 db: db.clone(),
82 clients: Arc::new(RwLock::new(clients)),
83 invoice_generated: Arc::new(Default::default()),
84 base_url,
85 connectors,
86 };
87
88 slf.run_db_migrations().await;
89
90 Ok(slf)
91 }
92
93 fn default_secret() -> DerivableSecret {
97 DerivableSecret::new_root(&[], &[])
98 }
99
100 pub async fn register_federation(
101 &self,
102 invite_code: &InviteCode,
103 ) -> Result<FederationId, RecurringPaymentError> {
104 let federation_id = invite_code.federation_id();
105 info!("Registering federation {}", federation_id);
106
107 let mut clients = self.clients.write().await;
110 if clients.contains_key(&federation_id) {
111 return Err(RecurringPaymentError::FederationAlreadyRegistered(
112 federation_id,
113 ));
114 }
115
116 let client_db_prefix = FederationDbPrefix::random();
121 let client_db = open_client_db(&self.db, client_db_prefix);
122
123 match Self::join_federation_static(self.connectors.clone(), client_db, invite_code).await {
124 Ok(client) => {
125 try_add_federation_database(&self.db, federation_id, client_db_prefix)
126 .await
127 .expect("We hold a global lock, no parallel joining can happen");
128 spawn_gateway_cache_refresh(&client);
129 clients.insert(federation_id, client);
130 Ok(federation_id)
131 }
132 Err(e) => {
133 Err(e)
135 }
136 }
137 }
138
139 async fn join_federation_static(
140 connectors: ConnectorRegistry,
141 client_db: Database,
142 invite_code: &InviteCode,
143 ) -> Result<ClientHandleArc, RecurringPaymentError> {
144 let mut client_builder = Client::builder()
145 .await
146 .map_err(RecurringPaymentError::JoiningFederationFailed)?;
147
148 client_builder.with_module(LightningClientInit::default());
149 client_builder.with_module(MintClientInit);
150
151 let client = client_builder
152 .preview(connectors, invite_code)
153 .await?
154 .join(
155 client_db,
156 fedimint_client::RootSecret::StandardDoubleDerive(Self::default_secret()),
157 )
158 .await
159 .map_err(RecurringPaymentError::JoiningFederationFailed)?;
160 Ok(Arc::new(client))
161 }
162
163 pub async fn register_recurring_payment_code(
164 &self,
165 federation_id: FederationId,
166 payment_code_root_key: PaymentCodeRootKey,
167 protocol: RecurringPaymentProtocol,
168 meta: &str,
169 ) -> Result<String, RecurringPaymentError> {
170 if protocol != RecurringPaymentProtocol::LNURL {
172 return Err(RecurringPaymentError::UnsupportedProtocol(protocol));
173 }
174
175 self.get_federation_client(federation_id).await?;
177
178 let payment_code = self.create_lnurl(payment_code_root_key.to_payment_code_id());
179 let payment_code_entry = PaymentCodeEntry {
180 root_key: payment_code_root_key,
181 federation_id,
182 protocol,
183 payment_code: payment_code.clone(),
184 variant: PaymentCodeVariant::Lnurl {
185 meta: meta.to_owned(),
186 },
187 };
188
189 let mut dbtx = self.db.begin_transaction().await;
190 if let Some(existing_code) = dbtx
191 .insert_entry(
192 &PaymentCodeKey {
193 payment_code_id: payment_code_root_key.to_payment_code_id(),
194 },
195 &payment_code_entry,
196 )
197 .await
198 {
199 if existing_code != payment_code_entry {
200 return Err(RecurringPaymentError::PaymentCodeAlreadyExists(
201 payment_code_root_key,
202 ));
203 }
204
205 dbtx.ignore_uncommitted();
206 return Ok(payment_code);
207 }
208
209 dbtx.insert_new_entry(
210 &PaymentCodeNextInvoiceIndexKey {
211 payment_code_id: payment_code_root_key.to_payment_code_id(),
212 },
213 &0,
214 )
215 .await;
216 dbtx.commit_tx_result().await.map_err(anyhow::Error::from)?;
217
218 Ok(payment_code)
219 }
220
221 fn create_lnurl(&self, payment_code_id: PaymentCodeId) -> String {
222 encode_lnurl(
223 &self
224 .base_url
225 .join_path(&format!("lnv1/paycodes/{payment_code_id}"))
226 .to_string(),
227 )
228 }
229
230 pub async fn lnurl_pay(
231 &self,
232 payment_code_id: PaymentCodeId,
233 ) -> Result<PayResponse, RecurringPaymentError> {
234 let payment_code = self.get_payment_code(payment_code_id).await?;
235 let PaymentCodeVariant::Lnurl { meta } = payment_code.variant;
236
237 Ok(PayResponse {
238 callback: self
239 .base_url
240 .join_path(&format!("lnv1/paycodes/{payment_code_id}/invoice"))
241 .to_string(),
242 max_sendable: 100000000000,
243 min_sendable: 1,
244 tag: pay_request_tag(),
245 metadata: meta,
246 })
247 }
248
249 pub async fn lnurl_invoice(
250 &self,
251 payment_code_id: PaymentCodeId,
252 amount: Amount,
253 ) -> Result<LNURLPayInvoice, RecurringPaymentError> {
254 let (operation_id, federation_id, invoice) =
255 self.create_bolt11_invoice(payment_code_id, amount).await?;
256 Ok(LNURLPayInvoice {
257 pr: invoice.to_string(),
258 verify: self
259 .base_url
260 .join_path(&format!(
261 "lnv1/verify/{federation_id}/{}",
262 operation_id.fmt_full()
263 ))
264 .to_string(),
265 })
266 }
267
268 async fn create_bolt11_invoice(
269 &self,
270 payment_code_id: PaymentCodeId,
271 amount: Amount,
272 ) -> Result<(OperationId, FederationId, Bolt11Invoice), RecurringPaymentError> {
273 const DEFAULT_EXPIRY_TIME: u64 = 60 * 60 * 24;
276
277 let payment_code = self.get_payment_code(payment_code_id).await?;
278
279 let federation_client = self
280 .get_federation_client(payment_code.federation_id)
281 .await?;
282
283 let (operation_id, invoice) = self
284 .db
285 .autocommit(
286 |dbtx, _| {
287 let federation_client = federation_client.clone();
288 let payment_code = payment_code.clone();
289 Box::pin(async move {
290 let invoice_index = self
291 .get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
292 .await;
293
294 let initial_operation_id =
302 operation_id_from_user_key(payment_code.root_key, invoice_index);
303 let invoice_index = if let Some(invoice) =
304 Self::check_if_invoice_exists(&federation_client, initial_operation_id)
305 .await
306 {
307 self.save_bolt11_invoice(
308 dbtx,
309 initial_operation_id,
310 payment_code_id,
311 invoice_index,
312 invoice,
313 )
314 .await;
315 self.get_next_invoice_index(&mut dbtx.to_ref_nc(), payment_code_id)
316 .await
317 } else {
318 invoice_index
319 };
320
321 let federation_client_ln_module = federation_client.get_ln_module()?;
324 let gateway = match federation_client_ln_module
330 .select_available_gateway(None, None)
331 .await
332 {
333 Ok(gateway) => gateway,
334 Err(err) => {
335 warn!(err = %err.fmt_compact_anyhow(), "No gateway available, refreshing cache and retrying");
336 federation_client_ln_module
337 .update_gateway_cache()
338 .await
339 .map_err(|err| {
340 warn!(err = %err.fmt_compact_anyhow(), "Failed to refresh gateway cache");
341 RecurringPaymentError::NoGatewayFound
342 })?;
343 federation_client_ln_module
344 .select_available_gateway(None, None)
345 .await
346 .map_err(|err| {
347 warn!(err = %err.fmt_compact_anyhow(), "Failed to select an available gateway");
348 RecurringPaymentError::NoGatewayFound
349 })?
350 }
351 };
352
353 let lnurl_meta = match payment_code.variant {
354 PaymentCodeVariant::Lnurl { meta } => meta,
355 };
356 let meta_hash = Sha256(sha256::Hash::hash(lnurl_meta.as_bytes()));
357 let description = Bolt11InvoiceDescription::Hash(meta_hash);
358
359 let (operation_id, invoice, _preimage) = federation_client_ln_module
362 .create_bolt11_invoice_for_user_tweaked(
363 amount,
364 description,
365 Some(DEFAULT_EXPIRY_TIME),
366 payment_code.root_key.0,
367 invoice_index,
368 serde_json::Value::Null,
369 Some(gateway),
370 )
371 .await?;
372
373 self.save_bolt11_invoice(
374 dbtx,
375 operation_id,
376 payment_code_id,
377 invoice_index,
378 invoice.clone(),
379 )
380 .await;
381
382 Result::<_, anyhow::Error>::Ok((operation_id, invoice))
383 })
384 },
385 None,
386 )
387 .await
388 .unwrap_autocommit()?;
389
390 await_invoice_confirmed(&federation_client.get_ln_module()?, operation_id).await?;
391
392 Ok((operation_id, federation_client.federation_id(), invoice))
393 }
394
395 async fn save_bolt11_invoice(
396 &self,
397 dbtx: &mut DatabaseTransaction<'_>,
398 operation_id: OperationId,
399 payment_code_id: PaymentCodeId,
400 invoice_index: u64,
401 invoice: Bolt11Invoice,
402 ) {
403 dbtx.insert_new_entry(
404 &PaymentCodeInvoiceKey {
405 payment_code_id,
406 index: invoice_index,
407 },
408 &PaymentCodeInvoiceEntry {
409 operation_id,
410 invoice: PaymentCodeInvoice::Bolt11(invoice.clone()),
411 },
412 )
413 .await;
414
415 let invoice_generated_notifier = self.invoice_generated.clone();
416 dbtx.on_commit(move || {
417 invoice_generated_notifier.notify_waiters();
418 });
419 }
420
421 async fn check_if_invoice_exists(
422 federation_client: &ClientHandleArc,
423 operation_id: OperationId,
424 ) -> Option<Bolt11Invoice> {
425 let operation = federation_client
426 .operation_log()
427 .get_operation(operation_id)
428 .await?;
429
430 assert_eq!(
431 operation.operation_module_kind(),
432 LightningClientModule::kind().as_str()
433 );
434
435 let LightningOperationMetaVariant::Receive { invoice, .. } =
436 operation.meta::<LightningOperationMeta>().variant
437 else {
438 panic!(
439 "Unexpected operation meta variant: {:?}",
440 operation.meta::<LightningOperationMeta>().variant
441 );
442 };
443
444 Some(invoice)
445 }
446
447 async fn get_federation_client(
448 &self,
449 federation_id: FederationId,
450 ) -> Result<ClientHandleArc, RecurringPaymentError> {
451 self.clients
452 .read()
453 .await
454 .get(&federation_id)
455 .cloned()
456 .ok_or(RecurringPaymentError::UnknownFederationId(federation_id))
457 }
458
459 pub async fn await_invoice_index_generated(
460 &self,
461 payment_code_id: PaymentCodeId,
462 invoice_index: u64,
463 ) -> Result<PaymentCodeInvoiceEntry, RecurringPaymentError> {
464 self.get_payment_code(payment_code_id).await?;
465
466 let mut notified = self.invoice_generated.notified();
467 loop {
468 let mut dbtx = self.db.begin_transaction_nc().await;
469 if let Some(invoice_entry) = dbtx
470 .get_value(&PaymentCodeInvoiceKey {
471 payment_code_id,
472 index: invoice_index,
473 })
474 .await
475 {
476 break Ok(invoice_entry);
477 };
478
479 notified.await;
480 notified = self.invoice_generated.notified();
481 }
482 }
483
484 async fn get_next_invoice_index(
485 &self,
486 dbtx: &mut DatabaseTransaction<'_>,
487 payment_code_id: PaymentCodeId,
488 ) -> u64 {
489 let next_index = dbtx
490 .get_value(&PaymentCodeNextInvoiceIndexKey { payment_code_id })
491 .await
492 .map(|index| index + 1)
493 .unwrap_or(0);
494 dbtx.insert_entry(
495 &PaymentCodeNextInvoiceIndexKey { payment_code_id },
496 &next_index,
497 )
498 .await;
499
500 next_index
501 }
502
503 pub async fn list_federations(&self) -> Vec<FederationId> {
504 self.clients.read().await.keys().cloned().collect()
505 }
506
507 async fn get_payment_code(
508 &self,
509 payment_code_id: PaymentCodeId,
510 ) -> Result<PaymentCodeEntry, RecurringPaymentError> {
511 self.db
512 .begin_transaction_nc()
513 .await
514 .get_value(&PaymentCodeKey { payment_code_id })
515 .await
516 .ok_or(RecurringPaymentError::UnknownPaymentCode(payment_code_id))
517 }
518
519 pub async fn verify_invoice_paid(
528 &self,
529 federation_id: FederationId,
530 operation_id: OperationId,
531 ) -> Result<InvoiceStatus, RecurringPaymentError> {
532 let federation_client = self.get_federation_client(federation_id).await?;
533
534 let invoice = {
537 let operation = federation_client
538 .operation_log()
539 .get_operation(operation_id)
540 .await
541 .ok_or(RecurringPaymentError::UnknownInvoice(operation_id))?;
542
543 if operation.operation_module_kind() != LightningClientModule::kind().as_str() {
544 return Err(RecurringPaymentError::UnknownInvoice(operation_id));
545 }
546
547 let LightningOperationMetaVariant::Receive { invoice, .. } =
548 operation.meta::<LightningOperationMeta>().variant
549 else {
550 return Err(RecurringPaymentError::UnknownInvoice(operation_id));
551 };
552
553 invoice
554 };
555
556 let ln_module = federation_client
557 .get_first_module::<LightningClientModule>()
558 .map_err(|e| {
559 warn!("No compatible lightning module found {e}");
560 RecurringPaymentError::NoLightningModuleFound
561 })?;
562
563 let mut stream = ln_module
564 .subscribe_ln_receive(operation_id)
565 .await
566 .map_err(|_| RecurringPaymentError::UnknownInvoice(operation_id))?
567 .into_stream();
568 let status = loop {
569 let update = timeout(Duration::from_millis(100), stream.next()).await;
577 match update {
578 Ok(Some(LnReceiveState::Funded | LnReceiveState::Claimed)) => {
582 break PaymentStatus::Paid;
583 }
584 Ok(Some(_)) => {
586 continue;
587 }
588 Ok(None) | Err(_) => {
592 break PaymentStatus::Pending;
593 }
594 }
595 };
596
597 Ok(InvoiceStatus { invoice, status })
598 }
599
600 async fn run_db_migrations(&self) {
601 let migrations = Self::migrations();
602 let schema_version: u64 = self
603 .db
604 .begin_transaction_nc()
605 .await
606 .get_value(&SchemaVersionKey)
607 .await
608 .unwrap_or_default();
609
610 for (target_schema, migration_fn) in migrations
611 .into_iter()
612 .skip_while(|(target_schema, _)| *target_schema <= schema_version)
613 {
614 let mut dbtx = self.db.begin_transaction().await;
615 dbtx.insert_entry(&SchemaVersionKey, &target_schema).await;
616
617 migration_fn(self, dbtx.to_ref_nc()).await;
618
619 dbtx.commit_tx().await;
620 }
621 }
622}
623
624async fn await_invoice_confirmed(
625 ln_module: &ClientModuleInstance<'_, LightningClientModule>,
626 operation_id: OperationId,
627) -> Result<(), RecurringPaymentError> {
628 let mut operation_updated = ln_module
629 .subscribe_ln_receive(operation_id)
630 .await?
631 .into_stream();
632
633 while let Some(update) = operation_updated.next().await {
634 if matches!(update, LnReceiveState::WaitingForPayment { .. }) {
635 return Ok(());
636 }
637 }
638
639 Err(RecurringPaymentError::Other(anyhow!(
640 "BOLT11 invoice not confirmed"
641 )))
642}
643
644#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
645pub enum PaymentCodeInvoice {
646 Bolt11(Bolt11Invoice),
647}
648
649pub struct InvoiceStatus {
652 pub invoice: Bolt11Invoice,
653 pub status: PaymentStatus,
654}
655
656pub enum PaymentStatus {
657 Paid,
658 Pending,
659}
660
661impl PaymentStatus {
662 pub fn is_paid(&self) -> bool {
663 matches!(self, PaymentStatus::Paid)
664 }
665}
666
667#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
671pub struct LNURLPayInvoice {
672 pub pr: String,
673 pub verify: String,
674}
675
676fn operation_id_from_user_key(user_key: PaymentCodeRootKey, index: u64) -> OperationId {
677 let invoice_key = tweak_user_key(SECP256K1, user_key.0, index);
678 let preimage = sha256::Hash::hash(&invoice_key.serialize()[..]);
679 let payment_hash = sha256::Hash::hash(&preimage[..]);
680
681 OperationId(payment_hash.to_byte_array())
682}
683
684trait LnClientContextExt {
685 fn get_ln_module(
686 &'_ self,
687 ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError>;
688}
689
690impl LnClientContextExt for ClientHandleArc {
691 fn get_ln_module(
692 &'_ self,
693 ) -> Result<ClientModuleInstance<'_, LightningClientModule>, RecurringPaymentError> {
694 self.get_first_module::<LightningClientModule>()
695 .map_err(|e| {
696 warn!("No compatible lightning module found {e}");
697 RecurringPaymentError::NoLightningModuleFound
698 })
699 }
700}
701
702fn spawn_gateway_cache_refresh(client: &ClientHandleArc) {
704 let client = client.clone();
705 client
706 .task_group()
707 .clone()
708 .spawn_cancellable("recurringd-gateway-cache-refresh", async move {
709 let Ok(ln_module) = client.get_ln_module() else {
710 warn!("No lightning module found, not refreshing gateway cache");
711 return;
712 };
713 ln_module
716 .update_gateway_cache_continuously(|gateways| async move { gateways })
717 .await
718 });
719}