1pub mod api;
2
3use std::collections::BTreeMap;
4use std::fmt::{Display, Formatter};
5use std::future::pending;
6use std::str::FromStr;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9
10use anyhow::bail;
11use api::{RecurringdApiError, RecurringdClient};
12use async_stream::stream;
13use bitcoin::hashes::sha256;
14use bitcoin::secp256k1::SECP256K1;
15use fedimint_client_module::OperationId;
16use fedimint_client_module::module::ClientContext;
17use fedimint_client_module::oplog::UpdateStreamOrOutcome;
18use fedimint_core::BitcoinHash;
19use fedimint_core::config::FederationId;
20use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
21use fedimint_core::encoding::{Decodable, Encodable};
22use fedimint_core::secp256k1::{Keypair, PublicKey};
23use fedimint_core::task::sleep;
24use fedimint_core::util::{BoxFuture, FmtCompact, SafeUrl};
25use fedimint_derive_secret::ChildId;
26use futures::StreamExt;
27use futures::future::select_all;
28use lightning_invoice::Bolt11Invoice;
29use serde::{Deserialize, Serialize};
30use thiserror::Error;
31use tokio::select;
32use tokio::sync::Notify;
33use tracing::{debug, trace};
34
35use crate::db::{RecurringPaymentCodeKey, RecurringPaymentCodeKeyPrefix};
36use crate::receive::LightningReceiveError;
37use crate::{
38 LightningClientModule, LightningClientStateMachines, LightningOperationMeta,
39 LightningOperationMetaVariant, LnReceiveState, tweak_user_key, tweak_user_secret_key,
40};
41
42impl LightningClientModule {
43 pub async fn register_recurring_payment_code(
44 &self,
45 protocol: RecurringPaymentProtocol,
46 recurringd_api: SafeUrl,
47 meta: &str,
48 ) -> Result<RecurringPaymentCodeEntry, RecurringdApiError> {
49 self.client_ctx
50 .module_db()
51 .autocommit(
52 |dbtx, _| {
53 let recurringd_api_inner = recurringd_api.clone();
54 let new_recurring_payment_code = self.new_recurring_payment_code.clone();
55 Box::pin(async move {
56 let next_idx = dbtx
57 .find_by_prefix_sorted_descending(&RecurringPaymentCodeKeyPrefix)
58 .await
59 .map(|(k, _)| k.derivation_idx)
60 .next()
61 .await
62 .map_or(0, |last_idx| last_idx + 1);
63
64 let payment_code_root_key = self.get_payment_code_root_key(next_idx);
65
66 let recurringd_client =
67 RecurringdClient::new(&recurringd_api_inner.clone());
68 let register_response = recurringd_client
69 .register_recurring_payment_code(
70 self.client_ctx
71 .get_config()
72 .await
73 .global
74 .calculate_federation_id(),
75 protocol,
76 crate::recurring::PaymentCodeRootKey(
77 payment_code_root_key.public_key(),
78 ),
79 meta,
80 )
81 .await?;
82
83 let payment_code_entry = RecurringPaymentCodeEntry {
84 protocol,
85 root_keypair: payment_code_root_key,
86 code: register_response.recurring_payment_code,
87 recurringd_api: recurringd_api_inner,
88 last_derivation_index: 0,
89 creation_time: fedimint_core::time::now(),
90 meta: meta.to_owned(),
91 };
92 dbtx.insert_new_entry(
93 &crate::db::RecurringPaymentCodeKey {
94 derivation_idx: next_idx,
95 },
96 &payment_code_entry,
97 )
98 .await;
99 dbtx.on_commit(move || new_recurring_payment_code.notify_waiters());
100
101 Ok(payment_code_entry)
102 })
103 },
104 None,
105 )
106 .await
107 .map_err(|e| match e {
108 fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
109 fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
110 panic!("Commit failed: {last_error}")
111 }
112 })
113 }
114
115 pub async fn get_recurring_payment_codes(&self) -> Vec<(u64, RecurringPaymentCodeEntry)> {
116 Self::get_recurring_payment_codes_static(self.client_ctx.module_db()).await
117 }
118
119 pub async fn get_recurring_payment_codes_static(
120 db: &fedimint_core::db::Database,
121 ) -> Vec<(u64, RecurringPaymentCodeEntry)> {
122 assert!(!db.is_global(), "Needs to run in module context");
123 db.begin_transaction_nc()
124 .await
125 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
126 .await
127 .map(|(idx, entry)| (idx.derivation_idx, entry))
128 .collect()
129 .await
130 }
131
132 fn get_payment_code_root_key(&self, payment_code_registration_idx: u64) -> Keypair {
133 self.recurring_payment_code_secret
134 .child_key(ChildId(payment_code_registration_idx))
135 .to_secp_key(&self.secp)
136 }
137
138 pub async fn scan_recurring_payment_code_invoices(
139 client: ClientContext<Self>,
140 new_code_registered: Arc<Notify>,
141 ) {
142 const QUERY_RETRY_DELAY: Duration = Duration::from_secs(60);
143
144 loop {
145 let new_code_registered_future = new_code_registered.notified();
149
150 let all_recurring_invoice_futures = Self::get_recurring_payment_codes_static(client.module_db())
152 .await
153 .into_iter()
154 .map(|(payment_code_idx, payment_code)| Box::pin(async move {
155 let client = RecurringdClient::new(&payment_code.recurringd_api.clone());
156 let invoice_index = payment_code.last_derivation_index + 1;
157
158 trace!(
159 root_key=?payment_code.root_keypair.public_key(),
160 %invoice_index,
161 server=%payment_code.recurringd_api,
162 "Waiting for new invoice from recurringd"
163 );
164
165 match client.await_new_invoice(crate::recurring::PaymentCodeRootKey(payment_code.root_keypair.public_key()), invoice_index).await {
166 Ok(invoice) => {Ok((payment_code_idx, payment_code, invoice_index, invoice))}
167 Err(err) => {
168 debug!(
169 err=%err.fmt_compact(),
170 root_key=?payment_code.root_keypair.public_key(),
171 invoice_index=%invoice_index,
172 server=%payment_code.recurringd_api,
173 "Failed querying recurring payment code invoice, will retry in {:?}",
174 QUERY_RETRY_DELAY,
175 );
176 sleep(QUERY_RETRY_DELAY).await;
177 Err(err)
178 }
179 }
180 }))
181 .collect::<Vec<_>>();
182
183 let await_any_invoice: BoxFuture<_> = if all_recurring_invoice_futures.is_empty() {
185 Box::pin(pending())
186 } else {
187 Box::pin(select_all(all_recurring_invoice_futures))
188 };
189
190 let (payment_code_idx, _payment_code, invoice_idx, invoice) = select! {
191 (ret, _, _) = await_any_invoice => match ret {
192 Ok(ret) => ret,
193 Err(_) => {
194 continue;
195 }
196 },
197 () = new_code_registered_future => {
198 continue;
199 }
200 };
201
202 Self::process_recurring_payment_code_invoice(
203 &client,
204 payment_code_idx,
205 invoice_idx,
206 invoice,
207 )
208 .await;
209
210 sleep(Duration::from_secs(1)).await;
212 }
213 }
214
215 async fn process_recurring_payment_code_invoice(
216 client: &ClientContext<Self>,
217 payment_code_idx: u64,
218 invoice_idx: u64,
219 invoice: lightning_invoice::Bolt11Invoice,
220 ) {
221 let mut dbtx = client.module_db().begin_transaction().await;
223 let old_payment_code_entry = dbtx
224 .get_value(&crate::db::RecurringPaymentCodeKey {
225 derivation_idx: payment_code_idx,
226 })
227 .await
228 .expect("We queried it, so it exists in our DB");
229
230 let new_payment_code_entry = RecurringPaymentCodeEntry {
231 last_derivation_index: invoice_idx,
232 ..old_payment_code_entry.clone()
233 };
234 dbtx.insert_entry(
235 &crate::db::RecurringPaymentCodeKey {
236 derivation_idx: payment_code_idx,
237 },
238 &new_payment_code_entry,
239 )
240 .await;
241
242 Self::create_recurring_receive_operation(
243 client,
244 &mut dbtx.to_ref_nc(),
245 &old_payment_code_entry,
246 invoice_idx,
247 invoice,
248 )
249 .await;
250
251 dbtx.commit_tx().await;
252 }
253
254 #[allow(clippy::pedantic)]
255 async fn create_recurring_receive_operation(
256 client: &ClientContext<Self>,
257 dbtx: &mut fedimint_core::db::DatabaseTransaction<'_>,
258 payment_code: &RecurringPaymentCodeEntry,
259 invoice_index: u64,
260 invoice: lightning_invoice::Bolt11Invoice,
261 ) {
262 let invoice_key =
264 tweak_user_secret_key(SECP256K1, payment_code.root_keypair, invoice_index);
265
266 let operation_id = OperationId(*invoice.payment_hash().as_ref());
269 debug!(
270 ?operation_id,
271 payment_code_key=?payment_code.root_keypair.public_key(),
272 invoice_index=%invoice_index,
273 "Creating recurring receive operation"
274 );
275 let ln_state =
276 LightningClientStateMachines::Receive(crate::receive::LightningReceiveStateMachine {
277 operation_id,
278 state: crate::receive::LightningReceiveStates::ConfirmedInvoice(
281 crate::receive::LightningReceiveConfirmedInvoice {
282 invoice: invoice.clone(),
283 receiving_key: crate::ReceivingKey::Personal(invoice_key),
284 },
285 ),
286 });
287
288 client
289 .manual_operation_start_dbtx(
290 dbtx,
291 operation_id,
292 "ln",
293 LightningOperationMeta {
294 variant: LightningOperationMetaVariant::RecurringPaymentReceive(
295 ReurringPaymentReceiveMeta {
296 payment_code_id: PaymentCodeRootKey(
297 payment_code.root_keypair.public_key(),
298 )
299 .to_payment_code_id(),
300 invoice,
301 },
302 ),
303 extra_meta: serde_json::Value::Null,
304 },
305 vec![client.make_dyn_state(ln_state)],
306 )
307 .await
308 .expect("OperationId is random");
309 }
310
311 pub async fn subscribe_ln_recurring_receive(
312 &self,
313 operation_id: OperationId,
314 ) -> anyhow::Result<UpdateStreamOrOutcome<LnReceiveState>> {
315 let operation = self.client_ctx.get_operation(operation_id).await?;
316 let LightningOperationMetaVariant::RecurringPaymentReceive(ReurringPaymentReceiveMeta {
317 invoice,
318 ..
319 }) = operation.meta::<LightningOperationMeta>().variant
320 else {
321 bail!("Operation is not a recurring lightning receive")
322 };
323
324 let client_ctx = self.client_ctx.clone();
325
326 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
327 stream! {
328 let self_ref = client_ctx.self_ref();
329
330 yield LnReceiveState::Created;
331 yield LnReceiveState::WaitingForPayment { invoice: invoice.to_string(), timeout: invoice.expiry_time() };
332
333 match self_ref.await_receive_success(operation_id).await {
334 Ok(_) => {
335 yield LnReceiveState::Funded;
336
337 if let Ok(out_points) = self_ref.await_claim_acceptance(operation_id).await {
338 yield LnReceiveState::AwaitingFunds;
339
340 if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
341 yield LnReceiveState::Claimed;
342 return;
343 }
344 }
345
346 yield LnReceiveState::Canceled { reason: LightningReceiveError::Rejected };
347 }
348 Err(e) => {
349 yield LnReceiveState::Canceled { reason: e };
350 }
351 }
352 }
353 }))
354 }
355
356 pub async fn list_recurring_payment_codes(&self) -> BTreeMap<u64, RecurringPaymentCodeEntry> {
357 self.client_ctx
358 .module_db()
359 .begin_transaction_nc()
360 .await
361 .find_by_prefix(&RecurringPaymentCodeKeyPrefix)
362 .await
363 .map(|(idx, entry)| (idx.derivation_idx, entry))
364 .collect()
365 .await
366 }
367
368 pub async fn get_recurring_payment_code(
369 &self,
370 payment_code_idx: u64,
371 ) -> Option<RecurringPaymentCodeEntry> {
372 self.client_ctx
373 .module_db()
374 .begin_transaction_nc()
375 .await
376 .get_value(&RecurringPaymentCodeKey {
377 derivation_idx: payment_code_idx,
378 })
379 .await
380 }
381
382 pub async fn list_recurring_payment_code_invoices(
383 &self,
384 payment_code_idx: u64,
385 ) -> Option<BTreeMap<u64, OperationId>> {
386 let payment_code = self.get_recurring_payment_code(payment_code_idx).await?;
387
388 let operations = (1..=payment_code.last_derivation_index)
389 .map(|invoice_idx: u64| {
390 let invoice_key = tweak_user_key(
391 SECP256K1,
392 payment_code.root_keypair.public_key(),
393 invoice_idx,
394 );
395 let payment_hash =
396 sha256::Hash::hash(&sha256::Hash::hash(&invoice_key.serialize())[..]);
397 let operation_id = OperationId(*payment_hash.as_ref());
398
399 (invoice_idx, operation_id)
400 })
401 .collect();
402
403 Some(operations)
404 }
405}
406
407#[derive(
408 Debug,
409 Clone,
410 Copy,
411 PartialOrd,
412 Eq,
413 PartialEq,
414 Hash,
415 Encodable,
416 Decodable,
417 Serialize,
418 Deserialize,
419)]
420pub struct PaymentCodeRootKey(pub PublicKey);
421
422#[derive(
423 Debug,
424 Clone,
425 Copy,
426 PartialOrd,
427 Eq,
428 PartialEq,
429 Hash,
430 Encodable,
431 Decodable,
432 Serialize,
433 Deserialize,
434)]
435pub struct PaymentCodeId(sha256::Hash);
436
437impl PaymentCodeRootKey {
438 pub fn to_payment_code_id(&self) -> PaymentCodeId {
439 PaymentCodeId(sha256::Hash::hash(&self.0.serialize()))
440 }
441}
442
443impl Display for PaymentCodeId {
444 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
445 write!(f, "{}", self.0)
446 }
447}
448
449impl FromStr for PaymentCodeId {
450 type Err = anyhow::Error;
451
452 fn from_str(s: &str) -> Result<Self, Self::Err> {
453 Ok(Self(sha256::Hash::from_str(s)?))
454 }
455}
456
457impl Display for PaymentCodeRootKey {
458 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
459 write!(f, "{}", self.0)
460 }
461}
462
463impl FromStr for PaymentCodeRootKey {
464 type Err = anyhow::Error;
465
466 fn from_str(s: &str) -> Result<Self, Self::Err> {
467 Ok(Self(PublicKey::from_str(s)?))
468 }
469}
470
471#[derive(
472 Debug,
473 Clone,
474 Copy,
475 Eq,
476 PartialEq,
477 PartialOrd,
478 Hash,
479 Encodable,
480 Decodable,
481 Serialize,
482 Deserialize,
483)]
484pub enum RecurringPaymentProtocol {
485 LNURL,
486 BOLT12,
487}
488
489#[derive(Debug, Clone, Serialize, Deserialize)]
490pub struct ReurringPaymentReceiveMeta {
491 pub payment_code_id: PaymentCodeId,
492 pub invoice: Bolt11Invoice,
493}
494
495#[derive(Debug, Error)]
496pub enum RecurringPaymentError {
497 #[error("Unsupported protocol: {0:?}")]
498 UnsupportedProtocol(RecurringPaymentProtocol),
499 #[error("Unknown federation ID: {0}")]
500 UnknownFederationId(FederationId),
501 #[error("Unknown payment code: {0:?}")]
502 UnknownPaymentCode(PaymentCodeId),
503 #[error("No compatible lightning module found")]
504 NoLightningModuleFound,
505 #[error("No gateway found")]
506 NoGatewayFound,
507 #[error("Payment code already exists with different settings: {0:?}")]
508 PaymentCodeAlreadyExists(PaymentCodeRootKey),
509 #[error("Federation already registered: {0}")]
510 FederationAlreadyRegistered(FederationId),
511 #[error("Error joining federation: {0}")]
512 JoiningFederationFailed(anyhow::Error),
513 #[error("Error registering with recurring payment service: {0}")]
514 Other(#[from] anyhow::Error),
515}
516
517#[derive(Debug, Clone, Encodable, Decodable, Serialize)]
518pub struct RecurringPaymentCodeEntry {
519 pub protocol: RecurringPaymentProtocol,
520 pub root_keypair: Keypair,
521 pub code: String,
522 pub recurringd_api: SafeUrl,
523 pub last_derivation_index: u64,
524 pub creation_time: SystemTime,
525 pub meta: String,
526}