1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7mod api;
8#[cfg(feature = "cli")]
9mod cli;
10mod db;
11pub mod events;
12mod receive_sm;
13mod send_sm;
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::Arc;
17
18use async_stream::stream;
19use bitcoin::hashes::{Hash, sha256};
20use bitcoin::secp256k1;
21use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
22use fedimint_api_client::api::DynModuleApi;
23use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
24use fedimint_client_module::module::recovery::NoModuleBackup;
25use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
26use fedimint_client_module::oplog::UpdateStreamOrOutcome;
27use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
28use fedimint_client_module::transaction::{
29 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
30};
31use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
32use fedimint_core::config::FederationId;
33use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
34use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
35use fedimint_core::encoding::{Decodable, Encodable};
36use fedimint_core::module::{
37 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
38};
39use fedimint_core::secp256k1::SECP256K1;
40use fedimint_core::task::TaskGroup;
41use fedimint_core::time::duration_since_epoch;
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
44use fedimint_derive_secret::{ChildId, DerivableSecret};
45use fedimint_lnv2_common::config::LightningClientConfig;
46use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
47use fedimint_lnv2_common::gateway_api::{
48 GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
49};
50use fedimint_lnv2_common::{
51 Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
52 LightningModuleTypes, LightningOutput, LightningOutputV0, lnurl, tweak,
53};
54use futures::StreamExt;
55use lightning_invoice::{Bolt11Invoice, Currency};
56use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
57use serde::{Deserialize, Serialize};
58use serde_json::Value;
59use strum::IntoEnumIterator as _;
60use thiserror::Error;
61use tpe::{AggregateDecryptionKey, derive_agg_dk};
62use tracing::warn;
63
64use crate::api::LightningFederationApi;
65use crate::events::SendPaymentEvent;
66use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
67use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
68
/// Largest expiration delta a gateway may request for an outgoing payment.
///
/// `LightningClientModule::send` compares the delta returned by the gateway's
/// routing info against this limit and fails with
/// `SendPaymentError::GatewayExpirationExceedsLimit` when it is exceeded.
const EXPIRATION_DELTA_LIMIT: u64 = 1440;

/// Margin added on top of the consensus block count and the gateway's
/// expiration delta when `LightningClientModule::send` computes the expiration
/// of an outgoing contract.
// NOTE(review): presumably measured in blocks, since it is added to the
// consensus block count — confirm.
const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
75
/// Metadata attached to the operations started by this module.
///
/// Each variant corresponds to one of the ways an operation is started in
/// this file.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LightningOperationMeta {
    /// Created by `LightningClientModule::send` for an outgoing payment.
    Send(SendOperationMeta),
    /// Created by `LightningClientModule::receive` for an incoming payment.
    Receive(ReceiveOperationMeta),
    /// Created by the background LNURL receive task for a contract fetched
    /// from the federation.
    LnurlReceive(LnurlReceiveOperationMeta),
}
83
/// Metadata recorded for an outgoing payment operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendOperationMeta {
    /// Out point range handed to the metadata callback when the funding
    /// transaction is finalized and submitted.
    pub change_outpoint_range: OutPointRange,
    /// API endpoint of the gateway the payment is routed through.
    pub gateway: SafeUrl,
    /// The outgoing contract funded by this operation.
    pub contract: OutgoingContract,
    /// The invoice being paid.
    pub invoice: LightningInvoice,
    /// Caller-supplied metadata passed to `LightningClientModule::send`.
    pub custom_meta: Value,
}
92
93impl SendOperationMeta {
94 pub fn gateway_fee(&self) -> Amount {
96 match &self.invoice {
97 LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
98 Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
99 ),
100 }
101 }
102}
103
/// Metadata recorded for an incoming payment started via
/// `LightningClientModule::receive`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiveOperationMeta {
    /// API endpoint of the gateway that issued the invoice.
    pub gateway: SafeUrl,
    /// The incoming contract backing the invoice.
    pub contract: IncomingContract,
    /// The invoice returned by the gateway.
    pub invoice: LightningInvoice,
    /// Caller-supplied metadata passed to `LightningClientModule::receive`.
    pub custom_meta: Value,
}
111
112impl ReceiveOperationMeta {
113 pub fn gateway_fee(&self) -> Amount {
115 match &self.invoice {
116 LightningInvoice::Bolt11(invoice) => {
117 Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
118 .saturating_sub(self.contract.commitment.amount)
119 }
120 }
121 }
122}
123
/// Metadata recorded for an incoming contract picked up by the background
/// LNURL receive task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LnurlReceiveOperationMeta {
    /// The incoming contract fetched from the federation.
    pub contract: IncomingContract,
    /// Metadata produced by the `custom_meta_fn` configured on
    /// `LightningClientInit`.
    pub custom_meta: Value,
}
129
/// States reported by
/// `LightningClientModule::subscribe_send_operation_state_updates`.
///
/// `Success`, `Refunded` and `Failure` end the update stream.
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SendOperationState {
    /// The send state machine is in its `Funding` state.
    Funding,
    /// The send state machine is in its `Funded` state.
    Funded,
    /// The payment succeeded; carries the preimage, which has been verified
    /// against the outgoing contract.
    Success([u8; 32]),
    /// The send state machine is in its `Refunding` state and the refund
    /// outputs are being awaited.
    Refunding,
    /// The refund outputs were successfully awaited from the primary module.
    Refunded,
    /// The state machine was rejected, or the refund outputs could not be
    /// awaited and no valid preimage was obtained from the federation.
    Failure,
}
163
/// Terminal outcome of a send operation, as returned by
/// `LightningClientModule::await_final_send_operation_state`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalSendOperationState {
    /// Mapped from `SendOperationState::Success`.
    Success,
    /// Mapped from `SendOperationState::Refunded`.
    Refunded,
    /// Mapped from `SendOperationState::Failure`.
    Failure,
}
174
/// Operation id of a started payment, or the reason it could not be started.
/// This is the same type `LightningClientModule::send` returns.
pub type SendResult = Result<OperationId, SendPaymentError>;
176
/// States reported by
/// `LightningClientModule::subscribe_receive_operation_state_updates`.
///
/// `Expired`, `Claimed` and `Failure` end the update stream.
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum ReceiveOperationState {
    /// The receive state machine is in its `Pending` state.
    Pending,
    /// The receive state machine is in its `Expired` state.
    Expired,
    /// The receive state machine is in its `Claiming` state and the claimed
    /// outputs are being awaited.
    Claiming,
    /// The claimed outputs were successfully awaited from the primary module.
    Claimed,
    /// Awaiting the claimed outputs failed.
    Failure,
}
202
/// Terminal outcome of a receive operation, as returned by
/// `LightningClientModule::await_final_receive_operation_state`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// Mapped from `ReceiveOperationState::Expired`.
    Expired,
    /// Mapped from `ReceiveOperationState::Claimed`.
    Claimed,
    /// Mapped from `ReceiveOperationState::Failure`.
    Failure,
}
213
/// Invoice and operation id of a started receive operation, or the reason it
/// could not be started. This is the same type
/// `LightningClientModule::receive` returns.
pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
215
/// Initializer for the Lightning client module.
#[derive(Clone)]
pub struct LightningClientInit {
    /// Gateway connection to use. When `None`, `init` constructs a
    /// `RealGatewayConnection` instead.
    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
    /// Called by the background LNURL receive task to produce the
    /// `custom_meta` stored with each batch of received contracts.
    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
}
221
222impl std::fmt::Debug for LightningClientInit {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 f.debug_struct("LightningClientInit")
225 .field("gateway_conn", &self.gateway_conn)
226 .field("custom_meta_fn", &"<function>")
227 .finish()
228 }
229}
230
231impl Default for LightningClientInit {
232 fn default() -> Self {
233 LightningClientInit {
234 gateway_conn: None,
235 custom_meta_fn: Arc::new(|| Value::Null),
236 }
237 }
238}
239
240impl ModuleInit for LightningClientInit {
241 type Common = LightningCommonInit;
242
243 async fn dump_database(
244 &self,
245 _dbtx: &mut DatabaseTransaction<'_>,
246 _prefix_names: Vec<String>,
247 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
248 Box::new(BTreeMap::new().into_iter())
249 }
250}
251
252#[apply(async_trait_maybe_send!)]
253impl ClientModuleInit for LightningClientInit {
254 type Module = LightningClientModule;
255
256 fn supported_api_versions(&self) -> MultiApiVersion {
257 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
258 .expect("no version conflicts")
259 }
260
261 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
262 let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
263 gateway_conn
264 } else {
265 let api = GatewayApi::new(None, args.connector_registry.clone());
266 Arc::new(RealGatewayConnection { api })
267 };
268 Ok(LightningClientModule::new(
269 *args.federation_id(),
270 args.cfg().clone(),
271 args.notifier().clone(),
272 args.context(),
273 args.module_api().clone(),
274 args.module_root_secret(),
275 gateway_conn,
276 self.custom_meta_fn.clone(),
277 args.admin_auth().cloned(),
278 args.task_group(),
279 ))
280 }
281
282 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
283 Some(
284 DbKeyPrefix::iter()
285 .map(|p| p as u8)
286 .chain(
287 DbKeyPrefix::ExternalReservedStart as u8
288 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
289 )
290 .collect(),
291 )
292 }
293}
294
/// Context handed to the module's state machines; built by
/// `LightningClientModule::context` from the module's own fields.
#[derive(Debug, Clone)]
pub struct LightningClientContext {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    /// Client context of the Lightning module.
    pub(crate) client_ctx: ClientContext<LightningClientModule>,
}
301
/// Marks `LightningClientContext` as a state machine context tied to this
/// module's `KIND`.
impl Context for LightningClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
305
/// Client side of the Lightning module: sends and receives payments through
/// gateways and tracks the corresponding operations.
#[derive(Debug, Clone)]
pub struct LightningClientModule {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Module configuration (fee consensus, network, aggregate public key).
    cfg: LightningClientConfig,
    /// Notifier used to subscribe to state machine updates per operation.
    notifier: ModuleNotifier<LightningClientStateMachines>,
    /// Client context used for the module database, operations and events.
    client_ctx: ClientContext<Self>,
    /// API used to query the federation.
    module_api: DynModuleApi,
    /// Key pair derived from child key 0 of the module root secret; used for
    /// regular sends and receives.
    keypair: Keypair,
    /// Key pair derived from child key 1 of the module root secret; used for
    /// LNURL receives.
    lnurl_keypair: Keypair,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    // Stored at construction; not read by any code visible in this file.
    #[allow(unused)]
    admin_auth: Option<ApiAuth>,
}
319
320#[apply(async_trait_maybe_send!)]
321impl ClientModule for LightningClientModule {
322 type Init = LightningClientInit;
323 type Common = LightningModuleTypes;
324 type Backup = NoModuleBackup;
325 type ModuleStateMachineContext = LightningClientContext;
326 type States = LightningClientStateMachines;
327
328 fn context(&self) -> Self::ModuleStateMachineContext {
329 LightningClientContext {
330 federation_id: self.federation_id,
331 gateway_conn: self.gateway_conn.clone(),
332 client_ctx: self.client_ctx.clone(),
333 }
334 }
335
336 fn input_fee(
337 &self,
338 amounts: &Amounts,
339 _input: &<Self::Common as ModuleCommon>::Input,
340 ) -> Option<Amounts> {
341 Some(Amounts::new_bitcoin(
342 self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
343 ))
344 }
345
346 fn output_fee(
347 &self,
348 amounts: &Amounts,
349 _output: &<Self::Common as ModuleCommon>::Output,
350 ) -> Option<Amounts> {
351 Some(Amounts::new_bitcoin(
352 self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
353 ))
354 }
355
356 #[cfg(feature = "cli")]
357 async fn handle_cli_command(
358 &self,
359 args: &[std::ffi::OsString],
360 ) -> anyhow::Result<serde_json::Value> {
361 cli::handle_cli_command(self, args).await
362 }
363}
364
365impl LightningClientModule {
366 #[allow(clippy::too_many_arguments)]
367 fn new(
368 federation_id: FederationId,
369 cfg: LightningClientConfig,
370 notifier: ModuleNotifier<LightningClientStateMachines>,
371 client_ctx: ClientContext<Self>,
372 module_api: DynModuleApi,
373 module_root_secret: &DerivableSecret,
374 gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
375 custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
376 admin_auth: Option<ApiAuth>,
377 task_group: &TaskGroup,
378 ) -> Self {
379 let module = Self {
380 federation_id,
381 cfg,
382 notifier,
383 client_ctx,
384 module_api,
385 keypair: module_root_secret
386 .child_key(ChildId(0))
387 .to_secp_key(SECP256K1),
388 lnurl_keypair: module_root_secret
389 .child_key(ChildId(1))
390 .to_secp_key(SECP256K1),
391 gateway_conn,
392 admin_auth,
393 };
394
395 module.spawn_receive_lnurl_task(custom_meta_fn, task_group);
396
397 module.spawn_gateway_map_update_task(task_group);
398
399 module
400 }
401
402 fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup) {
403 let module = self.clone();
404
405 task_group.spawn_cancellable("gateway_map_update_task", async move {
406 module.update_gateway_map().await;
407 });
408 }
409
410 async fn update_gateway_map(&self) {
411 if let Ok(gateways) = self.module_api.gateways().await {
418 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
419
420 for gateway in gateways {
421 if let Ok(Some(routing_info)) = self
422 .gateway_conn
423 .routing_info(gateway.clone(), &self.federation_id)
424 .await
425 {
426 dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
427 .await;
428 }
429 }
430
431 if let Err(e) = dbtx.commit_tx_result().await {
432 warn!("Failed to commit the updated gateway mapping to the database: {e}");
433 }
434 }
435 }
436
437 pub async fn select_gateway(
442 &self,
443 invoice: Option<Bolt11Invoice>,
444 ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
445 let gateways = self
446 .module_api
447 .gateways()
448 .await
449 .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
450
451 if gateways.is_empty() {
452 return Err(SelectGatewayError::NoGatewaysAvailable);
453 }
454
455 if let Some(invoice) = invoice
456 && let Some(gateway) = self
457 .client_ctx
458 .module_db()
459 .begin_transaction_nc()
460 .await
461 .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
462 .await
463 .filter(|gateway| gateways.contains(gateway))
464 && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
465 {
466 return Ok((gateway, routing_info));
467 }
468
469 for gateway in gateways {
470 if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
471 return Ok((gateway, routing_info));
472 }
473 }
474
475 Err(SelectGatewayError::GatewaysUnresponsive)
476 }
477
478 pub async fn list_gateways(
481 &self,
482 peer: Option<PeerId>,
483 ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
484 if let Some(peer) = peer {
485 self.module_api
486 .gateways_from_peer(peer)
487 .await
488 .map_err(|_| ListGatewaysError::FailedToListGateways)
489 } else {
490 self.module_api
491 .gateways()
492 .await
493 .map_err(|_| ListGatewaysError::FailedToListGateways)
494 }
495 }
496
497 pub async fn routing_info(
500 &self,
501 gateway: &SafeUrl,
502 ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
503 self.gateway_conn
504 .routing_info(gateway.clone(), &self.federation_id)
505 .await
506 .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
507 }
508
    /// Pays a BOLT11 invoice through a gateway by funding an outgoing
    /// contract, and returns the id of the started operation.
    ///
    /// If `gateway` is `None` a gateway is chosen via `select_gateway`,
    /// passing the invoice along. `custom_meta` is stored in the operation
    /// metadata. After the funding transaction has been submitted a
    /// `SendPaymentEvent` is logged.
    ///
    /// # Errors
    ///
    /// * `InvoiceMissingAmount`, `InvoiceExpired`, `WrongCurrency` if the
    ///   invoice is unusable.
    /// * `PaymentInProgress`, `InvoiceAlreadyPaid` if an earlier attempt for
    ///   this invoice is still active or has succeeded.
    /// * `FailedToConnectToGateway`, `FederationNotSupported`, `SelectGateway`
    ///   if no usable gateway could be determined.
    /// * `GatewayFeeExceedsLimit`, `GatewayExpirationExceedsLimit` if the
    ///   gateway's terms exceed the client's limits.
    /// * `FailedToRequestBlockCount`, `FailedToFundPayment` if the federation
    ///   request or the funding transaction fails.
    #[allow(clippy::too_many_lines)]
    pub async fn send(
        &self,
        invoice: Bolt11Invoice,
        gateway: Option<SafeUrl>,
        custom_meta: Value,
    ) -> Result<OperationId, SendPaymentError> {
        let amount = invoice
            .amount_milli_satoshis()
            .ok_or(SendPaymentError::InvoiceMissingAmount)?;

        if invoice.is_expired() {
            return Err(SendPaymentError::InvoiceExpired);
        }

        if self.cfg.network != invoice.currency().into() {
            return Err(SendPaymentError::WrongCurrency {
                invoice_currency: invoice.currency(),
                federation_currency: self.cfg.network.into(),
            });
        }

        // Fails if a previous attempt is still running or already succeeded.
        let operation_id = self.get_next_operation_id(&invoice).await?;

        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());

        // The refund key is built directly from the ephemeral tweak bytes.
        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
            .expect("32 bytes, within curve order")
            .keypair(secp256k1::SECP256K1);

        // Use the caller's gateway if given, otherwise select one.
        let (gateway_api, routing_info) = match gateway {
            Some(gateway_api) => (
                gateway_api.clone(),
                self.routing_info(&gateway_api)
                    .await
                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
                    .ok_or(SendPaymentError::FederationNotSupported)?,
            ),
            None => self
                .select_gateway(Some(invoice.clone()))
                .await
                .map_err(SendPaymentError::SelectGateway)?,
        };

        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);

        // Refuse gateways whose terms exceed the client-side limits.
        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
            return Err(SendPaymentError::GatewayFeeExceedsLimit);
        }

        if EXPIRATION_DELTA_LIMIT < expiration_delta {
            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
        }

        let consensus_block_count = self
            .module_api
            .consensus_block_count()
            .await
            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;

        let contract = OutgoingContract {
            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
            amount: send_fee.add_to(amount),
            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
            claim_pk: routing_info.module_public_key,
            refund_pk: refund_keypair.public_key(),
            ephemeral_pk,
        };

        // Separate copies for the state machine closure; the originals are
        // moved into the operation metadata closure further down.
        let contract_clone = contract.clone();
        let gateway_api_clone = gateway_api.clone();
        let invoice_clone = invoice.clone();

        let client_output = ClientOutput::<LightningOutput> {
            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
            amounts: Amounts::new_bitcoin(contract.amount),
        };

        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
            state_machines: Arc::new(move |range: OutPointRange| {
                vec![LightningClientStateMachines::Send(SendStateMachine {
                    common: SendSMCommon {
                        operation_id,
                        // The bundle holds a single output, so the first out
                        // point of the range is the contract's.
                        outpoint: range.into_iter().next().unwrap(),
                        contract: contract_clone.clone(),
                        gateway_api: Some(gateway_api_clone.clone()),
                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
                        refund_keypair,
                    },
                    state: SendSMState::Funding,
                })]
            }),
        };

        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
            vec![client_output],
            vec![client_output_sm],
        ));

        let transaction = TransactionBuilder::new().with_outputs(client_output);

        self.client_ctx
            .finalize_and_submit_transaction(
                operation_id,
                LightningCommonInit::KIND.as_str(),
                move |change_outpoint_range| {
                    LightningOperationMeta::Send(SendOperationMeta {
                        change_outpoint_range,
                        gateway: gateway_api.clone(),
                        contract: contract.clone(),
                        invoice: LightningInvoice::Bolt11(invoice.clone()),
                        custom_meta: custom_meta.clone(),
                    })
                },
                transaction,
            )
            .await
            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;

        // Record the payment event in its own database transaction.
        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;

        self.client_ctx
            .log_event(
                &mut dbtx,
                SendPaymentEvent {
                    operation_id,
                    amount: send_fee.add_to(amount),
                    fee: Some(send_fee.fee(amount)),
                },
            )
            .await;

        dbtx.commit_tx().await;

        Ok(operation_id)
    }
661
    /// Finds the first unused operation id for paying `invoice`.
    ///
    /// Operation ids are derived from the invoice together with an attempt
    /// counter starting at zero. For every id that already exists:
    ///
    /// * if the operation still has active states, the call fails with
    ///   `SendPaymentError::PaymentInProgress`;
    /// * if its update stream reports `Success`, the call fails with
    ///   `SendPaymentError::InvoiceAlreadyPaid`;
    /// * otherwise the next attempt counter is tried.
    ///
    /// Panics if every attempt counter is already in use.
    async fn get_next_operation_id(
        &self,
        invoice: &Bolt11Invoice,
    ) -> Result<OperationId, SendPaymentError> {
        for payment_attempt in 0..u64::MAX {
            let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));

            if !self.client_ctx.operation_exists(operation_id).await {
                return Ok(operation_id);
            }

            if self.client_ctx.has_active_states(operation_id).await {
                return Err(SendPaymentError::PaymentInProgress(operation_id));
            }

            let mut stream = self
                .subscribe_send_operation_state_updates(operation_id)
                .await
                .expect("operation_id exists")
                .into_stream();

            // Drain the finished attempt's updates; only a success blocks a
            // new attempt, any other outcome lets the loop move on.
            while let Some(state) = stream.next().await {
                if let SendOperationState::Success(_) = state {
                    return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
                }
            }
        }

        panic!("We could not find an unused operation id for sending a lightning payment");
    }
694
    /// Subscribes to the state updates of a send operation.
    ///
    /// Returns an error if the operation cannot be fetched. The resulting
    /// stream translates send state machine states into `SendOperationState`
    /// values and ends after yielding `Success`, `Refunded` or `Failure`.
    pub async fn subscribe_send_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();
        let module_api = self.module_api.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                // NOTE(review): items that are not `Some(Send(..))` are
                // skipped and the loop polls again — confirm the notifier
                // stream never terminates, otherwise this would spin.
                loop {
                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
                        match state.state {
                            SendSMState::Funding => yield SendOperationState::Funding,
                            SendSMState::Funded => yield SendOperationState::Funded,
                            SendSMState::Success(preimage) => {
                                // A success state must carry a preimage that
                                // matches the contract.
                                assert!(state.common.contract.verify_preimage(&preimage));

                                yield SendOperationState::Success(preimage);
                                return;
                            },
                            SendSMState::Refunding(out_points) => {
                                yield SendOperationState::Refunding;

                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
                                    yield SendOperationState::Refunded;
                                    return;
                                }

                                // The refund did not go through; ask the
                                // federation for a preimage and accept it only
                                // if it verifies against the contract.
                                // NOTE(review): meaning of the second argument
                                // `0` is not visible here — confirm.
                                if let Some(preimage) = module_api.await_preimage(
                                    state.common.outpoint,
                                    0
                                ).await
                                    && state.common.contract.verify_preimage(&preimage) {
                                    yield SendOperationState::Success(preimage);
                                    return;
                                }

                                yield SendOperationState::Failure;
                                return;
                            },
                            SendSMState::Rejected(..) => {
                                yield SendOperationState::Failure;
                                return;
                            },
                        }
                    }
                }
            }
        }))
    }
752
753 pub async fn await_final_send_operation_state(
755 &self,
756 operation_id: OperationId,
757 ) -> anyhow::Result<FinalSendOperationState> {
758 let state = self
759 .subscribe_send_operation_state_updates(operation_id)
760 .await?
761 .into_stream()
762 .filter_map(|state| {
763 futures::future::ready(match state {
764 SendOperationState::Success(_) => Some(FinalSendOperationState::Success),
765 SendOperationState::Refunded => Some(FinalSendOperationState::Refunded),
766 SendOperationState::Failure => Some(FinalSendOperationState::Failure),
767 _ => None,
768 })
769 })
770 .next()
771 .await
772 .expect("Stream contains one final state");
773
774 Ok(state)
775 }
776
777 pub async fn receive(
789 &self,
790 amount: Amount,
791 expiry_secs: u32,
792 description: Bolt11InvoiceDescription,
793 gateway: Option<SafeUrl>,
794 custom_meta: Value,
795 ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
796 let (gateway, contract, invoice) = self
797 .create_contract_and_fetch_invoice(
798 self.keypair.public_key(),
799 amount,
800 expiry_secs,
801 description,
802 gateway,
803 )
804 .await?;
805
806 let operation_id = self
807 .receive_incoming_contract(
808 self.keypair.secret_key(),
809 contract.clone(),
810 LightningOperationMeta::Receive(ReceiveOperationMeta {
811 gateway,
812 contract,
813 invoice: LightningInvoice::Bolt11(invoice.clone()),
814 custom_meta,
815 }),
816 )
817 .await
818 .expect("The contract has been generated with our public key");
819
820 Ok((invoice, operation_id))
821 }
822
    /// Creates an incoming contract claimable by `recipient_static_pk` and
    /// asks a gateway for a matching BOLT11 invoice.
    ///
    /// Returns the gateway used, the contract and the invoice. If `gateway`
    /// is `None` one is chosen via `select_gateway`.
    ///
    /// # Errors
    ///
    /// * `FailedToConnectToGateway`, `FederationNotSupported`, `SelectGateway`
    ///   if no usable gateway could be reached.
    /// * `GatewayFeeExceedsLimit` if the gateway's receive fee exceeds
    ///   `PaymentFee::RECEIVE_FEE_LIMIT`.
    /// * `AmountTooSmall` if less than 5 sats remain after the gateway fee.
    /// * `InvalidInvoice`, `IncorrectInvoiceAmount` if the returned invoice
    ///   does not match the contract's payment hash or the requested amount.
    async fn create_contract_and_fetch_invoice(
        &self,
        recipient_static_pk: PublicKey,
        amount: Amount,
        expiry_secs: u32,
        description: Bolt11InvoiceDescription,
        gateway: Option<SafeUrl>,
    ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);

        // The encryption seed is the hash of the tweak and the preimage is
        // the hash of the seed, so both follow from the tweak alone.
        let encryption_seed = ephemeral_tweak
            .consensus_hash::<sha256::Hash>()
            .to_byte_array();

        let preimage = encryption_seed
            .consensus_hash::<sha256::Hash>()
            .to_byte_array();

        // Use the caller's gateway if given, otherwise select one.
        let (gateway, routing_info) = match gateway {
            Some(gateway) => (
                gateway.clone(),
                self.routing_info(&gateway)
                    .await
                    .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
                    .ok_or(ReceiveError::FederationNotSupported)?,
            ),
            None => self
                .select_gateway(None)
                .await
                .map_err(ReceiveError::SelectGateway)?,
        };

        if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
            return Err(ReceiveError::GatewayFeeExceedsLimit);
        }

        // The contract is worth the requested amount minus the gateway fee.
        let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);

        // Contracts worth less than 5 sats are rejected.
        if contract_amount < Amount::from_sats(5) {
            return Err(ReceiveError::AmountTooSmall);
        }

        // Absolute expiration in seconds since the epoch.
        let expiration = duration_since_epoch()
            .as_secs()
            .saturating_add(u64::from(expiry_secs));

        // The claim key is the recipient's static key tweaked by the
        // ephemeral tweak.
        let claim_pk = recipient_static_pk
            .mul_tweak(
                secp256k1::SECP256K1,
                &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
            )
            .expect("Tweak is valid");

        let contract = IncomingContract::new(
            self.cfg.tpe_agg_pk,
            encryption_seed,
            preimage,
            PaymentImage::Hash(preimage.consensus_hash()),
            contract_amount,
            expiration,
            claim_pk,
            routing_info.module_public_key,
            ephemeral_pk,
        );

        let invoice = self
            .gateway_conn
            .bolt11_invoice(
                gateway.clone(),
                self.federation_id,
                contract.clone(),
                amount,
                description,
                expiry_secs,
            )
            .await
            .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;

        // Do not trust the gateway: the invoice must commit to our preimage
        // and to exactly the requested amount.
        if invoice.payment_hash() != &preimage.consensus_hash() {
            return Err(ReceiveError::InvalidInvoice);
        }

        if invoice.amount_milli_satoshis() != Some(amount.msats) {
            return Err(ReceiveError::IncorrectInvoiceAmount);
        }

        Ok((gateway, contract, invoice))
    }
916
917 async fn receive_incoming_contract(
920 &self,
921 sk: SecretKey,
922 contract: IncomingContract,
923 operation_meta: LightningOperationMeta,
924 ) -> Option<OperationId> {
925 let operation_id = OperationId::from_encodable(&contract.clone());
926
927 let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
928
929 let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
930 common: ReceiveSMCommon {
931 operation_id,
932 contract: contract.clone(),
933 claim_keypair,
934 agg_decryption_key,
935 },
936 state: ReceiveSMState::Pending,
937 });
938
939 self.client_ctx
942 .manual_operation_start(
943 operation_id,
944 LightningCommonInit::KIND.as_str(),
945 operation_meta,
946 vec![self.client_ctx.make_dyn_state(receive_sm)],
947 )
948 .await
949 .ok();
950
951 Some(operation_id)
952 }
953
954 fn recover_contract_keys(
955 &self,
956 sk: SecretKey,
957 contract: &IncomingContract,
958 ) -> Option<(Keypair, AggregateDecryptionKey)> {
959 let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
960
961 let encryption_seed = tweak
962 .secret_bytes()
963 .consensus_hash::<sha256::Hash>()
964 .to_byte_array();
965
966 let claim_keypair = sk
967 .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
968 .expect("Tweak is valid")
969 .keypair(secp256k1::SECP256K1);
970
971 if claim_keypair.public_key() != contract.commitment.claim_pk {
972 return None; }
974
975 let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
976
977 if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
978 return None; }
980
981 contract.decrypt_preimage(&agg_decryption_key)?;
982
983 Some((claim_keypair, agg_decryption_key))
984 }
985
    /// Subscribes to the state updates of a receive operation.
    ///
    /// Returns an error if the operation cannot be fetched. The resulting
    /// stream translates receive state machine states into
    /// `ReceiveOperationState` values and ends after yielding `Claimed`,
    /// `Failure` or `Expired`.
    pub async fn subscribe_receive_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                // NOTE(review): items that are not `Some(Receive(..))` are
                // skipped and the loop polls again — confirm the notifier
                // stream never terminates, otherwise this would spin.
                loop {
                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
                        match state.state {
                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
                            ReceiveSMState::Claiming(out_points) => {
                                yield ReceiveOperationState::Claiming;

                                // The claim is final once the primary module
                                // outputs have been awaited, successfully or
                                // not.
                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
                                    yield ReceiveOperationState::Claimed;
                                } else {
                                    yield ReceiveOperationState::Failure;
                                }
                                return;
                            },
                            ReceiveSMState::Expired => {
                                yield ReceiveOperationState::Expired;
                                return;
                            }
                        }
                    }
                }
            }
        }))
    }
1021
1022 pub async fn await_final_receive_operation_state(
1024 &self,
1025 operation_id: OperationId,
1026 ) -> anyhow::Result<FinalReceiveOperationState> {
1027 let state = self
1028 .subscribe_receive_operation_state_updates(operation_id)
1029 .await?
1030 .into_stream()
1031 .filter_map(|state| {
1032 futures::future::ready(match state {
1033 ReceiveOperationState::Expired => Some(FinalReceiveOperationState::Expired),
1034 ReceiveOperationState::Claimed => Some(FinalReceiveOperationState::Claimed),
1035 ReceiveOperationState::Failure => Some(FinalReceiveOperationState::Failure),
1036 _ => None,
1037 })
1038 })
1039 .next()
1040 .await
1041 .expect("Stream contains one final state");
1042
1043 Ok(state)
1044 }
1045
1046 pub async fn generate_lnurl(
1049 &self,
1050 recurringd: SafeUrl,
1051 gateway: Option<SafeUrl>,
1052 ) -> Result<String, GenerateLnurlError> {
1053 let gateways = if let Some(gateway) = gateway {
1054 vec![gateway]
1055 } else {
1056 let gateways = self
1057 .module_api
1058 .gateways()
1059 .await
1060 .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1061
1062 if gateways.is_empty() {
1063 return Err(GenerateLnurlError::NoGatewaysAvailable);
1064 }
1065
1066 gateways
1067 };
1068
1069 let lnurl = lnurl::generate_lnurl(
1070 recurringd,
1071 self.federation_id,
1072 self.lnurl_keypair.public_key(),
1073 self.cfg.tpe_agg_pk,
1074 gateways,
1075 )
1076 .await
1077 .map_err(|e| GenerateLnurlError::FailedToRequestLnurl(e.to_string()))?;
1078
1079 Ok(lnurl)
1080 }
1081
1082 fn spawn_receive_lnurl_task(
1083 &self,
1084 custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1085 task_group: &TaskGroup,
1086 ) {
1087 let module = self.clone();
1088
1089 task_group.spawn_cancellable("receive_lnurl_task", async move {
1090 loop {
1091 module.receive_lnurl(custom_meta_fn()).await;
1092 }
1093 });
1094 }
1095
    /// Processes one batch of incoming contracts from the federation's
    /// contract stream.
    ///
    /// Reads the stored stream index (0 if none is stored), awaits contracts
    /// from that index, and for every contract that can be claimed with the
    /// LNURL key starts a receive operation and waits for its final state.
    /// Contracts that cannot be claimed are skipped. Finally the next stream
    /// index is stored and the transaction committed.
    async fn receive_lnurl(&self, custom_meta: Value) {
        // The same transaction is kept open for the whole batch and is only
        // used to read and then advance the stream index.
        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;

        let stream_index = dbtx
            .get_value(&IncomingContractStreamIndexKey)
            .await
            .unwrap_or(0);

        // NOTE(review): 128 is presumably the maximum batch size — confirm
        // against `await_incoming_contracts`.
        let (contracts, next_index) = self
            .module_api
            .await_incoming_contracts(stream_index, 128)
            .await;

        for contract in &contracts {
            if let Some(operation_id) = self
                .receive_incoming_contract(
                    self.lnurl_keypair.secret_key(),
                    contract.clone(),
                    LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
                        contract: contract.clone(),
                        custom_meta: custom_meta.clone(),
                    }),
                )
                .await
            {
                // The outcome is not inspected; the index only advances after
                // every started operation has reached a final state.
                self.await_final_receive_operation_state(operation_id)
                    .await
                    .ok();
            }
        }

        dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
            .await;

        dbtx.commit_tx().await;
    }
1132}
1133
/// Reasons `LightningClientModule::select_gateway` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SelectGatewayError {
    /// The gateway list could not be fetched from the federation; carries the
    /// underlying error rendered as a string.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    /// The federation returned an empty gateway list.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// None of the listed gateways returned routing info.
    #[error("All gateways failed to respond")]
    GatewaysUnresponsive,
}
1143
/// Reasons `LightningClientModule::send` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendPaymentError {
    /// The invoice does not specify an amount.
    #[error("Invoice is missing an amount")]
    InvoiceMissingAmount,
    /// The invoice reports itself as expired.
    #[error("Invoice has expired")]
    InvoiceExpired,
    /// An earlier attempt for this invoice still has active states; carries
    /// that attempt's operation id.
    #[error("A payment for this invoice is already in progress")]
    PaymentInProgress(OperationId),
    /// An earlier attempt for this invoice succeeded; carries that attempt's
    /// operation id.
    #[error("This invoice has already been paid")]
    InvoiceAlreadyPaid(OperationId),
    /// Automatic gateway selection failed.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// Requesting routing info from the given gateway failed; carries the
    /// underlying error rendered as a string.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The given gateway returned no routing info for this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's send fee is above `PaymentFee::SEND_FEE_LIMIT`.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// The gateway's expiration delta is above `EXPIRATION_DELTA_LIMIT`.
    #[error("Gateway expiration time exceeds the allowed limit")]
    GatewayExpirationExceedsLimit,
    /// The consensus block count could not be fetched; carries the underlying
    /// error rendered as a string.
    #[error("Failed to request block count")]
    FailedToRequestBlockCount(String),
    /// Finalizing and submitting the funding transaction failed; carries the
    /// underlying error rendered as a string.
    #[error("Failed to fund the payment")]
    FailedToFundPayment(String),
    /// The invoice's currency does not match the federation's network.
    #[error("Invoice is for a different currency")]
    WrongCurrency {
        /// Currency stated in the invoice.
        invoice_currency: Currency,
        /// Currency corresponding to the federation's configured network.
        federation_currency: Currency,
    },
}
1174
/// Reasons `LightningClientModule::receive` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveError {
    /// Automatic gateway selection failed.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// Requesting routing info or the invoice from the gateway failed;
    /// carries the underlying error rendered as a string.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The given gateway returned no routing info for this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's receive fee is above `PaymentFee::RECEIVE_FEE_LIMIT`.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// Less than 5 sats would remain in the contract after the gateway fee.
    #[error("Amount is too small to cover fees")]
    AmountTooSmall,
    /// The invoice's payment hash does not match the contract's preimage.
    #[error("Gateway returned an invalid invoice")]
    InvalidInvoice,
    /// The invoice's amount differs from the requested amount.
    #[error("Gateway returned an invoice with incorrect amount")]
    IncorrectInvoiceAmount,
}
1192
/// Reasons `LightningClientModule::generate_lnurl` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum GenerateLnurlError {
    /// No gateway was given and the federation returned an empty list.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// The gateway list could not be fetched from the federation; carries the
    /// underlying error rendered as a string.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    /// The LNURL request failed; carries the underlying error rendered as a
    /// string.
    #[error("Failed to request LNURL")]
    FailedToRequestLnurl(String),
}
1202
/// Reasons `LightningClientModule::list_gateways` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ListGatewaysError {
    /// The gateway list could not be fetched from the peer or the federation;
    /// the underlying error is discarded.
    #[error("Failed to request gateways")]
    FailedToListGateways,
}
1208
/// Reasons `LightningClientModule::routing_info` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum RoutingInfoError {
    /// The routing info request to the gateway failed; the underlying error
    /// is discarded.
    #[error("Failed to request routing info")]
    FailedToRequestRoutingInfo,
}
1214
/// The state machines run by the Lightning client module.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum LightningClientStateMachines {
    /// State machine of an outgoing payment, created by
    /// `LightningClientModule::send`.
    Send(SendStateMachine),
    /// State machine of an incoming contract, created when a receive
    /// operation is started.
    Receive(ReceiveStateMachine),
}
1220
impl IntoDynInstance for LightningClientStateMachines {
    type DynType = DynState;

    /// Wraps the typed state machine into a `DynState` tagged with the given
    /// module instance id.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1228
impl State for LightningClientStateMachines {
    type ModuleContext = LightningClientContext;

    /// Delegates to the wrapped state machine and translates its transitions
    /// back into transitions of the matching enum variant.
    fn transitions(
        &self,
        context: &Self::ModuleContext,
        global_context: &DynGlobalClientContext,
    ) -> Vec<StateTransition<Self>> {
        match self {
            LightningClientStateMachines::Send(state) => {
                sm_enum_variant_translation!(
                    state.transitions(context, global_context),
                    LightningClientStateMachines::Send
                )
            }
            LightningClientStateMachines::Receive(state) => {
                sm_enum_variant_translation!(
                    state.transitions(context, global_context),
                    LightningClientStateMachines::Receive
                )
            }
        }
    }

    /// Returns the operation id of the wrapped state machine.
    fn operation_id(&self) -> OperationId {
        match self {
            LightningClientStateMachines::Send(state) => state.operation_id(),
            LightningClientStateMachines::Receive(state) => state.operation_id(),
        }
    }
}