1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7mod api;
8#[cfg(feature = "cli")]
9mod cli;
10mod db;
11pub mod events;
12mod receive_sm;
13mod send_sm;
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::Arc;
17
18use async_stream::stream;
19use bitcoin::hashes::{Hash, sha256};
20use bitcoin::secp256k1;
21use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
22use fedimint_api_client::api::DynModuleApi;
23use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
24use fedimint_client_module::module::recovery::NoModuleBackup;
25use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
26use fedimint_client_module::oplog::UpdateStreamOrOutcome;
27use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
28use fedimint_client_module::transaction::{
29 ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
30};
31use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
32use fedimint_core::config::FederationId;
33use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
34use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
35use fedimint_core::encoding::{Decodable, Encodable};
36use fedimint_core::module::{
37 Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
38};
39use fedimint_core::secp256k1::SECP256K1;
40use fedimint_core::task::TaskGroup;
41use fedimint_core::time::duration_since_epoch;
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
44use fedimint_derive_secret::{ChildId, DerivableSecret};
45use fedimint_lnv2_common::config::LightningClientConfig;
46use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
47use fedimint_lnv2_common::gateway_api::{
48 GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
49};
50use fedimint_lnv2_common::{
51 Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
52 LightningModuleTypes, LightningOutput, LightningOutputV0, MINIMUM_INCOMING_CONTRACT_AMOUNT,
53 lnurl, tweak,
54};
55use futures::StreamExt;
56use lightning_invoice::{Bolt11Invoice, Currency};
57use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use strum::IntoEnumIterator as _;
61use thiserror::Error;
62use tpe::{AggregateDecryptionKey, derive_agg_dk};
63use tracing::warn;
64
65use crate::api::LightningFederationApi;
66use crate::events::SendPaymentEvent;
67use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
68use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
69
/// Largest expiration delta a gateway may request for an outgoing payment.
///
/// `send` rejects routing info whose delta exceeds this value with
/// `SendPaymentError::GatewayExpirationExceedsLimit`. The delta is added to
/// the consensus block count, so the unit is blocks.
const EXPIRATION_DELTA_LIMIT: u64 = 1440;
73
/// Number of blocks added on top of the consensus block count and the
/// gateway's expiration delta when `send` computes an outgoing contract's
/// expiration.
// NOTE(review): presumably a safety margin for the funding transaction to be
// accepted before the contract times out — confirm.
const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
76
/// Metadata stored with every operation started by this module, with one
/// variant per kind of operation.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LightningOperationMeta {
    /// An outgoing payment started by `LightningClientModule::send`.
    Send(SendOperationMeta),
    /// An incoming payment started by `LightningClientModule::receive`.
    Receive(ReceiveOperationMeta),
    /// An incoming contract picked up by the background lnurl receive task.
    LnurlReceive(LnurlReceiveOperationMeta),
}
84
/// Operation metadata recorded for an outgoing payment started by
/// `LightningClientModule::send`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendOperationMeta {
    /// Outpoint range handed to the metadata closure when the funding
    /// transaction is finalized.
    pub change_outpoint_range: OutPointRange,
    /// URL of the gateway the payment is routed through.
    pub gateway: SafeUrl,
    /// The outgoing contract funded by this operation.
    pub contract: OutgoingContract,
    /// The invoice being paid.
    pub invoice: LightningInvoice,
    /// Caller-supplied metadata, stored exactly as passed to `send`.
    pub custom_meta: Value,
}
93
94impl SendOperationMeta {
95 pub fn gateway_fee(&self) -> Amount {
97 match &self.invoice {
98 LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
99 Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
100 ),
101 }
102 }
103}
104
/// Operation metadata recorded for an incoming payment started by
/// `LightningClientModule::receive`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiveOperationMeta {
    /// URL of the gateway that issued the invoice.
    pub gateway: SafeUrl,
    /// The incoming contract created for this payment.
    pub contract: IncomingContract,
    /// The invoice returned by the gateway.
    pub invoice: LightningInvoice,
    /// Caller-supplied metadata, stored exactly as passed to `receive`.
    pub custom_meta: Value,
}
112
113impl ReceiveOperationMeta {
114 pub fn gateway_fee(&self) -> Amount {
116 match &self.invoice {
117 LightningInvoice::Bolt11(invoice) => {
118 Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
119 .saturating_sub(self.contract.commitment.amount)
120 }
121 }
122 }
123}
124
/// Operation metadata recorded for an incoming contract that was picked up by
/// the background lnurl receive task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LnurlReceiveOperationMeta {
    /// The incoming contract being claimed.
    pub contract: IncomingContract,
    /// Metadata produced by the `custom_meta_fn` configured on
    /// `LightningClientInit`.
    pub custom_meta: Value,
}
130
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Progress of an outgoing payment as reported by
/// `LightningClientModule::subscribe_send_operation_state_updates`.
///
/// `Success`, `Refunded` and `Failure` are terminal: the update stream ends
/// after yielding one of them.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SendOperationState {
    /// The send state machine is in its `Funding` state.
    Funding,
    /// The send state machine is in its `Funded` state.
    Funded,
    /// The payment succeeded; carries the preimage, which has been verified
    /// against the outgoing contract.
    Success([u8; 32]),
    /// The state machine entered its refunding state and the refund outputs
    /// are being awaited.
    Refunding,
    /// The refund outputs were received by the primary module.
    Refunded,
    /// The funding was rejected, or the refund failed and no valid preimage
    /// could be obtained.
    Failure,
}
164
/// Terminal outcome of an outgoing payment, as returned by
/// `LightningClientModule::await_final_send_operation_state`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalSendOperationState {
    /// Corresponds to `SendOperationState::Success`.
    Success,
    /// Corresponds to `SendOperationState::Refunded`.
    Refunded,
    /// Corresponds to `SendOperationState::Failure`.
    Failure,
}
175
/// Result type matching the return value of `LightningClientModule::send`:
/// the id of the started operation or the reason it could not be started.
pub type SendResult = Result<OperationId, SendPaymentError>;
177
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Progress of an incoming payment as reported by
/// `LightningClientModule::subscribe_receive_operation_state_updates`.
///
/// `Expired`, `Claimed` and `Failure` are terminal: the update stream ends
/// after yielding one of them.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum ReceiveOperationState {
    /// The receive state machine is in its `Pending` state.
    Pending,
    /// The receive state machine reached its `Expired` state.
    Expired,
    /// The state machine entered its claiming state and the claimed outputs
    /// are being awaited.
    Claiming,
    /// The claimed outputs were received by the primary module.
    Claimed,
    /// Awaiting the claimed outputs failed.
    Failure,
}
203
/// Terminal outcome of an incoming payment, as returned by
/// `LightningClientModule::await_final_receive_operation_state`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// Corresponds to `ReceiveOperationState::Expired`.
    Expired,
    /// Corresponds to `ReceiveOperationState::Claimed`.
    Claimed,
    /// Corresponds to `ReceiveOperationState::Failure`.
    Failure,
}
214
/// Result type matching the return value of `LightningClientModule::receive`:
/// the invoice to hand to the payer together with the id of the started
/// operation, or the reason no invoice could be created.
pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
216
/// Initializer for the lightning client module.
#[derive(Clone)]
pub struct LightningClientInit {
    /// Optional gateway connection to use instead of the default. When `None`,
    /// `init` constructs a `RealGatewayConnection`.
    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
    /// Produces the custom metadata attached to operations created by the
    /// background lnurl receive task; called once per fetched batch of
    /// contracts.
    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
}
222
223impl std::fmt::Debug for LightningClientInit {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 f.debug_struct("LightningClientInit")
226 .field("gateway_conn", &self.gateway_conn)
227 .field("custom_meta_fn", &"<function>")
228 .finish()
229 }
230}
231
232impl Default for LightningClientInit {
233 fn default() -> Self {
234 LightningClientInit {
235 gateway_conn: None,
236 custom_meta_fn: Arc::new(|| Value::Null),
237 }
238 }
239}
240
241impl ModuleInit for LightningClientInit {
242 type Common = LightningCommonInit;
243
244 async fn dump_database(
245 &self,
246 _dbtx: &mut DatabaseTransaction<'_>,
247 _prefix_names: Vec<String>,
248 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
249 Box::new(BTreeMap::new().into_iter())
250 }
251}
252
253#[apply(async_trait_maybe_send!)]
254impl ClientModuleInit for LightningClientInit {
255 type Module = LightningClientModule;
256
257 fn supported_api_versions(&self) -> MultiApiVersion {
258 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
259 .expect("no version conflicts")
260 }
261
262 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
263 let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
264 gateway_conn
265 } else {
266 let api = GatewayApi::new(None, args.connector_registry.clone());
267 Arc::new(RealGatewayConnection { api })
268 };
269 Ok(LightningClientModule::new(
270 *args.federation_id(),
271 args.cfg().clone(),
272 args.notifier().clone(),
273 args.context(),
274 args.module_api().clone(),
275 args.module_root_secret(),
276 gateway_conn,
277 self.custom_meta_fn.clone(),
278 args.admin_auth().cloned(),
279 args.task_group(),
280 ))
281 }
282
283 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
284 Some(
285 DbKeyPrefix::iter()
286 .map(|p| p as u8)
287 .chain(
288 DbKeyPrefix::ExternalReservedStart as u8
289 ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
290 )
291 .collect(),
292 )
293 }
294}
295
/// Context handed to the module's state machines; built from the module's own
/// fields by `LightningClientModule::context`.
#[derive(Debug, Clone)]
pub struct LightningClientContext {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    /// Handle to the client framework for this module.
    pub(crate) client_ctx: ClientContext<LightningClientModule>,
}
302
/// Ties the state machine context to this module's kind.
impl Context for LightningClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
306
/// Client side of the lightning module: sends and receives payments through
/// gateways and runs the background tasks spawned in `new`.
#[derive(Debug, Clone)]
pub struct LightningClientModule {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Client configuration (network, fee consensus, aggregate TPE key).
    cfg: LightningClientConfig,
    /// Notifier used to subscribe to state machine updates per operation.
    notifier: ModuleNotifier<LightningClientStateMachines>,
    /// Handle to the client framework for this module.
    client_ctx: ClientContext<Self>,
    /// API used to query the federation.
    module_api: DynModuleApi,
    /// Keypair derived from child 0 of the module root secret; used by `send`
    /// and `receive`.
    keypair: Keypair,
    /// Keypair derived from child 1 of the module root secret; used for lnurl
    /// generation and lnurl receives.
    lnurl_keypair: Keypair,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    /// Admin credentials, if any; stored but not read in this file.
    #[allow(unused)]
    admin_auth: Option<ApiAuth>,
}
320
321#[apply(async_trait_maybe_send!)]
322impl ClientModule for LightningClientModule {
323 type Init = LightningClientInit;
324 type Common = LightningModuleTypes;
325 type Backup = NoModuleBackup;
326 type ModuleStateMachineContext = LightningClientContext;
327 type States = LightningClientStateMachines;
328
329 fn context(&self) -> Self::ModuleStateMachineContext {
330 LightningClientContext {
331 federation_id: self.federation_id,
332 gateway_conn: self.gateway_conn.clone(),
333 client_ctx: self.client_ctx.clone(),
334 }
335 }
336
337 fn input_fee(
338 &self,
339 amounts: &Amounts,
340 _input: &<Self::Common as ModuleCommon>::Input,
341 ) -> Option<Amounts> {
342 Some(Amounts::new_bitcoin(
343 self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
344 ))
345 }
346
347 fn output_fee(
348 &self,
349 amounts: &Amounts,
350 _output: &<Self::Common as ModuleCommon>::Output,
351 ) -> Option<Amounts> {
352 Some(Amounts::new_bitcoin(
353 self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
354 ))
355 }
356
357 #[cfg(feature = "cli")]
358 async fn handle_cli_command(
359 &self,
360 args: &[std::ffi::OsString],
361 ) -> anyhow::Result<serde_json::Value> {
362 cli::handle_cli_command(self, args).await
363 }
364}
365
366impl LightningClientModule {
367 #[allow(clippy::too_many_arguments)]
368 fn new(
369 federation_id: FederationId,
370 cfg: LightningClientConfig,
371 notifier: ModuleNotifier<LightningClientStateMachines>,
372 client_ctx: ClientContext<Self>,
373 module_api: DynModuleApi,
374 module_root_secret: &DerivableSecret,
375 gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
376 custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
377 admin_auth: Option<ApiAuth>,
378 task_group: &TaskGroup,
379 ) -> Self {
380 let module = Self {
381 federation_id,
382 cfg,
383 notifier,
384 client_ctx,
385 module_api,
386 keypair: module_root_secret
387 .child_key(ChildId(0))
388 .to_secp_key(SECP256K1),
389 lnurl_keypair: module_root_secret
390 .child_key(ChildId(1))
391 .to_secp_key(SECP256K1),
392 gateway_conn,
393 admin_auth,
394 };
395
396 module.spawn_receive_lnurl_task(custom_meta_fn, task_group);
397
398 module.spawn_gateway_map_update_task(task_group);
399
400 module
401 }
402
403 fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup) {
404 let module = self.clone();
405 let api = self.module_api.clone();
406
407 task_group.spawn_cancellable("gateway_map_update_task", async move {
408 api.wait_for_initialized_connections().await;
409 module.update_gateway_map().await;
410 });
411 }
412
413 async fn update_gateway_map(&self) {
414 if let Ok(gateways) = self.module_api.gateways().await {
421 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
422
423 for gateway in gateways {
424 if let Ok(Some(routing_info)) = self
425 .gateway_conn
426 .routing_info(gateway.clone(), &self.federation_id)
427 .await
428 {
429 dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
430 .await;
431 }
432 }
433
434 if let Err(e) = dbtx.commit_tx_result().await {
435 warn!("Failed to commit the updated gateway mapping to the database: {e}");
436 }
437 }
438 }
439
440 pub async fn select_gateway(
445 &self,
446 invoice: Option<Bolt11Invoice>,
447 ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
448 let gateways = self
449 .module_api
450 .gateways()
451 .await
452 .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
453
454 if gateways.is_empty() {
455 return Err(SelectGatewayError::NoGatewaysAvailable);
456 }
457
458 if let Some(invoice) = invoice
459 && let Some(gateway) = self
460 .client_ctx
461 .module_db()
462 .begin_transaction_nc()
463 .await
464 .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
465 .await
466 .filter(|gateway| gateways.contains(gateway))
467 && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
468 {
469 return Ok((gateway, routing_info));
470 }
471
472 for gateway in gateways {
473 if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
474 return Ok((gateway, routing_info));
475 }
476 }
477
478 Err(SelectGatewayError::GatewaysUnresponsive)
479 }
480
481 pub async fn list_gateways(
484 &self,
485 peer: Option<PeerId>,
486 ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
487 if let Some(peer) = peer {
488 self.module_api
489 .gateways_from_peer(peer)
490 .await
491 .map_err(|_| ListGatewaysError::FailedToListGateways)
492 } else {
493 self.module_api
494 .gateways()
495 .await
496 .map_err(|_| ListGatewaysError::FailedToListGateways)
497 }
498 }
499
500 pub async fn routing_info(
503 &self,
504 gateway: &SafeUrl,
505 ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
506 self.gateway_conn
507 .routing_info(gateway.clone(), &self.federation_id)
508 .await
509 .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
510 }
511
    /// Pays a BOLT11 invoice through a gateway by funding an outgoing
    /// contract, and returns the id of the started operation.
    ///
    /// `gateway` selects a specific gateway; when `None`, one is chosen with
    /// `select_gateway`. `custom_meta` is stored verbatim in the operation
    /// metadata.
    ///
    /// # Errors
    ///
    /// Returns a `SendPaymentError` if the invoice has no amount, is expired
    /// or is for another network, if a previous attempt for the same invoice
    /// is still in progress or already succeeded, if no usable gateway can be
    /// reached, if the gateway's fee or expiration delta exceeds the limits,
    /// if the block count cannot be requested, or if the funding transaction
    /// cannot be submitted.
    #[allow(clippy::too_many_lines)]
    pub async fn send(
        &self,
        invoice: Bolt11Invoice,
        gateway: Option<SafeUrl>,
        custom_meta: Value,
    ) -> Result<OperationId, SendPaymentError> {
        // Validate the invoice before touching the network.
        let amount = invoice
            .amount_milli_satoshis()
            .ok_or(SendPaymentError::InvoiceMissingAmount)?;

        if invoice.is_expired() {
            return Err(SendPaymentError::InvoiceExpired);
        }

        if self.cfg.network != invoice.currency().into() {
            return Err(SendPaymentError::WrongCurrency {
                invoice_currency: invoice.currency(),
                federation_currency: self.cfg.network.into(),
            });
        }

        // Rejects the payment if an earlier attempt is still running or has
        // already succeeded.
        let operation_id = self.get_next_operation_id(&invoice).await?;

        // The refund key is built directly from the freshly generated tweak.
        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());

        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
            .expect("32 bytes, within curve order")
            .keypair(secp256k1::SECP256K1);

        // Use the requested gateway if there is one, otherwise pick one.
        let (gateway_api, routing_info) = match gateway {
            Some(gateway_api) => (
                gateway_api.clone(),
                self.routing_info(&gateway_api)
                    .await
                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
                    .ok_or(SendPaymentError::FederationNotSupported)?,
            ),
            None => self
                .select_gateway(Some(invoice.clone()))
                .await
                .map_err(SendPaymentError::SelectGateway)?,
        };

        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);

        // Refuse gateways whose terms exceed the client's limits.
        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
            return Err(SendPaymentError::GatewayFeeExceedsLimit);
        }

        if EXPIRATION_DELTA_LIMIT < expiration_delta {
            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
        }

        let consensus_block_count = self
            .module_api
            .consensus_block_count()
            .await
            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;

        // The contract locks the invoice amount plus the gateway fee until
        // the computed block height.
        let contract = OutgoingContract {
            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
            amount: send_fee.add_to(amount),
            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
            claim_pk: routing_info.module_public_key,
            refund_pk: refund_keypair.public_key(),
            ephemeral_pk,
        };

        // Copies moved into the state machine factory closure below; the
        // originals are still needed for the operation metadata.
        let contract_clone = contract.clone();
        let gateway_api_clone = gateway_api.clone();
        let invoice_clone = invoice.clone();

        let client_output = ClientOutput::<LightningOutput> {
            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
            amounts: Amounts::new_bitcoin(contract.amount),
        };

        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
            state_machines: Arc::new(move |range: OutPointRange| {
                vec![LightningClientStateMachines::Send(SendStateMachine {
                    common: SendSMCommon {
                        operation_id,
                        // The bundle holds a single output, so the first
                        // outpoint of the range is the contract's outpoint.
                        outpoint: range.into_iter().next().unwrap(),
                        contract: contract_clone.clone(),
                        gateway_api: Some(gateway_api_clone.clone()),
                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
                        refund_keypair,
                    },
                    state: SendSMState::Funding,
                })]
            }),
        };

        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
            vec![client_output],
            vec![client_output_sm],
        ));

        let transaction = TransactionBuilder::new().with_outputs(client_output);

        self.client_ctx
            .finalize_and_submit_transaction(
                operation_id,
                LightningCommonInit::KIND.as_str(),
                move |change_outpoint_range| {
                    LightningOperationMeta::Send(SendOperationMeta {
                        change_outpoint_range,
                        gateway: gateway_api.clone(),
                        contract: contract.clone(),
                        invoice: LightningInvoice::Bolt11(invoice.clone()),
                        custom_meta: custom_meta.clone(),
                    })
                },
                transaction,
            )
            .await
            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;

        // Log the payment event only after the transaction was submitted.
        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;

        self.client_ctx
            .log_event(
                &mut dbtx,
                SendPaymentEvent {
                    operation_id,
                    amount: send_fee.add_to(amount),
                    fee: Some(send_fee.fee(amount)),
                },
            )
            .await;

        dbtx.commit_tx().await;

        Ok(operation_id)
    }
664
665 async fn get_next_operation_id(
666 &self,
667 invoice: &Bolt11Invoice,
668 ) -> Result<OperationId, SendPaymentError> {
669 for payment_attempt in 0..u64::MAX {
670 let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));
671
672 if !self.client_ctx.operation_exists(operation_id).await {
673 return Ok(operation_id);
674 }
675
676 if self.client_ctx.has_active_states(operation_id).await {
677 return Err(SendPaymentError::PaymentInProgress(operation_id));
678 }
679
680 let mut stream = self
681 .subscribe_send_operation_state_updates(operation_id)
682 .await
683 .expect("operation_id exists")
684 .into_stream();
685
686 while let Some(state) = stream.next().await {
689 if let SendOperationState::Success(_) = state {
690 return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
691 }
692 }
693 }
694
695 panic!("We could not find an unused operation id for sending a lightning payment");
696 }
697
    /// Subscribes to the state updates of a send operation.
    ///
    /// The returned stream translates send state machine notifications into
    /// `SendOperationState` values and ends after yielding `Success`,
    /// `Refunded` or `Failure`.
    ///
    /// # Errors
    ///
    /// Fails if the operation cannot be looked up.
    pub async fn subscribe_send_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();
        let module_api = self.module_api.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                loop {
                    // Notifications that are not send state machines are ignored.
                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
                        match state.state {
                            SendSMState::Funding => yield SendOperationState::Funding,
                            SendSMState::Funded => yield SendOperationState::Funded,
                            SendSMState::Success(preimage) => {
                                // A success state carrying a preimage that does not
                                // match the contract is treated as a bug.
                                assert!(state.common.contract.verify_preimage(&preimage));

                                yield SendOperationState::Success(preimage);
                                return;
                            },
                            SendSMState::Refunding(out_points) => {
                                yield SendOperationState::Refunding;

                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
                                    yield SendOperationState::Refunded;
                                    return;
                                }

                                // The refund outputs were not received; check whether
                                // a valid preimage is available before reporting
                                // failure.
                                // NOTE(review): the meaning of the second argument
                                // `0` is not visible here — confirm against the API.
                                if let Some(preimage) = module_api.await_preimage(
                                    state.common.outpoint,
                                    0
                                ).await
                                    && state.common.contract.verify_preimage(&preimage) {
                                    yield SendOperationState::Success(preimage);
                                    return;
                                }

                                yield SendOperationState::Failure;
                                return;
                            },
                            SendSMState::Rejected(..) => {
                                yield SendOperationState::Failure;
                                return;
                            },
                        }
                    }
                }
            }
        }))
    }
755
756 pub async fn await_final_send_operation_state(
758 &self,
759 operation_id: OperationId,
760 ) -> anyhow::Result<FinalSendOperationState> {
761 let state = self
762 .subscribe_send_operation_state_updates(operation_id)
763 .await?
764 .into_stream()
765 .filter_map(|state| {
766 futures::future::ready(match state {
767 SendOperationState::Success(_) => Some(FinalSendOperationState::Success),
768 SendOperationState::Refunded => Some(FinalSendOperationState::Refunded),
769 SendOperationState::Failure => Some(FinalSendOperationState::Failure),
770 _ => None,
771 })
772 })
773 .next()
774 .await
775 .expect("Stream contains one final state");
776
777 Ok(state)
778 }
779
780 pub async fn receive(
792 &self,
793 amount: Amount,
794 expiry_secs: u32,
795 description: Bolt11InvoiceDescription,
796 gateway: Option<SafeUrl>,
797 custom_meta: Value,
798 ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
799 let (gateway, contract, invoice) = self
800 .create_contract_and_fetch_invoice(
801 self.keypair.public_key(),
802 amount,
803 expiry_secs,
804 description,
805 gateway,
806 )
807 .await?;
808
809 let operation_id = self
810 .receive_incoming_contract(
811 self.keypair.secret_key(),
812 contract.clone(),
813 LightningOperationMeta::Receive(ReceiveOperationMeta {
814 gateway,
815 contract,
816 invoice: LightningInvoice::Bolt11(invoice.clone()),
817 custom_meta,
818 }),
819 )
820 .await
821 .expect("The contract has been generated with our public key");
822
823 Ok((invoice, operation_id))
824 }
825
826 async fn create_contract_and_fetch_invoice(
830 &self,
831 recipient_static_pk: PublicKey,
832 amount: Amount,
833 expiry_secs: u32,
834 description: Bolt11InvoiceDescription,
835 gateway: Option<SafeUrl>,
836 ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
837 let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);
838
839 let encryption_seed = ephemeral_tweak
840 .consensus_hash::<sha256::Hash>()
841 .to_byte_array();
842
843 let preimage = encryption_seed
844 .consensus_hash::<sha256::Hash>()
845 .to_byte_array();
846
847 let (gateway, routing_info) = match gateway {
848 Some(gateway) => (
849 gateway.clone(),
850 self.routing_info(&gateway)
851 .await
852 .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
853 .ok_or(ReceiveError::FederationNotSupported)?,
854 ),
855 None => self
856 .select_gateway(None)
857 .await
858 .map_err(ReceiveError::SelectGateway)?,
859 };
860
861 if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
862 return Err(ReceiveError::GatewayFeeExceedsLimit);
863 }
864
865 let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
866
867 if contract_amount < MINIMUM_INCOMING_CONTRACT_AMOUNT {
868 return Err(ReceiveError::AmountTooSmall);
869 }
870
871 let expiration = duration_since_epoch()
872 .as_secs()
873 .saturating_add(u64::from(expiry_secs));
874
875 let claim_pk = recipient_static_pk
876 .mul_tweak(
877 secp256k1::SECP256K1,
878 &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
879 )
880 .expect("Tweak is valid");
881
882 let contract = IncomingContract::new(
883 self.cfg.tpe_agg_pk,
884 encryption_seed,
885 preimage,
886 PaymentImage::Hash(preimage.consensus_hash()),
887 contract_amount,
888 expiration,
889 claim_pk,
890 routing_info.module_public_key,
891 ephemeral_pk,
892 );
893
894 let invoice = self
895 .gateway_conn
896 .bolt11_invoice(
897 gateway.clone(),
898 self.federation_id,
899 contract.clone(),
900 amount,
901 description,
902 expiry_secs,
903 )
904 .await
905 .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;
906
907 if invoice.payment_hash() != &preimage.consensus_hash() {
908 return Err(ReceiveError::InvalidInvoice);
909 }
910
911 if invoice.amount_milli_satoshis() != Some(amount.msats) {
912 return Err(ReceiveError::IncorrectInvoiceAmount);
913 }
914
915 Ok((gateway, contract, invoice))
916 }
917
918 async fn receive_incoming_contract(
921 &self,
922 sk: SecretKey,
923 contract: IncomingContract,
924 operation_meta: LightningOperationMeta,
925 ) -> Option<OperationId> {
926 let operation_id = OperationId::from_encodable(&contract.clone());
927
928 let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
929
930 let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
931 common: ReceiveSMCommon {
932 operation_id,
933 contract: contract.clone(),
934 claim_keypair,
935 agg_decryption_key,
936 },
937 state: ReceiveSMState::Pending,
938 });
939
940 self.client_ctx
943 .manual_operation_start(
944 operation_id,
945 LightningCommonInit::KIND.as_str(),
946 operation_meta,
947 vec![self.client_ctx.make_dyn_state(receive_sm)],
948 )
949 .await
950 .ok();
951
952 Some(operation_id)
953 }
954
955 fn recover_contract_keys(
956 &self,
957 sk: SecretKey,
958 contract: &IncomingContract,
959 ) -> Option<(Keypair, AggregateDecryptionKey)> {
960 let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
961
962 let encryption_seed = tweak
963 .secret_bytes()
964 .consensus_hash::<sha256::Hash>()
965 .to_byte_array();
966
967 let claim_keypair = sk
968 .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
969 .expect("Tweak is valid")
970 .keypair(secp256k1::SECP256K1);
971
972 if claim_keypair.public_key() != contract.commitment.claim_pk {
973 return None; }
975
976 let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
977
978 if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
979 return None; }
981
982 contract.decrypt_preimage(&agg_decryption_key)?;
983
984 Some((claim_keypair, agg_decryption_key))
985 }
986
    /// Subscribes to the state updates of a receive operation.
    ///
    /// The returned stream translates receive state machine notifications
    /// into `ReceiveOperationState` values and ends after yielding `Claimed`,
    /// `Failure` or `Expired`.
    ///
    /// # Errors
    ///
    /// Fails if the operation cannot be looked up.
    pub async fn subscribe_receive_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                loop {
                    // Notifications that are not receive state machines are ignored.
                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
                        match state.state {
                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
                            ReceiveSMState::Claiming(out_points) => {
                                yield ReceiveOperationState::Claiming;

                                // The claim counts as done once the primary module
                                // has received the outputs.
                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
                                    yield ReceiveOperationState::Claimed;
                                } else {
                                    yield ReceiveOperationState::Failure;
                                }
                                return;
                            },
                            ReceiveSMState::Expired => {
                                yield ReceiveOperationState::Expired;
                                return;
                            }
                        }
                    }
                }
            }
        }))
    }
1022
1023 pub async fn await_final_receive_operation_state(
1025 &self,
1026 operation_id: OperationId,
1027 ) -> anyhow::Result<FinalReceiveOperationState> {
1028 let state = self
1029 .subscribe_receive_operation_state_updates(operation_id)
1030 .await?
1031 .into_stream()
1032 .filter_map(|state| {
1033 futures::future::ready(match state {
1034 ReceiveOperationState::Expired => Some(FinalReceiveOperationState::Expired),
1035 ReceiveOperationState::Claimed => Some(FinalReceiveOperationState::Claimed),
1036 ReceiveOperationState::Failure => Some(FinalReceiveOperationState::Failure),
1037 _ => None,
1038 })
1039 })
1040 .next()
1041 .await
1042 .expect("Stream contains one final state");
1043
1044 Ok(state)
1045 }
1046
1047 pub async fn generate_lnurl(
1050 &self,
1051 recurringd: SafeUrl,
1052 gateway: Option<SafeUrl>,
1053 ) -> Result<String, GenerateLnurlError> {
1054 let gateways = if let Some(gateway) = gateway {
1055 vec![gateway]
1056 } else {
1057 let gateways = self
1058 .module_api
1059 .gateways()
1060 .await
1061 .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1062
1063 if gateways.is_empty() {
1064 return Err(GenerateLnurlError::NoGatewaysAvailable);
1065 }
1066
1067 gateways
1068 };
1069
1070 let lnurl = lnurl::generate_lnurl(
1071 &recurringd,
1072 self.federation_id,
1073 self.lnurl_keypair.public_key(),
1074 self.cfg.tpe_agg_pk,
1075 gateways,
1076 );
1077
1078 Ok(lnurl)
1079 }
1080
1081 fn spawn_receive_lnurl_task(
1082 &self,
1083 custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1084 task_group: &TaskGroup,
1085 ) {
1086 let module = self.clone();
1087 let api = self.module_api.clone();
1088
1089 task_group.spawn_cancellable("receive_lnurl_task", async move {
1090 api.wait_for_initialized_connections().await;
1091 loop {
1092 module.receive_lnurl(custom_meta_fn()).await;
1093 }
1094 });
1095 }
1096
1097 async fn receive_lnurl(&self, custom_meta: Value) {
1098 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1099
1100 let stream_index = dbtx
1101 .get_value(&IncomingContractStreamIndexKey)
1102 .await
1103 .unwrap_or(0);
1104
1105 let (contracts, next_index) = self
1106 .module_api
1107 .await_incoming_contracts(stream_index, 128)
1108 .await;
1109
1110 for contract in &contracts {
1111 if let Some(operation_id) = self
1112 .receive_incoming_contract(
1113 self.lnurl_keypair.secret_key(),
1114 contract.clone(),
1115 LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
1116 contract: contract.clone(),
1117 custom_meta: custom_meta.clone(),
1118 }),
1119 )
1120 .await
1121 {
1122 self.await_final_receive_operation_state(operation_id)
1123 .await
1124 .ok();
1125 }
1126 }
1127
1128 dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
1129 .await;
1130
1131 dbtx.commit_tx().await;
1132 }
1133}
1134
/// Reasons why `LightningClientModule::select_gateway` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SelectGatewayError {
    /// The gateway list could not be requested; carries the stringified
    /// underlying error, which is not part of the display message.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    /// The federation lists no gateways.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// None of the listed gateways returned routing info.
    #[error("All gateways failed to respond")]
    GatewaysUnresponsive,
}
1144
/// Reasons why `LightningClientModule::send` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendPaymentError {
    /// The invoice does not specify an amount.
    #[error("Invoice is missing an amount")]
    InvoiceMissingAmount,
    /// The invoice is already expired.
    #[error("Invoice has expired")]
    InvoiceExpired,
    /// An earlier attempt for the same invoice still has active state
    /// machines; carries that attempt's operation id.
    #[error("A payment for this invoice is already in progress")]
    PaymentInProgress(OperationId),
    /// An earlier attempt for the same invoice succeeded; carries that
    /// attempt's operation id.
    #[error("This invoice has already been paid")]
    InvoiceAlreadyPaid(OperationId),
    /// Automatic gateway selection failed.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// Routing info could not be requested from the chosen gateway; carries
    /// the stringified underlying error.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The chosen gateway returned no routing info for this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's send fee is above `PaymentFee::SEND_FEE_LIMIT`.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// The gateway's expiration delta is above `EXPIRATION_DELTA_LIMIT`.
    #[error("Gateway expiration time exceeds the allowed limit")]
    GatewayExpirationExceedsLimit,
    /// The consensus block count could not be requested; carries the
    /// stringified underlying error.
    #[error("Failed to request block count")]
    FailedToRequestBlockCount(String),
    /// The funding transaction could not be finalized and submitted; carries
    /// the stringified underlying error.
    #[error("Failed to fund the payment")]
    FailedToFundPayment(String),
    /// The invoice's currency does not match the federation's network.
    #[error("Invoice is for a different currency")]
    WrongCurrency {
        /// Currency stated in the invoice.
        invoice_currency: Currency,
        /// Currency corresponding to the federation's configured network.
        federation_currency: Currency,
    },
}
1175
/// Reasons why `LightningClientModule::receive` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveError {
    /// Automatic gateway selection failed.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// Requesting routing info or the invoice from the gateway failed;
    /// carries the stringified underlying error.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The chosen gateway returned no routing info for this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's receive fee is above `PaymentFee::RECEIVE_FEE_LIMIT`.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// The amount left after the gateway fee is below
    /// `MINIMUM_INCOMING_CONTRACT_AMOUNT`.
    #[error("Amount is too small to cover fees")]
    AmountTooSmall,
    /// The invoice's payment hash does not match the contract's preimage.
    #[error("Gateway returned an invalid invoice")]
    InvalidInvoice,
    /// The invoice's amount differs from the requested amount.
    #[error("Gateway returned an invoice with incorrect amount")]
    IncorrectInvoiceAmount,
}
1193
/// Reasons why `LightningClientModule::generate_lnurl` can fail.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum GenerateLnurlError {
    /// The federation lists no gateways.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// The gateway list could not be requested; carries the stringified
    /// underlying error.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
}
1201
/// Error returned by `LightningClientModule::list_gateways`.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ListGatewaysError {
    /// The request to the federation or to the selected peer failed.
    #[error("Failed to request gateways")]
    FailedToListGateways,
}
1207
/// Error returned by `LightningClientModule::routing_info`.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum RoutingInfoError {
    /// The routing info request to the gateway failed.
    #[error("Failed to request routing info")]
    FailedToRequestRoutingInfo,
}
1213
/// All state machines run by the lightning client module.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum LightningClientStateMachines {
    /// State machine driving an outgoing payment.
    Send(SendStateMachine),
    /// State machine driving an incoming payment.
    Receive(ReceiveStateMachine),
}
1219
/// Allows the typed state machine enum to be wrapped into a `DynState` bound
/// to a module instance.
impl IntoDynInstance for LightningClientStateMachines {
    type DynType = DynState;

    /// Wraps `self` into a `DynState` tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1227
1228impl State for LightningClientStateMachines {
1229 type ModuleContext = LightningClientContext;
1230
1231 fn transitions(
1232 &self,
1233 context: &Self::ModuleContext,
1234 global_context: &DynGlobalClientContext,
1235 ) -> Vec<StateTransition<Self>> {
1236 match self {
1237 LightningClientStateMachines::Send(state) => {
1238 sm_enum_variant_translation!(
1239 state.transitions(context, global_context),
1240 LightningClientStateMachines::Send
1241 )
1242 }
1243 LightningClientStateMachines::Receive(state) => {
1244 sm_enum_variant_translation!(
1245 state.transitions(context, global_context),
1246 LightningClientStateMachines::Receive
1247 )
1248 }
1249 }
1250 }
1251
1252 fn operation_id(&self) -> OperationId {
1253 match self {
1254 LightningClientStateMachines::Send(state) => state.operation_id(),
1255 LightningClientStateMachines::Receive(state) => state.operation_id(),
1256 }
1257 }
1258}