1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::core::ModuleKind;
21use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::secp256k1::{Keypair, PublicKey};
24use fedimint_core::task::sleep;
25use fedimint_core::util::{BoxFuture, FmtCompact, FmtCompactAnyhow, SafeUrl};
26use fedimint_derive_secret::ChildId;
27use fedimint_eventlog::{Event, EventKind, EventPersistence};
28use futures::StreamExt;
29use futures::future::select_all;
30use lightning_invoice::Bolt11Invoice;
31use serde::{Deserialize, Serialize};
32use thiserror::Error;
33use tokio::select;
34use tokio::sync::Notify;
35use tracing::{debug, trace, warn};
36
37use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
38use crate::receive::LightningReceiveError;
39use crate::{
40 LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
41 LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
42};
43
/// Tracing `target` used by the log statements in this file.
const LOG_CLIENT_RECURRING: &str = "fm::client::ln::recurring";
45
46impl LightningClientModule {
47 pub async fn register_recurring_payment_code(
48 &self,
49 protocol: RecurringPaymentProtocol,
50 recurringd_api: SafeUrl,
51 meta: &str,
52 ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
53 self.client_ctx
54 .module_db()
55 .autocommit(
56 |dbtx, _| {
57 let recurringd_api_inner = recurringd_api.clone();
58 let new_recurring_payment_code = self.new_recurring_payment_code.clone();
59 Box::pin(async move {
60 let next_idx = dbtx
61 .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
62 .await
63 .map(|(k, _)| k.derivation_idx)
64 .next()
65 .await
66 .map_or(0, |last_idx| last_idx + 1);
67
68 let payment_code_root_key = self.get_payment_code_root_key(next_idx);
69
70 let recurringd_client =
71 RecurringdClient::new(&recurringd_api_inner.clone());
72 let register_response = recurringd_client
73 .register_recurring_payment_code(
74 self.client_ctx
75 .get_config()
76 .await
77 .global
78 .calculate_federation_id(),
79 protocol,
80 crate::recurring::PaymentCodeRootKey(
81 payment_code_root_key.public_key(),
82 ),
83 meta,
84 )
85 .await?;
86
87 debug!(
88 target: LOG_CLIENT_RECURRING,
89 ?register_response,
90 "Registered recurring payment code"
91 );
92
93 let payment_code_entry = RecurringPaymentCodeEntry {
94 protocol,
95 root_keypair: payment_code_root_key,
96 code: register_response.recurring_payment_code,
97 recurringd_api: recurringd_api_inner,
98 last_derivation_index: 0,
99 creation_time: fedimint_core::time::now(),
100 meta: meta.to_owned(),
101 };
102 dbtx.insert_new_entry(
103 &crate::db::RecurringPaymentCodeKey {
104 derivation_idx: next_idx,
105 },
106 &payment_code_entry,
107 )
108 .await;
109 dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
110
111 Ok(payment_code_entry)
112 })
113 },
114 None,
115 )
116 .await
117 .map_err(|e| match e {
118 fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
119 fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
120 panic!("Commit failed: {last_error}")
121 }
122 })
123 }
124
125 pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
126 Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
127 }
128
129 pub async fn get_recurring_payment_codes_static(
130 db: &fedimint_core::db::Database,
131 ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
132 assert!(!db.is_global(), "Needs to run in module context");
133 db.begin_transaction_nc()
134 .await
135 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
136 .await
137 .map(|(idx, entry)| (idx.derivation_idx, entry))
138 .collect()
139 .await
140 }
141
142 fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
143 self.recurring_payment_code_secret
144 .child_key(ChildId(payment_code_registration_idx))
145 .to_secp_key(&self.secp)
146 }
147
148 pub async fn scan_recurring_payment_code_invoices(
149 client: ClientContext<Self>,
150 new_code_registered: Arc<Notify>,
151 ) {
152 const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
153
154 loop {
155 let new_code_registered_future = new_code_registered.notified();
159
160 let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
162 .await
163 .into_iter()
164 .map(|(payment_code_idx, payment_code)| Box::pin(async move {
165 let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
166 let invoice_index = payment_code.last_derivation_index + 1;
167
168 trace!(
169 target: LOG_CLIENT_RECURRING,
170 root_key=?payment_code.root_keypair.public_key(),
171 %invoice_index,
172 server=%payment_code.recurringd_api,
173 "Waiting for new invoice from recurringd"
174 );
175
176 match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
177 Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
178 Err(err) => {
179 debug!(
180 target: LOG_CLIENT_RECURRING,
181 err=%err.fmt_compact(),
182 root_key=?payment_code.root_keypair.public_key(),
183 invoice_index=%invoice_index,
184 server=%payment_code.recurringd_api,
185 "Failed querying recurring payment code invoice, will retry in {:?}",
186 QUERY_RETRY_DELAY,
187 );
188 sleep(QUERY_RETRY_DELAY).await;
189 Err(err)
190 }
191 }
192 }))
193 .collect::<Vec<_>>();
194
195 let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
197 Box::pin(pending())
198 } else {
199 Box::pin(select_all(all_recurring_invoice_futures))
200 };
201
202 let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
203 (ret, _, _) = await_any_invoice => match ret {
204 Ok(ret) => ret,
205 Err(_) => {
206 continue;
207 }
208 },
209 () = new_code_registered_future => {
210 continue;
211 }
212 };
213
214 Self::process_recurring_payment_code_invoice(
215 &client,
216 payment_code_idx,
217 invoice_idx,
218 invoice,
219 )
220 .await;
221
222 sleep(Duration::from_secs(1)).await;
224 }
225 }
226
227 async fn process_recurring_payment_code_invoice(
228 client: &ClientContext<Self>,
229 payment_code_idx: u64,
230 invoice_idx: u64,
231 invoice: lightning_invoice::Bolt11Invoice,
232 ) {
233 let mut dbtx = client.module_db().begin_transaction().await;
235 let old_payment_code_entry = dbtx
236 .get_value(&crate::db::RecurringPaymentCodeKey {
237 derivation_idx: payment_code_idx,
238 })
239 .await
240 .expect("We queried it, so it exists in our DB");
241
242 let new_payment_code_entry = RecurringPaymentCodeEntry {
243 last_derivation_index: invoice_idx,
244 ..old_payment_code_entry.clone()
245 };
246 dbtx.insert_entry(
247 &crate::db::RecurringPaymentCodeKey {
248 derivation_idx: payment_code_idx,
249 },
250 &new_payment_code_entry,
251 )
252 .await;
253
254 let mut dbtx_nc = dbtx.to_ref_nc();
258 if let Ok(operation_id) = Self::create_recurring_receive_operation(
259 client,
260 &mut dbtx_nc,
261 &old_payment_code_entry,
262 invoice_idx,
263 invoice,
264 )
265 .await
266 {
267 client
268 .log_event(
269 &mut dbtx_nc,
270 RecurringInvoiceCreatedEvent {
271 payment_code_idx,
272 invoice_idx,
273 operation_id,
274 },
275 )
276 .await;
277 } else {
278 debug_assert!(
279 false,
280 "Recurring invoice operation creation failed, this should never happen"
281 );
282 }
283 drop(dbtx_nc);
284
285 dbtx.commit_tx().await;
286 }
287
288 #[allow(clippy::pedantic)]
289 async fn create_recurring_receive_operation(
290 client: &ClientContext<Self>,
291 dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
292 payment_code: &RecurringPaymentCodeEntry,
293 invoice_index: u64,
294 invoice: lightning_invoice::Bolt11Invoice,
295 ) -> anyhow::Result<OperationId> {
296 let invoice_key =
298 tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
299
300 let operation_id = OperationId(*invoice.payment_hash().as_ref());
301 debug!(
302 target: LOG_CLIENT_RECURRING,
303 ?operation_id,
304 payment_code_key=?payment_code.root_keypair.public_key(),
305 invoice_index=%invoice_index,
306 "Creating recurring receive operation"
307 );
308 let ln_state =
309 LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
310 operation_id,
311 state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
314 crate::receive::LightningReceiveConfirmedInvoice {
315 invoice: invoice.clone(),
316 receiving_key: crate::ReceivingKey::Personal(invoice_key),
317 },
318 ),
319 });
320
321 if let Err(e) = client
322 .manual_operation_start_dbtx(
323 dbtx,
324 operation_id,
325 "ln",
326 LightningOperationMeta {
327 variant: LightningOperationMetaVariant::RecurringPaymentReceive(
328 ReurringPaymentReceiveMeta {
329 payment_code_id: PaymentCodeRootKey(
330 payment_code.root_keypair.public_key(),
331 )
332 .to_payment_code_id(),
333 invoice,
334 },
335 ),
336 extra_meta: serde_json::Value::Null,
337 },
338 vec![client.make_dyn_state(ln_state)],
339 )
340 .await
341 {
342 warn!(
343 target: LOG_CLIENT_RECURRING,
344 ?operation_id,
345 payment_code_key=?payment_code.root_keypair.public_key(),
346 invoice_index=%invoice_index,
347 err = %e.fmt_compact_anyhow(),
348 "Failed to create recurring receive operation"
349 );
350 Err(e)
351 } else {
352 Ok(operation_id)
353 }
354 }
355
356 pub async fn subscribe_ln_recurring_receive(
357 &self,
358 operation_id: OperationId,
359 ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
360 let operation = self.client_ctx.get_operation(operation_id).await?;
361 let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
362 invoice,
363 ..
364 }) = operation.meta::<LightningOperationMeta>().variant
365 else {
366 bail!("Operation is not a recurring lightning receive")
367 };
368
369 let client_ctx = self.client_ctx.clone();
370
371 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
372 stream! {
373 let self_ref = client_ctx.self_ref();
374
375 yield LnReceiveState::Created;
376 yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
377
378 match self_ref.await_receive_success(operation_id).await {
379 Ok(_) => {
380 yield LnReceiveState::Funded;
381
382 if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
383 yield LnReceiveState::AwaitingFunds;
384
385 if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
386 yield LnReceiveState::Claimed;
387 return;
388 }
389 }
390
391 yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
392 }
393 Err(e) => {
394 yield LnReceiveState::Canceled { reason: e };
395 }
396 }
397 }
398 }))
399 }
400
401 pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
402 self.client_ctx
403 .module_db()
404 .begin_transaction_nc()
405 .await
406 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
407 .await
408 .map(|(idx, entry)| (idx.derivation_idx, entry))
409 .collect()
410 .await
411 }
412
413 pub async fn get_recurring_payment_code(
414 &self,
415 payment_code_idx: u64,
416 ) -> Option<RecurringPaymentCodeEntry> {
417 self.client_ctx
418 .module_db()
419 .begin_transaction_nc()
420 .await
421 .get_value(&RecurringPaymentCodeKey {
422 derivation_idx: payment_code_idx,
423 })
424 .await
425 }
426
427 pub async fn list_recurring_payment_code_invoices(
428 &self,
429 payment_code_idx: u64,
430 ) -> Option<BTreeMap<u64, OperationId>> {
431 let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
432
433 let operations = (1..=payment_code.last_derivation_index)
434 .map(|invoice_idx: u64| {
435 let invoice_key = tweak_user_key(
436 SECP256K1,
437 payment_code.root_keypair.public_key(),
438 invoice_idx,
439 );
440 let payment_hash =
441 sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
442 let operation_id = OperationId(*payment_hash.as_ref());
443
444 (invoice_idx, operation_id)
445 })
446 .collect();
447
448 Some(operations)
449 }
450}
451
/// Public key at the root of a recurring payment code.
///
/// In this file it identifies a payment code towards recurringd (on
/// registration and when waiting for new invoices) and is hashed into a
/// [`PaymentCodeId`].
#[derive(
    Debug,
    Clone,
    Copy,
    PartialOrd,
    Eq,
    PartialEq,
    Hash,
    Encodable,
    Decodable,
    Serialize,
    Deserialize,
)]
pub struct PaymentCodeRootKey(pub PublicKey);
466
/// Identifier of a recurring payment code: a SHA-256 hash, produced by
/// [`PaymentCodeRootKey::to_payment_code_id`] from the serialized root key.
///
/// Displays as, and parses from, the string form of the wrapped hash.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialOrd,
    Eq,
    PartialEq,
    Hash,
    Encodable,
    Decodable,
    Serialize,
    Deserialize,
)]
pub struct PaymentCodeId(sha256::Hash);
481
482impl PaymentCodeRootKey {
483 pub fn to_payment_code_id(&self) -> PaymentCodeId {
484 PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
485 }
486}
487
488impl Display for PaymentCodeId {
489 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
490 write!(f, "{}", self.0)
491 }
492}
493
494impl FromStr for PaymentCodeId {
495 type Err = anyhow::Error;
496
497 fn from_str(s: &str) -> Result<Self, Self::Err> {
498 Ok(Self(sha256::Hash::from_str(s)?))
499 }
500}
501
502impl Display for PaymentCodeRootKey {
503 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
504 write!(f, "{}", self.0)
505 }
506}
507
508impl FromStr for PaymentCodeRootKey {
509 type Err = anyhow::Error;
510
511 fn from_str(s: &str) -> Result<Self, Self::Err> {
512 Ok(Self(PublicKey::from_str(s)?))
513 }
514}
515
/// Protocol a recurring payment code is registered for.
///
/// The chosen value is sent to recurringd on registration and stored in the
/// resulting [`RecurringPaymentCodeEntry`].
#[derive(
    Debug,
    Clone,
    Copy,
    Eq,
    PartialEq,
    PartialOrd,
    Hash,
    Encodable,
    Decodable,
    Serialize,
    Deserialize,
)]
pub enum RecurringPaymentProtocol {
    LNURL,
    BOLT12,
}
533
/// Operation metadata stored for a lightning receive that originates from a
/// recurring payment code.
///
/// NOTE(review): the type name misspells "Recurring"; it is public, so it is
/// left unchanged here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReurringPaymentReceiveMeta {
    /// Id of the payment code the invoice belongs to, derived from the code's
    /// root public key.
    pub payment_code_id: PaymentCodeId,
    /// The invoice being received.
    pub invoice: Bolt11Invoice,
}
539
/// Errors related to recurring payment codes.
///
/// NOTE(review): none of these variants are constructed in this file;
/// presumably they are produced by the recurringd service — confirm.
#[derive(Debug, Error)]
pub enum RecurringPaymentError {
    #[error("Unsupported protocol: {0:?}")]
    UnsupportedProtocol(RecurringPaymentProtocol),
    #[error("Unknown federation ID: {0}")]
    UnknownFederationId(FederationId),
    #[error("Unknown payment code: {0:?}")]
    UnknownPaymentCode(PaymentCodeId),
    #[error("Unknown lightning receive operation: {0:?}")]
    UnknownInvoice(OperationId),
    #[error("No compatible lightning module found")]
    NoLightningModuleFound,
    #[error("No gateway found")]
    NoGatewayFound,
    #[error("Payment code already exists with different settings: {0:?}")]
    PaymentCodeAlreadyExists(PaymentCodeRootKey),
    #[error("Federation already registered: {0}")]
    FederationAlreadyRegistered(FederationId),
    #[error("Error joining federation: {0}")]
    JoiningFederationFailed(anyhow::Error),
    // Catch-all; any `anyhow::Error` converts into this variant via `From`.
    #[error("Error registering with recurring payment service: {0}")]
    Other(#[from] anyhow::Error),
}
563
/// Database record describing one registered recurring payment code.
#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
pub struct RecurringPaymentCodeEntry {
    /// Protocol the code was registered for.
    pub protocol: RecurringPaymentProtocol,
    /// Root key pair of the code, derived from the code's registration index;
    /// per-invoice keys are tweaked from it.
    pub root_keypair: Keypair,
    /// The payment code string returned by recurringd on registration.
    pub code: String,
    /// Base URL of the recurringd server the code was registered with.
    pub recurringd_api: SafeUrl,
    /// Index of the most recent invoice processed for this code; `0` right
    /// after registration.
    pub last_derivation_index: u64,
    /// Time at which the entry was created during registration.
    pub creation_time: SystemTime,
    /// Caller-supplied string that was sent along with the registration.
    pub meta: String,
}
574
/// Event logged after a receive operation has been created for a new invoice
/// of a recurring payment code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecurringInvoiceCreatedEvent {
    /// Registration index of the payment code the invoice belongs to.
    pub payment_code_idx: u64,
    /// Index of the invoice within that payment code.
    pub invoice_idx: u64,
    /// Id of the receive operation created for the invoice.
    pub operation_id: OperationId,
}
588
impl Event for RecurringInvoiceCreatedEvent {
    // Attributed to the lightning module kind.
    const MODULE: Option<ModuleKind> = Some(fedimint_ln_common::KIND);
    // Static identifier of this event type.
    const KIND: EventKind = EventKind::from_static("recurring_invoice_created");
    // Declared as a persistent event.
    const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
}