Skip to main content

fedimint_lnv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7pub use fedimint_lnv2_common as common;
8
9mod api;
10#[cfg(feature = "cli")]
11mod cli;
12mod db;
13pub mod events;
14mod receive_sm;
15mod send_sm;
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::sync::Arc;
19
20use async_stream::stream;
21use bitcoin::hashes::{Hash, sha256};
22use bitcoin::secp256k1;
23use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
24use fedimint_api_client::api::DynModuleApi;
25use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
26use fedimint_client_module::module::recovery::NoModuleBackup;
27use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
28use fedimint_client_module::oplog::UpdateStreamOrOutcome;
29use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
30use fedimint_client_module::transaction::{
31    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
32};
33use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
34use fedimint_core::config::FederationId;
35use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
36use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
37use fedimint_core::encoding::{Decodable, Encodable};
38use fedimint_core::module::{
39    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
40};
41use fedimint_core::secp256k1::SECP256K1;
42use fedimint_core::task::TaskGroup;
43use fedimint_core::time::duration_since_epoch;
44use fedimint_core::util::SafeUrl;
45use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
46use fedimint_derive_secret::{ChildId, DerivableSecret};
47use fedimint_lnv2_common::config::LightningClientConfig;
48use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
49use fedimint_lnv2_common::gateway_api::{
50    GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
51};
52use fedimint_lnv2_common::{
53    Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
54    LightningModuleTypes, LightningOutput, LightningOutputV0, MINIMUM_INCOMING_CONTRACT_AMOUNT,
55    lnurl, tweak,
56};
57use futures::StreamExt;
58use lightning_invoice::{Bolt11Invoice, Currency};
59use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use strum::IntoEnumIterator as _;
63use thiserror::Error;
64use tpe::{AggregateDecryptionKey, derive_agg_dk};
65use tracing::warn;
66
67use crate::api::LightningFederationApi;
68use crate::events::SendPaymentEvent;
69use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
70use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
71
72/// Number of blocks until outgoing lightning contracts times out and user
73/// client can refund it unilaterally
74const EXPIRATION_DELTA_LIMIT: u64 = 1440;
75
76/// A two hour buffer in case either the client or gateway go offline
77const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
78
79#[allow(clippy::large_enum_variant)]
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum LightningOperationMeta {
82    Send(SendOperationMeta),
83    Receive(ReceiveOperationMeta),
84    LnurlReceive(LnurlReceiveOperationMeta),
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct SendOperationMeta {
89    pub change_outpoint_range: OutPointRange,
90    pub gateway: SafeUrl,
91    pub contract: OutgoingContract,
92    pub invoice: LightningInvoice,
93    pub custom_meta: Value,
94}
95
96impl SendOperationMeta {
97    /// Calculate the absolute fee paid to the gateway on success.
98    pub fn gateway_fee(&self) -> Amount {
99        match &self.invoice {
100            LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
101                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
102            ),
103        }
104    }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ReceiveOperationMeta {
109    pub gateway: SafeUrl,
110    pub contract: IncomingContract,
111    pub invoice: LightningInvoice,
112    pub custom_meta: Value,
113}
114
115impl ReceiveOperationMeta {
116    /// Calculate the absolute fee paid to the gateway on success.
117    pub fn gateway_fee(&self) -> Amount {
118        match &self.invoice {
119            LightningInvoice::Bolt11(invoice) => {
120                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
121                    .saturating_sub(self.contract.commitment.amount)
122            }
123        }
124    }
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct LnurlReceiveOperationMeta {
129    pub contract: IncomingContract,
130    pub custom_meta: Value,
131}
132
133#[cfg_attr(doc, aquamarine::aquamarine)]
134/// The state of an operation sending a payment over lightning.
135///
136/// ```mermaid
137/// graph LR
138/// classDef virtual fill:#fff,stroke-dasharray: 5 5
139///
140///     Funding -- funding transaction is rejected --> Rejected
141///     Funding -- funding transaction is accepted --> Funded
142///     Funded -- payment is confirmed  --> Success
143///     Funded -- payment attempt expires --> Refunding
144///     Funded -- gateway cancels payment attempt --> Refunding
145///     Refunding -- payment is confirmed --> Success
146///     Refunding -- ecash is minted --> Refunded
147///     Refunding -- minting ecash fails --> Failure
148/// ```
149/// The transition from Refunding to Success is only possible if the gateway
150/// misbehaves.
151#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
152pub enum SendOperationState {
153    /// We are funding the contract to incentivize the gateway.
154    Funding,
155    /// We are waiting for the gateway to complete the payment.
156    Funded,
157    /// The payment was successful.
158    Success([u8; 32]),
159    /// The payment has failed and we are refunding the contract.
160    Refunding,
161    /// The payment has been refunded.
162    Refunded,
163    /// Either a programming error has occurred or the federation is malicious.
164    Failure,
165}
166
167/// The final state of an operation sending a payment over lightning.
168#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
169pub enum FinalSendOperationState {
170    /// The payment was successful. Carries the payment preimage proving
171    /// the gateway settled the invoice, serialized as a lowercase hex string.
172    Success(#[serde(with = "fedimint_core::hex::serde")] [u8; 32]),
173    /// The payment has been refunded.
174    Refunded,
175    /// Either a programming error has occurred or the federation is malicious.
176    Failure,
177}
178
179pub type SendResult = Result<OperationId, SendPaymentError>;
180
181#[cfg_attr(doc, aquamarine::aquamarine)]
182/// The state of an operation receiving a payment over lightning.
183///
184/// ```mermaid
185/// graph LR
186/// classDef virtual fill:#fff,stroke-dasharray: 5 5
187///
188///     Pending -- payment is confirmed --> Claiming
189///     Pending -- invoice expires --> Expired
190///     Claiming -- ecash is minted --> Claimed
191///     Claiming -- minting ecash fails --> Failure
192/// ```
193#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
194pub enum ReceiveOperationState {
195    /// We are waiting for the payment.
196    Pending,
197    /// The payment request has expired.
198    Expired,
199    /// The payment has been confirmed and we are issuing the ecash.
200    Claiming,
201    /// The payment has been successful.
202    Claimed,
203    /// Either a programming error has occurred or the federation is malicious.
204    Failure,
205}
206
207/// The final state of an operation receiving a payment over lightning.
208#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
209pub enum FinalReceiveOperationState {
210    /// The payment request has expired.
211    Expired,
212    /// The payment has been successful.
213    Claimed,
214    /// Either a programming error has occurred or the federation is malicious.
215    Failure,
216}
217
218pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
219
220#[derive(Clone)]
221pub struct LightningClientInit {
222    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
223    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
224}
225
226impl std::fmt::Debug for LightningClientInit {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        f.debug_struct("LightningClientInit")
229            .field("gateway_conn", &self.gateway_conn)
230            .field("custom_meta_fn", &"<function>")
231            .finish()
232    }
233}
234
235impl Default for LightningClientInit {
236    fn default() -> Self {
237        LightningClientInit {
238            gateway_conn: None,
239            custom_meta_fn: Arc::new(|| Value::Null),
240        }
241    }
242}
243
244impl ModuleInit for LightningClientInit {
245    type Common = LightningCommonInit;
246
247    async fn dump_database(
248        &self,
249        _dbtx: &mut DatabaseTransaction<'_>,
250        _prefix_names: Vec<String>,
251    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
252        Box::new(BTreeMap::new().into_iter())
253    }
254}
255
256#[apply(async_trait_maybe_send!)]
257impl ClientModuleInit for LightningClientInit {
258    type Module = LightningClientModule;
259
260    fn supported_api_versions(&self) -> MultiApiVersion {
261        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
262            .expect("no version conflicts")
263    }
264
265    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
266        let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
267            gateway_conn
268        } else {
269            let api = GatewayApi::new(None, args.connector_registry.clone());
270            Arc::new(RealGatewayConnection { api })
271        };
272        Ok(LightningClientModule::new(
273            *args.federation_id(),
274            args.cfg().clone(),
275            args.notifier().clone(),
276            args.context(),
277            args.module_api().clone(),
278            args.module_root_secret(),
279            gateway_conn,
280            self.custom_meta_fn.clone(),
281            args.admin_auth().cloned(),
282            args.task_group(),
283            args.client_span(),
284        ))
285    }
286
287    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
288        Some(
289            DbKeyPrefix::iter()
290                .map(|p| p as u8)
291                .chain(
292                    DbKeyPrefix::ExternalReservedStart as u8
293                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
294                )
295                .collect(),
296        )
297    }
298}
299
300#[derive(Debug, Clone)]
301pub struct LightningClientContext {
302    federation_id: FederationId,
303    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
304    pub(crate) client_ctx: ClientContext<LightningClientModule>,
305}
306
307impl Context for LightningClientContext {
308    const KIND: Option<ModuleKind> = Some(KIND);
309}
310
311#[derive(Debug, Clone)]
312pub struct LightningClientModule {
313    federation_id: FederationId,
314    cfg: LightningClientConfig,
315    notifier: ModuleNotifier<LightningClientStateMachines>,
316    client_ctx: ClientContext<Self>,
317    module_api: DynModuleApi,
318    keypair: Keypair,
319    lnurl_keypair: Keypair,
320    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
321    #[allow(unused)] // The field is only used by the cli feature
322    admin_auth: Option<ApiAuth>,
323}
324
325#[apply(async_trait_maybe_send!)]
326impl ClientModule for LightningClientModule {
327    type Init = LightningClientInit;
328    type Common = LightningModuleTypes;
329    type Backup = NoModuleBackup;
330    type ModuleStateMachineContext = LightningClientContext;
331    type States = LightningClientStateMachines;
332
333    fn context(&self) -> Self::ModuleStateMachineContext {
334        LightningClientContext {
335            federation_id: self.federation_id,
336            gateway_conn: self.gateway_conn.clone(),
337            client_ctx: self.client_ctx.clone(),
338        }
339    }
340
341    fn input_fee(
342        &self,
343        amounts: &Amounts,
344        _input: &<Self::Common as ModuleCommon>::Input,
345    ) -> Option<Amounts> {
346        Some(Amounts::new_bitcoin(
347            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
348        ))
349    }
350
351    fn output_fee(
352        &self,
353        amounts: &Amounts,
354        _output: &<Self::Common as ModuleCommon>::Output,
355    ) -> Option<Amounts> {
356        Some(Amounts::new_bitcoin(
357            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
358        ))
359    }
360
361    #[cfg(feature = "cli")]
362    async fn handle_cli_command(
363        &self,
364        args: &[std::ffi::OsString],
365    ) -> anyhow::Result<serde_json::Value> {
366        cli::handle_cli_command(self, args).await
367    }
368}
369
370impl LightningClientModule {
371    #[allow(clippy::too_many_arguments)]
372    fn new(
373        federation_id: FederationId,
374        cfg: LightningClientConfig,
375        notifier: ModuleNotifier<LightningClientStateMachines>,
376        client_ctx: ClientContext<Self>,
377        module_api: DynModuleApi,
378        module_root_secret: &DerivableSecret,
379        gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
380        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
381        admin_auth: Option<ApiAuth>,
382        task_group: &TaskGroup,
383        client_span: &tracing::Span,
384    ) -> Self {
385        let module = Self {
386            federation_id,
387            cfg,
388            notifier,
389            client_ctx,
390            module_api,
391            keypair: module_root_secret
392                .child_key(ChildId(0))
393                .to_secp_key(SECP256K1),
394            lnurl_keypair: module_root_secret
395                .child_key(ChildId(1))
396                .to_secp_key(SECP256K1),
397            gateway_conn,
398            admin_auth,
399        };
400
401        module.spawn_receive_lnurl_task(custom_meta_fn, task_group, client_span);
402
403        module.spawn_gateway_map_update_task(task_group, client_span);
404
405        module
406    }
407
408    fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup, client_span: &tracing::Span) {
409        let module = self.clone();
410        let api = self.module_api.clone();
411
412        task_group.spawn_cancellable_with_span(
413            client_span.clone(),
414            "gateway_map_update_task",
415            async move {
416                api.wait_for_initialized_connections().await;
417                module.update_gateway_map().await;
418            },
419        );
420    }
421
422    async fn update_gateway_map(&self) {
423        // Update the mapping from lightning node public keys to gateway api
424        // endpoints maintained in the module database. When paying an invoice this
425        // enables the client to select the gateway that has created the invoice,
426        // if possible, such that the payment does not go over lightning, reducing
427        // fees and latency.
428
429        if let Ok(gateways) = self.module_api.gateways().await {
430            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
431
432            for gateway in gateways {
433                if let Ok(Some(routing_info)) = self
434                    .gateway_conn
435                    .routing_info(gateway.clone(), &self.federation_id)
436                    .await
437                {
438                    dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
439                        .await;
440                }
441            }
442
443            if let Err(e) = dbtx.commit_tx_result().await {
444                warn!("Failed to commit the updated gateway mapping to the database: {e}");
445            }
446        }
447    }
448
449    /// Selects an available gateway by querying the federation's registered
450    /// gateways, checking if one of them match the invoice's payee public
451    /// key, then queries the gateway for `RoutingInfo` to determine if it is
452    /// online.
453    pub async fn select_gateway(
454        &self,
455        invoice: Option<Bolt11Invoice>,
456    ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
457        let gateways = self
458            .module_api
459            .gateways()
460            .await
461            .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
462
463        if gateways.is_empty() {
464            return Err(SelectGatewayError::NoGatewaysAvailable);
465        }
466
467        if let Some(invoice) = invoice
468            && let Some(gateway) = self
469                .client_ctx
470                .module_db()
471                .begin_transaction_nc()
472                .await
473                .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
474                .await
475                .filter(|gateway| gateways.contains(gateway))
476            && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
477        {
478            return Ok((gateway, routing_info));
479        }
480
481        for gateway in gateways {
482            if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
483                return Ok((gateway, routing_info));
484            }
485        }
486
487        Err(SelectGatewayError::GatewaysUnresponsive)
488    }
489
490    /// Sends a request to each peer for their registered gateway list and
491    /// returns a `Vec<SafeUrl` of all registered gateways to the client.
492    pub async fn list_gateways(
493        &self,
494        peer: Option<PeerId>,
495    ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
496        if let Some(peer) = peer {
497            self.module_api
498                .gateways_from_peer(peer)
499                .await
500                .map_err(|_| ListGatewaysError::FailedToListGateways)
501        } else {
502            self.module_api
503                .gateways()
504                .await
505                .map_err(|_| ListGatewaysError::FailedToListGateways)
506        }
507    }
508
509    /// Requests the `RoutingInfo`, including fee information, from the gateway
510    /// available at the `SafeUrl`.
511    pub async fn routing_info(
512        &self,
513        gateway: &SafeUrl,
514    ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
515        self.gateway_conn
516            .routing_info(gateway.clone(), &self.federation_id)
517            .await
518            .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
519    }
520
521    /// Pay an invoice. For testing you can optionally specify a gateway to
522    /// route with, otherwise a gateway will be selected automatically. If the
523    /// invoice was created by a gateway connected to our federation, the same
524    /// gateway will be selected to allow for a direct ecash swap. Otherwise we
525    /// select a random online gateway.
526    ///
527    /// The fee for this payment may depend on the selected gateway but
528    /// will be limited to one and a half percent plus one hundred satoshis.
529    /// This fee accounts for the fee charged by the gateway as well as
530    /// the additional fee required to reliably route this payment over
531    /// lightning if necessary. Since the gateway has been vetted by at least
532    /// one guardian we trust it to set a reasonable fee and only enforce a
533    /// rather high limit.
534    ///
535    /// The absolute fee for a payment can be calculated from the operation meta
536    /// to be shown to the user in the transaction history.
537    #[allow(clippy::too_many_lines)]
538    pub async fn send(
539        &self,
540        invoice: Bolt11Invoice,
541        gateway: Option<SafeUrl>,
542        custom_meta: Value,
543    ) -> Result<OperationId, SendPaymentError> {
544        let amount = invoice
545            .amount_milli_satoshis()
546            .ok_or(SendPaymentError::InvoiceMissingAmount)?;
547
548        if invoice.is_expired() {
549            return Err(SendPaymentError::InvoiceExpired);
550        }
551
552        if self.cfg.network != invoice.currency().into() {
553            return Err(SendPaymentError::WrongCurrency {
554                invoice_currency: invoice.currency(),
555                federation_currency: self.cfg.network.into(),
556            });
557        }
558
559        let operation_id = self.get_next_operation_id(&invoice).await?;
560
561        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());
562
563        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
564            .expect("32 bytes, within curve order")
565            .keypair(secp256k1::SECP256K1);
566
567        let (gateway_api, routing_info) = match gateway {
568            Some(gateway_api) => (
569                gateway_api.clone(),
570                self.routing_info(&gateway_api)
571                    .await
572                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
573                    .ok_or(SendPaymentError::FederationNotSupported)?,
574            ),
575            None => self
576                .select_gateway(Some(invoice.clone()))
577                .await
578                .map_err(SendPaymentError::SelectGateway)?,
579        };
580
581        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);
582
583        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
584            return Err(SendPaymentError::GatewayFeeExceedsLimit);
585        }
586
587        if EXPIRATION_DELTA_LIMIT < expiration_delta {
588            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
589        }
590
591        let consensus_block_count = self
592            .module_api
593            .consensus_block_count()
594            .await
595            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;
596
597        let contract = OutgoingContract {
598            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
599            amount: send_fee.add_to(amount),
600            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
601            claim_pk: routing_info.module_public_key,
602            refund_pk: refund_keypair.public_key(),
603            ephemeral_pk,
604        };
605
606        let contract_clone = contract.clone();
607        let gateway_api_clone = gateway_api.clone();
608        let invoice_clone = invoice.clone();
609
610        let client_output = ClientOutput::<LightningOutput> {
611            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
612            amounts: Amounts::new_bitcoin(contract.amount),
613        };
614
615        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
616            state_machines: Arc::new(move |range: OutPointRange| {
617                vec![LightningClientStateMachines::Send(SendStateMachine {
618                    common: SendSMCommon {
619                        operation_id,
620                        outpoint: range.into_iter().next().unwrap(),
621                        contract: contract_clone.clone(),
622                        gateway_api: Some(gateway_api_clone.clone()),
623                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
624                        refund_keypair,
625                    },
626                    state: SendSMState::Funding,
627                })]
628            }),
629        };
630
631        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
632            vec![client_output],
633            vec![client_output_sm],
634        ));
635
636        let transaction = TransactionBuilder::new().with_outputs(client_output);
637
638        self.client_ctx
639            .finalize_and_submit_transaction(
640                operation_id,
641                LightningCommonInit::KIND.as_str(),
642                move |change_outpoint_range| {
643                    LightningOperationMeta::Send(SendOperationMeta {
644                        change_outpoint_range,
645                        gateway: gateway_api.clone(),
646                        contract: contract.clone(),
647                        invoice: LightningInvoice::Bolt11(invoice.clone()),
648                        custom_meta: custom_meta.clone(),
649                    })
650                },
651                transaction,
652            )
653            .await
654            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;
655
656        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
657
658        self.client_ctx
659            .log_event(
660                &mut dbtx,
661                SendPaymentEvent {
662                    operation_id,
663                    amount: send_fee.add_to(amount),
664                    fee: send_fee.fee(amount),
665                },
666            )
667            .await;
668
669        dbtx.commit_tx().await;
670
671        Ok(operation_id)
672    }
673
674    async fn get_next_operation_id(
675        &self,
676        invoice: &Bolt11Invoice,
677    ) -> Result<OperationId, SendPaymentError> {
678        for payment_attempt in 0..u64::MAX {
679            let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));
680
681            if !self.client_ctx.operation_exists(operation_id).await {
682                return Ok(operation_id);
683            }
684
685            if self.client_ctx.has_active_states(operation_id).await {
686                return Err(SendPaymentError::PaymentInProgress(operation_id));
687            }
688
689            let mut stream = self
690                .subscribe_send_operation_state_updates(operation_id)
691                .await
692                .expect("operation_id exists")
693                .into_stream();
694
695            // This will not block since we checked for active states and there were none,
696            // so by definition a final state has to have been assumed already.
697            while let Some(state) = stream.next().await {
698                if let SendOperationState::Success(_) = state {
699                    return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
700                }
701            }
702        }
703
704        panic!("We could not find an unused operation id for sending a lightning payment");
705    }
706
707    /// Subscribe to all state updates of the send operation.
708    pub async fn subscribe_send_operation_state_updates(
709        &self,
710        operation_id: OperationId,
711    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
712        let operation = self.client_ctx.get_operation(operation_id).await?;
713        let mut stream = self.notifier.subscribe(operation_id).await;
714        let client_ctx = self.client_ctx.clone();
715        let module_api = self.module_api.clone();
716
717        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
718            stream! {
719                loop {
720                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
721                        match state.state {
722                            SendSMState::Funding => yield SendOperationState::Funding,
723                            SendSMState::Funded => yield SendOperationState::Funded,
724                            SendSMState::Success(preimage) => {
725                                // the preimage has been verified by the state machine previously
726                                assert!(state.common.contract.verify_preimage(&preimage));
727
728                                yield SendOperationState::Success(preimage);
729                                return;
730                            },
731                            SendSMState::Refunding(out_points) => {
732                                yield SendOperationState::Refunding;
733
734                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
735                                    yield SendOperationState::Refunded;
736                                    return;
737                                }
738
739                                // The gateway may have incorrectly claimed the outgoing contract thereby causing
740                                // our refund transaction to be rejected. Therefore, we check one last time if
741                                // the preimage is available before we enter the failure state.
742                                if let Some(preimage) = module_api.await_preimage(
743                                    state.common.outpoint,
744                                    0
745                                ).await
746                                    && state.common.contract.verify_preimage(&preimage) {
747                                        yield SendOperationState::Success(preimage);
748                                        return;
749                                    }
750
751                                yield SendOperationState::Failure;
752                                return;
753                            },
754                            SendSMState::Rejected(..) => {
755                                yield SendOperationState::Failure;
756                                return;
757                            },
758                        }
759                    }
760                }
761            }
762        }))
763    }
764
765    /// Await the final state of the send operation.
766    pub async fn await_final_send_operation_state(
767        &self,
768        operation_id: OperationId,
769    ) -> anyhow::Result<FinalSendOperationState> {
770        let mut stream = self
771            .subscribe_send_operation_state_updates(operation_id)
772            .await?
773            .into_stream();
774
775        let mut final_state = None;
776
777        while let Some(state) = stream.next().await {
778            match state {
779                SendOperationState::Success(preimage) => {
780                    final_state = Some(FinalSendOperationState::Success(preimage));
781                }
782                SendOperationState::Refunded => {
783                    final_state = Some(FinalSendOperationState::Refunded);
784                }
785                SendOperationState::Failure => final_state = Some(FinalSendOperationState::Failure),
786                _ => {}
787            }
788        }
789
790        Ok(final_state.expect("Stream contains one final state"))
791    }
792
793    /// Request an invoice. For testing you can optionally specify a gateway to
794    /// generate the invoice, otherwise a random online gateway will be selected
795    /// automatically.
796    ///
797    /// The total fee for this payment may depend on the chosen gateway but
798    /// will be limited to half of one percent plus fifty satoshis. Since the
799    /// selected gateway has been vetted by at least one guardian we trust it to
800    /// set a reasonable fee and only enforce a rather high limit.
801    ///
802    /// The absolute fee for a payment can be calculated from the operation meta
803    /// to be shown to the user in the transaction history.
804    pub async fn receive(
805        &self,
806        amount: Amount,
807        expiry_secs: u32,
808        description: Bolt11InvoiceDescription,
809        gateway: Option<SafeUrl>,
810        custom_meta: Value,
811    ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
812        let (gateway, contract, invoice) = self
813            .create_contract_and_fetch_invoice(
814                self.keypair.public_key(),
815                amount,
816                expiry_secs,
817                description,
818                gateway,
819            )
820            .await?;
821
822        let operation_id = self
823            .receive_incoming_contract(
824                self.keypair.secret_key(),
825                contract.clone(),
826                LightningOperationMeta::Receive(ReceiveOperationMeta {
827                    gateway,
828                    contract,
829                    invoice: LightningInvoice::Bolt11(invoice.clone()),
830                    custom_meta,
831                }),
832            )
833            .await
834            .expect("The contract has been generated with our public key");
835
836        Ok((invoice, operation_id))
837    }
838
839    /// Create an incoming contract locked to a public key derived from the
840    /// recipient's static module public key and fetches the corresponding
841    /// invoice.
842    async fn create_contract_and_fetch_invoice(
843        &self,
844        recipient_static_pk: PublicKey,
845        amount: Amount,
846        expiry_secs: u32,
847        description: Bolt11InvoiceDescription,
848        gateway: Option<SafeUrl>,
849    ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
850        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);
851
852        let encryption_seed = ephemeral_tweak
853            .consensus_hash::<sha256::Hash>()
854            .to_byte_array();
855
856        let preimage = encryption_seed
857            .consensus_hash::<sha256::Hash>()
858            .to_byte_array();
859
860        let (gateway, routing_info) = match gateway {
861            Some(gateway) => (
862                gateway.clone(),
863                self.routing_info(&gateway)
864                    .await
865                    .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
866                    .ok_or(ReceiveError::FederationNotSupported)?,
867            ),
868            None => self
869                .select_gateway(None)
870                .await
871                .map_err(ReceiveError::SelectGateway)?,
872        };
873
874        if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
875            return Err(ReceiveError::GatewayFeeExceedsLimit);
876        }
877
878        let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
879
880        if contract_amount < MINIMUM_INCOMING_CONTRACT_AMOUNT {
881            return Err(ReceiveError::AmountTooSmall);
882        }
883
884        let expiration = duration_since_epoch()
885            .as_secs()
886            .saturating_add(u64::from(expiry_secs));
887
888        let claim_pk = recipient_static_pk
889            .mul_tweak(
890                secp256k1::SECP256K1,
891                &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
892            )
893            .expect("Tweak is valid");
894
895        let contract = IncomingContract::new(
896            self.cfg.tpe_agg_pk,
897            encryption_seed,
898            preimage,
899            PaymentImage::Hash(preimage.consensus_hash()),
900            contract_amount,
901            expiration,
902            claim_pk,
903            routing_info.module_public_key,
904            ephemeral_pk,
905        );
906
907        let invoice = self
908            .gateway_conn
909            .bolt11_invoice(
910                gateway.clone(),
911                self.federation_id,
912                contract.clone(),
913                amount,
914                description,
915                expiry_secs,
916            )
917            .await
918            .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;
919
920        if invoice.payment_hash() != &preimage.consensus_hash() {
921            return Err(ReceiveError::InvalidInvoice);
922        }
923
924        if invoice.amount_milli_satoshis() != Some(amount.msats) {
925            return Err(ReceiveError::IncorrectInvoiceAmount);
926        }
927
928        Ok((gateway, contract, invoice))
929    }
930
931    // Receive an incoming contract locked to a public key derived from our
932    // static module public key.
933    async fn receive_incoming_contract(
934        &self,
935        sk: SecretKey,
936        contract: IncomingContract,
937        operation_meta: LightningOperationMeta,
938    ) -> Option<OperationId> {
939        let operation_id = OperationId::from_encodable(&contract.clone());
940
941        let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
942
943        let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
944            common: ReceiveSMCommon {
945                operation_id,
946                contract: contract.clone(),
947                claim_keypair,
948                agg_decryption_key,
949            },
950            state: ReceiveSMState::Pending,
951        });
952
953        // this may only fail if the operation id is already in use, in which case we
954        // ignore the error such that the method is idempotent
955        self.client_ctx
956            .manual_operation_start(
957                operation_id,
958                LightningCommonInit::KIND.as_str(),
959                operation_meta,
960                vec![self.client_ctx.make_dyn_state(receive_sm)],
961            )
962            .await
963            .ok();
964
965        Some(operation_id)
966    }
967
968    fn recover_contract_keys(
969        &self,
970        sk: SecretKey,
971        contract: &IncomingContract,
972    ) -> Option<(Keypair, AggregateDecryptionKey)> {
973        let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
974
975        let encryption_seed = tweak
976            .secret_bytes()
977            .consensus_hash::<sha256::Hash>()
978            .to_byte_array();
979
980        let claim_keypair = sk
981            .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
982            .expect("Tweak is valid")
983            .keypair(secp256k1::SECP256K1);
984
985        if claim_keypair.public_key() != contract.commitment.claim_pk {
986            return None; // The claim key is not derived from our pk
987        }
988
989        let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
990
991        if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
992            return None; // The decryption key is not derived from our pk
993        }
994
995        contract.decrypt_preimage(&agg_decryption_key)?;
996
997        Some((claim_keypair, agg_decryption_key))
998    }
999
1000    /// Subscribe to all state updates of the receive operation.
1001    pub async fn subscribe_receive_operation_state_updates(
1002        &self,
1003        operation_id: OperationId,
1004    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
1005        let operation = self.client_ctx.get_operation(operation_id).await?;
1006        let mut stream = self.notifier.subscribe(operation_id).await;
1007        let client_ctx = self.client_ctx.clone();
1008
1009        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
1010            stream! {
1011                loop {
1012                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
1013                        match state.state {
1014                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
1015                            ReceiveSMState::Claiming(out_points) => {
1016                                yield ReceiveOperationState::Claiming;
1017
1018                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
1019                                    yield ReceiveOperationState::Claimed;
1020                                } else {
1021                                    yield ReceiveOperationState::Failure;
1022                                }
1023                                return;
1024                            },
1025                            ReceiveSMState::Expired => {
1026                                yield ReceiveOperationState::Expired;
1027                                return;
1028                            }
1029                        }
1030                    }
1031                }
1032            }
1033        }))
1034    }
1035
1036    /// Await the final state of the receive operation.
1037    pub async fn await_final_receive_operation_state(
1038        &self,
1039        operation_id: OperationId,
1040    ) -> anyhow::Result<FinalReceiveOperationState> {
1041        let mut stream = self
1042            .subscribe_receive_operation_state_updates(operation_id)
1043            .await?
1044            .into_stream();
1045
1046        let mut final_state = None;
1047
1048        while let Some(state) = stream.next().await {
1049            match state {
1050                ReceiveOperationState::Expired => {
1051                    final_state = Some(FinalReceiveOperationState::Expired);
1052                }
1053                ReceiveOperationState::Claimed => {
1054                    final_state = Some(FinalReceiveOperationState::Claimed);
1055                }
1056                ReceiveOperationState::Failure => {
1057                    final_state = Some(FinalReceiveOperationState::Failure);
1058                }
1059                _ => {}
1060            }
1061        }
1062
1063        Ok(final_state.expect("Stream contains one final state"))
1064    }
1065
1066    /// Generate an lnurl for the client. You can optionally specify a gateway
1067    /// to use for testing purposes.
1068    pub async fn generate_lnurl(
1069        &self,
1070        recurringd: SafeUrl,
1071        gateway: Option<SafeUrl>,
1072    ) -> Result<String, GenerateLnurlError> {
1073        let gateways = if let Some(gateway) = gateway {
1074            vec![gateway]
1075        } else {
1076            let gateways = self
1077                .module_api
1078                .gateways()
1079                .await
1080                .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1081
1082            if gateways.is_empty() {
1083                return Err(GenerateLnurlError::NoGatewaysAvailable);
1084            }
1085
1086            gateways
1087        };
1088
1089        let payload = fedimint_core::base32::encode_prefixed(
1090            fedimint_core::base32::FEDIMINT_PREFIX,
1091            &lnurl::LnurlRequest {
1092                federation_id: self.federation_id,
1093                recipient_pk: self.lnurl_keypair.public_key(),
1094                aggregate_pk: self.cfg.tpe_agg_pk,
1095                gateways,
1096            },
1097        );
1098
1099        Ok(fedimint_lnurl::encode_lnurl(&format!(
1100            "{recurringd}pay/{payload}"
1101        )))
1102    }
1103
1104    fn spawn_receive_lnurl_task(
1105        &self,
1106        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1107        task_group: &TaskGroup,
1108        client_span: &tracing::Span,
1109    ) {
1110        let module = self.clone();
1111        let api = self.module_api.clone();
1112
1113        task_group.spawn_cancellable_with_span(
1114            client_span.clone(),
1115            "receive_lnurl_task",
1116            async move {
1117                api.wait_for_initialized_connections().await;
1118                loop {
1119                    module.receive_lnurl(custom_meta_fn()).await;
1120                }
1121            },
1122        );
1123    }
1124
1125    async fn receive_lnurl(&self, custom_meta: Value) {
1126        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1127
1128        let stream_index = dbtx
1129            .get_value(&IncomingContractStreamIndexKey)
1130            .await
1131            .unwrap_or(0);
1132
1133        let (contracts, next_index) = self
1134            .module_api
1135            .await_incoming_contracts(stream_index, 128)
1136            .await;
1137
1138        for contract in &contracts {
1139            if let Some(operation_id) = self
1140                .receive_incoming_contract(
1141                    self.lnurl_keypair.secret_key(),
1142                    contract.clone(),
1143                    LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
1144                        contract: contract.clone(),
1145                        custom_meta: custom_meta.clone(),
1146                    }),
1147                )
1148                .await
1149            {
1150                self.await_final_receive_operation_state(operation_id)
1151                    .await
1152                    .ok();
1153            }
1154        }
1155
1156        dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
1157            .await;
1158
1159        dbtx.commit_tx().await;
1160    }
1161}
1162
1163#[derive(Error, Debug, Clone, Eq, PartialEq)]
1164pub enum SelectGatewayError {
1165    #[error("Failed to request gateways")]
1166    FailedToRequestGateways(String),
1167    #[error("No gateways are available")]
1168    NoGatewaysAvailable,
1169    #[error("All gateways failed to respond")]
1170    GatewaysUnresponsive,
1171}
1172
1173#[derive(Error, Debug, Clone, Eq, PartialEq)]
1174pub enum SendPaymentError {
1175    #[error("Invoice is missing an amount")]
1176    InvoiceMissingAmount,
1177    #[error("Invoice has expired")]
1178    InvoiceExpired,
1179    #[error("A payment for this invoice is already in progress")]
1180    PaymentInProgress(OperationId),
1181    #[error("This invoice has already been paid")]
1182    InvoiceAlreadyPaid(OperationId),
1183    #[error(transparent)]
1184    SelectGateway(SelectGatewayError),
1185    #[error("Failed to connect to gateway")]
1186    FailedToConnectToGateway(String),
1187    #[error("Gateway does not support this federation")]
1188    FederationNotSupported,
1189    #[error("Gateway fee exceeds the allowed limit")]
1190    GatewayFeeExceedsLimit,
1191    #[error("Gateway expiration time exceeds the allowed limit")]
1192    GatewayExpirationExceedsLimit,
1193    #[error("Failed to request block count")]
1194    FailedToRequestBlockCount(String),
1195    #[error("Failed to fund the payment")]
1196    FailedToFundPayment(String),
1197    #[error("Invoice is for a different currency")]
1198    WrongCurrency {
1199        invoice_currency: Currency,
1200        federation_currency: Currency,
1201    },
1202}
1203
1204#[derive(Error, Debug, Clone, Eq, PartialEq)]
1205pub enum ReceiveError {
1206    #[error(transparent)]
1207    SelectGateway(SelectGatewayError),
1208    #[error("Failed to connect to gateway")]
1209    FailedToConnectToGateway(String),
1210    #[error("Gateway does not support this federation")]
1211    FederationNotSupported,
1212    #[error("Gateway fee exceeds the allowed limit")]
1213    GatewayFeeExceedsLimit,
1214    #[error("Amount is too small to cover fees")]
1215    AmountTooSmall,
1216    #[error("Gateway returned an invalid invoice")]
1217    InvalidInvoice,
1218    #[error("Gateway returned an invoice with incorrect amount")]
1219    IncorrectInvoiceAmount,
1220}
1221
1222#[derive(Error, Debug, Clone, Eq, PartialEq)]
1223pub enum GenerateLnurlError {
1224    #[error("No gateways are available")]
1225    NoGatewaysAvailable,
1226    #[error("Failed to request gateways")]
1227    FailedToRequestGateways(String),
1228}
1229
1230#[derive(Error, Debug, Clone, Eq, PartialEq)]
1231pub enum ListGatewaysError {
1232    #[error("Failed to request gateways")]
1233    FailedToListGateways,
1234}
1235
1236#[derive(Error, Debug, Clone, Eq, PartialEq)]
1237pub enum RoutingInfoError {
1238    #[error("Failed to request routing info")]
1239    FailedToRequestRoutingInfo,
1240}
1241
1242#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1243pub enum LightningClientStateMachines {
1244    Send(SendStateMachine),
1245    Receive(ReceiveStateMachine),
1246}
1247
1248impl IntoDynInstance for LightningClientStateMachines {
1249    type DynType = DynState;
1250
1251    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1252        DynState::from_typed(instance_id, self)
1253    }
1254}
1255
1256impl State for LightningClientStateMachines {
1257    type ModuleContext = LightningClientContext;
1258
1259    fn transitions(
1260        &self,
1261        context: &Self::ModuleContext,
1262        global_context: &DynGlobalClientContext,
1263    ) -> Vec<StateTransition<Self>> {
1264        match self {
1265            LightningClientStateMachines::Send(state) => {
1266                sm_enum_variant_translation!(
1267                    state.transitions(context, global_context),
1268                    LightningClientStateMachines::Send
1269                )
1270            }
1271            LightningClientStateMachines::Receive(state) => {
1272                sm_enum_variant_translation!(
1273                    state.transitions(context, global_context),
1274                    LightningClientStateMachines::Receive
1275                )
1276            }
1277        }
1278    }
1279
1280    fn operation_id(&self) -> OperationId {
1281        match self {
1282            LightningClientStateMachines::Send(state) => state.operation_id(),
1283            LightningClientStateMachines::Receive(state) => state.operation_id(),
1284        }
1285    }
1286}