1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::secp256k1::{Keypair, PublicKey};
23use fedimint_core::task::sleep;
24use fedimint_core::util::{BoxFuture, FmtCompact, FmtCompactAnyhow, SafeUrl};
25use fedimint_derive_secret::ChildId;
26use futures::StreamExt;
27use futures::future::select_all;
28use lightning_invoice::Bolt11Invoice;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31use tokio::select;
32use tokio::sync::Notify;
33use tracing::{debug, trace, warn};
34
35use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
36use crate::receive::LightningReceiveError;
37use crate::{
38 LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
39 LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
40};
41
42const LOG_CLIENT_RECURRING: &str = "fm::client::ln::recurring";
43
44impl LightningClientModule {
45 pub async fn register_recurring_payment_code(
46 &self,
47 protocol: RecurringPaymentProtocol,
48 recurringd_api: SafeUrl,
49 meta: &str,
50 ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
51 self.client_ctx
52 .module_db()
53 .autocommit(
54 |dbtx, _| {
55 let recurringd_api_inner = recurringd_api.clone();
56 let new_recurring_payment_code = self.new_recurring_payment_code.clone();
57 Box::pin(async move {
58 let next_idx = dbtx
59 .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
60 .await
61 .map(|(k, _)| k.derivation_idx)
62 .next()
63 .await
64 .map_or(0, |last_idx| last_idx + 1);
65
66 let payment_code_root_key = self.get_payment_code_root_key(next_idx);
67
68 let recurringd_client =
69 RecurringdClient::new(&recurringd_api_inner.clone());
70 let register_response = recurringd_client
71 .register_recurring_payment_code(
72 self.client_ctx
73 .get_config()
74 .await
75 .global
76 .calculate_federation_id(),
77 protocol,
78 crate::recurring::PaymentCodeRootKey(
79 payment_code_root_key.public_key(),
80 ),
81 meta,
82 )
83 .await?;
84
85 debug!(
86 target: LOG_CLIENT_RECURRING,
87 ?register_response,
88 "Registered recurring payment code"
89 );
90
91 let payment_code_entry = RecurringPaymentCodeEntry {
92 protocol,
93 root_keypair: payment_code_root_key,
94 code: register_response.recurring_payment_code,
95 recurringd_api: recurringd_api_inner,
96 last_derivation_index: 0,
97 creation_time: fedimint_core::time::now(),
98 meta: meta.to_owned(),
99 };
100 dbtx.insert_new_entry(
101 &crate::db::RecurringPaymentCodeKey {
102 derivation_idx: next_idx,
103 },
104 &payment_code_entry,
105 )
106 .await;
107 dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
108
109 Ok(payment_code_entry)
110 })
111 },
112 None,
113 )
114 .await
115 .map_err(|e| match e {
116 fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
117 fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
118 panic!("Commit failed: {last_error}")
119 }
120 })
121 }
122
123 pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
124 Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
125 }
126
127 pub async fn get_recurring_payment_codes_static(
128 db: &fedimint_core::db::Database,
129 ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
130 assert!(!db.is_global(), "Needs to run in module context");
131 db.begin_transaction_nc()
132 .await
133 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
134 .await
135 .map(|(idx, entry)| (idx.derivation_idx, entry))
136 .collect()
137 .await
138 }
139
140 fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
141 self.recurring_payment_code_secret
142 .child_key(ChildId(payment_code_registration_idx))
143 .to_secp_key(&self.secp)
144 }
145
146 pub async fn scan_recurring_payment_code_invoices(
147 client: ClientContext<Self>,
148 new_code_registered: Arc<Notify>,
149 ) {
150 const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
151
152 loop {
153 let new_code_registered_future = new_code_registered.notified();
157
158 let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
160 .await
161 .into_iter()
162 .map(|(payment_code_idx, payment_code)| Box::pin(async move {
163 let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
164 let invoice_index = payment_code.last_derivation_index + 1;
165
166 trace!(
167 target: LOG_CLIENT_RECURRING,
168 root_key=?payment_code.root_keypair.public_key(),
169 %invoice_index,
170 server=%payment_code.recurringd_api,
171 "Waiting for new invoice from recurringd"
172 );
173
174 match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
175 Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
176 Err(err) => {
177 debug!(
178 target: LOG_CLIENT_RECURRING,
179 err=%err.fmt_compact(),
180 root_key=?payment_code.root_keypair.public_key(),
181 invoice_index=%invoice_index,
182 server=%payment_code.recurringd_api,
183 "Failed querying recurring payment code invoice, will retry in {:?}",
184 QUERY_RETRY_DELAY,
185 );
186 sleep(QUERY_RETRY_DELAY).await;
187 Err(err)
188 }
189 }
190 }))
191 .collect::<Vec<_>>();
192
193 let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
195 Box::pin(pending())
196 } else {
197 Box::pin(select_all(all_recurring_invoice_futures))
198 };
199
200 let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
201 (ret, _, _) = await_any_invoice => match ret {
202 Ok(ret) => ret,
203 Err(_) => {
204 continue;
205 }
206 },
207 () = new_code_registered_future => {
208 continue;
209 }
210 };
211
212 Self::process_recurring_payment_code_invoice(
213 &client,
214 payment_code_idx,
215 invoice_idx,
216 invoice,
217 )
218 .await;
219
220 sleep(Duration::from_secs(1)).await;
222 }
223 }
224
225 async fn process_recurring_payment_code_invoice(
226 client: &ClientContext<Self>,
227 payment_code_idx: u64,
228 invoice_idx: u64,
229 invoice: lightning_invoice::Bolt11Invoice,
230 ) {
231 let mut dbtx = client.module_db().begin_transaction().await;
233 let old_payment_code_entry = dbtx
234 .get_value(&crate::db::RecurringPaymentCodeKey {
235 derivation_idx: payment_code_idx,
236 })
237 .await
238 .expect("We queried it, so it exists in our DB");
239
240 let new_payment_code_entry = RecurringPaymentCodeEntry {
241 last_derivation_index: invoice_idx,
242 ..old_payment_code_entry.clone()
243 };
244 dbtx.insert_entry(
245 &crate::db::RecurringPaymentCodeKey {
246 derivation_idx: payment_code_idx,
247 },
248 &new_payment_code_entry,
249 )
250 .await;
251
252 Self::create_recurring_receive_operation(
253 client,
254 &mut dbtx.to_ref_nc(),
255 &old_payment_code_entry,
256 invoice_idx,
257 invoice,
258 )
259 .await;
260
261 dbtx.commit_tx().await;
262 }
263
264 #[allow(clippy::pedantic)]
265 async fn create_recurring_receive_operation(
266 client: &ClientContext<Self>,
267 dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
268 payment_code: &RecurringPaymentCodeEntry,
269 invoice_index: u64,
270 invoice: lightning_invoice::Bolt11Invoice,
271 ) {
272 let invoice_key =
274 tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
275
276 let operation_id = OperationId(*invoice.payment_hash().as_ref());
277 debug!(
278 target: LOG_CLIENT_RECURRING,
279 ?operation_id,
280 payment_code_key=?payment_code.root_keypair.public_key(),
281 invoice_index=%invoice_index,
282 "Creating recurring receive operation"
283 );
284 let ln_state =
285 LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
286 operation_id,
287 state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
290 crate::receive::LightningReceiveConfirmedInvoice {
291 invoice: invoice.clone(),
292 receiving_key: crate::ReceivingKey::Personal(invoice_key),
293 },
294 ),
295 });
296
297 if let Err(e) = client
298 .manual_operation_start_dbtx(
299 dbtx,
300 operation_id,
301 "ln",
302 LightningOperationMeta {
303 variant: LightningOperationMetaVariant::RecurringPaymentReceive(
304 ReurringPaymentReceiveMeta {
305 payment_code_id: PaymentCodeRootKey(
306 payment_code.root_keypair.public_key(),
307 )
308 .to_payment_code_id(),
309 invoice,
310 },
311 ),
312 extra_meta: serde_json::Value::Null,
313 },
314 vec![client.make_dyn_state(ln_state)],
315 )
316 .await
317 {
318 warn!(
319 target: LOG_CLIENT_RECURRING,
320 ?operation_id,
321 payment_code_key=?payment_code.root_keypair.public_key(),
322 invoice_index=%invoice_index,
323 err = %e.fmt_compact_anyhow(),
324 "Failed to create recurring receive operation"
325 )
326 }
327 }
328
329 pub async fn subscribe_ln_recurring_receive(
330 &self,
331 operation_id: OperationId,
332 ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
333 let operation = self.client_ctx.get_operation(operation_id).await?;
334 let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
335 invoice,
336 ..
337 }) = operation.meta::<LightningOperationMeta>().variant
338 else {
339 bail!("Operation is not a recurring lightning receive")
340 };
341
342 let client_ctx = self.client_ctx.clone();
343
344 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
345 stream! {
346 let self_ref = client_ctx.self_ref();
347
348 yield LnReceiveState::Created;
349 yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
350
351 match self_ref.await_receive_success(operation_id).await {
352 Ok(_) => {
353 yield LnReceiveState::Funded;
354
355 if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
356 yield LnReceiveState::AwaitingFunds;
357
358 if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
359 yield LnReceiveState::Claimed;
360 return;
361 }
362 }
363
364 yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
365 }
366 Err(e) => {
367 yield LnReceiveState::Canceled { reason: e };
368 }
369 }
370 }
371 }))
372 }
373
374 pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
375 self.client_ctx
376 .module_db()
377 .begin_transaction_nc()
378 .await
379 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
380 .await
381 .map(|(idx, entry)| (idx.derivation_idx, entry))
382 .collect()
383 .await
384 }
385
386 pub async fn get_recurring_payment_code(
387 &self,
388 payment_code_idx: u64,
389 ) -> Option<RecurringPaymentCodeEntry> {
390 self.client_ctx
391 .module_db()
392 .begin_transaction_nc()
393 .await
394 .get_value(&RecurringPaymentCodeKey {
395 derivation_idx: payment_code_idx,
396 })
397 .await
398 }
399
400 pub async fn list_recurring_payment_code_invoices(
401 &self,
402 payment_code_idx: u64,
403 ) -> Option<BTreeMap<u64, OperationId>> {
404 let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
405
406 let operations = (1..=payment_code.last_derivation_index)
407 .map(|invoice_idx: u64| {
408 let invoice_key = tweak_user_key(
409 SECP256K1,
410 payment_code.root_keypair.public_key(),
411 invoice_idx,
412 );
413 let payment_hash =
414 sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
415 let operation_id = OperationId(*payment_hash.as_ref());
416
417 (invoice_idx, operation_id)
418 })
419 .collect();
420
421 Some(operations)
422 }
423}
424
425#[derive(
426 Debug,
427 Clone,
428 Copy,
429 PartialOrd,
430 Eq,
431 PartialEq,
432 Hash,
433 Encodable,
434 Decodable,
435 Serialize,
436 Deserialize,
437)]
438pub struct PaymentCodeRootKey(pub PublicKey);
439
440#[derive(
441 Debug,
442 Clone,
443 Copy,
444 PartialOrd,
445 Eq,
446 PartialEq,
447 Hash,
448 Encodable,
449 Decodable,
450 Serialize,
451 Deserialize,
452)]
453pub struct PaymentCodeId(sha256::Hash);
454
455impl PaymentCodeRootKey {
456 pub fn to_payment_code_id(&self) -> PaymentCodeId {
457 PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
458 }
459}
460
461impl Display for PaymentCodeId {
462 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
463 write!(f, "{}", self.0)
464 }
465}
466
467impl FromStr for PaymentCodeId {
468 type Err = anyhow::Error;
469
470 fn from_str(s: &str) -> Result<Self, Self::Err> {
471 Ok(Self(sha256::Hash::from_str(s)?))
472 }
473}
474
475impl Display for PaymentCodeRootKey {
476 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
477 write!(f, "{}", self.0)
478 }
479}
480
481impl FromStr for PaymentCodeRootKey {
482 type Err = anyhow::Error;
483
484 fn from_str(s: &str) -> Result<Self, Self::Err> {
485 Ok(Self(PublicKey::from_str(s)?))
486 }
487}
488
489#[derive(
490 Debug,
491 Clone,
492 Copy,
493 Eq,
494 PartialEq,
495 PartialOrd,
496 Hash,
497 Encodable,
498 Decodable,
499 Serialize,
500 Deserialize,
501)]
502pub enum RecurringPaymentProtocol {
503 LNURL,
504 BOLT12,
505}
506
507#[derive(Debug, Clone, Serialize, Deserialize)]
508pub struct ReurringPaymentReceiveMeta {
509 pub payment_code_id: PaymentCodeId,
510 pub invoice: Bolt11Invoice,
511}
512
513#[derive(Debug, Error)]
514pub enum RecurringPaymentError {
515 #[error("Unsupported protocol: {0:?}")]
516 UnsupportedProtocol(RecurringPaymentProtocol),
517 #[error("Unknown federation ID: {0}")]
518 UnknownFederationId(FederationId),
519 #[error("Unknown payment code: {0:?}")]
520 UnknownPaymentCode(PaymentCodeId),
521 #[error("Unknown lightning receive operation: {0:?}")]
522 UnknownInvoice(OperationId),
523 #[error("No compatible lightning module found")]
524 NoLightningModuleFound,
525 #[error("No gateway found")]
526 NoGatewayFound,
527 #[error("Payment code already exists with different settings: {0:?}")]
528 PaymentCodeAlreadyExists(PaymentCodeRootKey),
529 #[error("Federation already registered: {0}")]
530 FederationAlreadyRegistered(FederationId),
531 #[error("Error joining federation: {0}")]
532 JoiningFederationFailed(anyhow::Error),
533 #[error("Error registering with recurring payment service: {0}")]
534 Other(#[from] anyhow::Error),
535}
536
537#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
538pub struct RecurringPaymentCodeEntry {
539 pub protocol: RecurringPaymentProtocol,
540 pub root_keypair: Keypair,
541 pub code: String,
542 pub recurringd_api: SafeUrl,
543 pub last_derivation_index: u64,
544 pub creation_time: SystemTime,
545 pub meta: String,
546}