Skip to main content

fedimint_lnv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7pub use fedimint_lnv2_common as common;
8
9mod api;
10#[cfg(feature = "cli")]
11mod cli;
12mod db;
13pub mod events;
14mod receive_sm;
15mod send_sm;
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::sync::Arc;
19
20use async_stream::stream;
21use bitcoin::hashes::{Hash, sha256};
22use bitcoin::secp256k1;
23use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
24use fedimint_api_client::api::DynModuleApi;
25use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
26use fedimint_client_module::module::recovery::NoModuleBackup;
27use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
28use fedimint_client_module::oplog::UpdateStreamOrOutcome;
29use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
30use fedimint_client_module::transaction::{
31    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
32};
33use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
34use fedimint_core::config::FederationId;
35use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
36use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
37use fedimint_core::encoding::{Decodable, Encodable};
38use fedimint_core::module::{
39    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
40};
41use fedimint_core::secp256k1::SECP256K1;
42use fedimint_core::task::TaskGroup;
43use fedimint_core::time::duration_since_epoch;
44use fedimint_core::util::SafeUrl;
45use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
46use fedimint_derive_secret::{ChildId, DerivableSecret};
47use fedimint_lnv2_common::config::LightningClientConfig;
48use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
49use fedimint_lnv2_common::gateway_api::{
50    GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
51};
52use fedimint_lnv2_common::{
53    Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
54    LightningModuleTypes, LightningOutput, LightningOutputV0, MINIMUM_INCOMING_CONTRACT_AMOUNT,
55    lnurl, tweak,
56};
57use futures::StreamExt;
58use lightning_invoice::{Bolt11Invoice, Currency};
59use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use strum::IntoEnumIterator as _;
63use thiserror::Error;
64use tpe::{AggregateDecryptionKey, derive_agg_dk};
65use tracing::warn;
66
67use crate::api::LightningFederationApi;
68use crate::events::SendPaymentEvent;
69use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
70use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
71
72/// Number of blocks until outgoing lightning contracts times out and user
73/// client can refund it unilaterally
74const EXPIRATION_DELTA_LIMIT: u64 = 1440;
75
76/// A two hour buffer in case either the client or gateway go offline
77const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
78
79#[allow(clippy::large_enum_variant)]
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum LightningOperationMeta {
82    Send(SendOperationMeta),
83    Receive(ReceiveOperationMeta),
84    LnurlReceive(LnurlReceiveOperationMeta),
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct SendOperationMeta {
89    pub change_outpoint_range: OutPointRange,
90    pub gateway: SafeUrl,
91    pub contract: OutgoingContract,
92    pub invoice: LightningInvoice,
93    pub custom_meta: Value,
94}
95
96impl SendOperationMeta {
97    /// Calculate the absolute fee paid to the gateway on success.
98    pub fn gateway_fee(&self) -> Amount {
99        match &self.invoice {
100            LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
101                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
102            ),
103        }
104    }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ReceiveOperationMeta {
109    pub gateway: SafeUrl,
110    pub contract: IncomingContract,
111    pub invoice: LightningInvoice,
112    pub custom_meta: Value,
113}
114
115impl ReceiveOperationMeta {
116    /// Calculate the absolute fee paid to the gateway on success.
117    pub fn gateway_fee(&self) -> Amount {
118        match &self.invoice {
119            LightningInvoice::Bolt11(invoice) => {
120                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
121                    .saturating_sub(self.contract.commitment.amount)
122            }
123        }
124    }
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct LnurlReceiveOperationMeta {
129    pub contract: IncomingContract,
130    pub custom_meta: Value,
131}
132
133#[cfg_attr(doc, aquamarine::aquamarine)]
134/// The state of an operation sending a payment over lightning.
135///
136/// ```mermaid
137/// graph LR
138/// classDef virtual fill:#fff,stroke-dasharray: 5 5
139///
140///     Funding -- funding transaction is rejected --> Rejected
141///     Funding -- funding transaction is accepted --> Funded
142///     Funded -- payment is confirmed  --> Success
143///     Funded -- payment attempt expires --> Refunding
144///     Funded -- gateway cancels payment attempt --> Refunding
145///     Refunding -- payment is confirmed --> Success
146///     Refunding -- ecash is minted --> Refunded
147///     Refunding -- minting ecash fails --> Failure
148/// ```
149/// The transition from Refunding to Success is only possible if the gateway
150/// misbehaves.
151#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
152pub enum SendOperationState {
153    /// We are funding the contract to incentivize the gateway.
154    Funding,
155    /// We are waiting for the gateway to complete the payment.
156    Funded,
157    /// The payment was successful.
158    Success([u8; 32]),
159    /// The payment has failed and we are refunding the contract.
160    Refunding,
161    /// The payment has been refunded.
162    Refunded,
163    /// Either a programming error has occurred or the federation is malicious.
164    Failure,
165}
166
167/// The final state of an operation sending a payment over lightning.
168#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
169pub enum FinalSendOperationState {
170    /// The payment was successful.
171    Success,
172    /// The payment has been refunded.
173    Refunded,
174    /// Either a programming error has occurred or the federation is malicious.
175    Failure,
176}
177
178pub type SendResult = Result<OperationId, SendPaymentError>;
179
180#[cfg_attr(doc, aquamarine::aquamarine)]
181/// The state of an operation receiving a payment over lightning.
182///
183/// ```mermaid
184/// graph LR
185/// classDef virtual fill:#fff,stroke-dasharray: 5 5
186///
187///     Pending -- payment is confirmed --> Claiming
188///     Pending -- invoice expires --> Expired
189///     Claiming -- ecash is minted --> Claimed
190///     Claiming -- minting ecash fails --> Failure
191/// ```
192#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
193pub enum ReceiveOperationState {
194    /// We are waiting for the payment.
195    Pending,
196    /// The payment request has expired.
197    Expired,
198    /// The payment has been confirmed and we are issuing the ecash.
199    Claiming,
200    /// The payment has been successful.
201    Claimed,
202    /// Either a programming error has occurred or the federation is malicious.
203    Failure,
204}
205
206/// The final state of an operation receiving a payment over lightning.
207#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
208pub enum FinalReceiveOperationState {
209    /// The payment request has expired.
210    Expired,
211    /// The payment has been successful.
212    Claimed,
213    /// Either a programming error has occurred or the federation is malicious.
214    Failure,
215}
216
217pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
218
219#[derive(Clone)]
220pub struct LightningClientInit {
221    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
222    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
223}
224
225impl std::fmt::Debug for LightningClientInit {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.debug_struct("LightningClientInit")
228            .field("gateway_conn", &self.gateway_conn)
229            .field("custom_meta_fn", &"<function>")
230            .finish()
231    }
232}
233
234impl Default for LightningClientInit {
235    fn default() -> Self {
236        LightningClientInit {
237            gateway_conn: None,
238            custom_meta_fn: Arc::new(|| Value::Null),
239        }
240    }
241}
242
243impl ModuleInit for LightningClientInit {
244    type Common = LightningCommonInit;
245
246    async fn dump_database(
247        &self,
248        _dbtx: &mut DatabaseTransaction<'_>,
249        _prefix_names: Vec<String>,
250    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
251        Box::new(BTreeMap::new().into_iter())
252    }
253}
254
255#[apply(async_trait_maybe_send!)]
256impl ClientModuleInit for LightningClientInit {
257    type Module = LightningClientModule;
258
259    fn supported_api_versions(&self) -> MultiApiVersion {
260        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
261            .expect("no version conflicts")
262    }
263
264    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
265        let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
266            gateway_conn
267        } else {
268            let api = GatewayApi::new(None, args.connector_registry.clone());
269            Arc::new(RealGatewayConnection { api })
270        };
271        Ok(LightningClientModule::new(
272            *args.federation_id(),
273            args.cfg().clone(),
274            args.notifier().clone(),
275            args.context(),
276            args.module_api().clone(),
277            args.module_root_secret(),
278            gateway_conn,
279            self.custom_meta_fn.clone(),
280            args.admin_auth().cloned(),
281            args.task_group(),
282            args.client_span(),
283        ))
284    }
285
286    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
287        Some(
288            DbKeyPrefix::iter()
289                .map(|p| p as u8)
290                .chain(
291                    DbKeyPrefix::ExternalReservedStart as u8
292                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
293                )
294                .collect(),
295        )
296    }
297}
298
299#[derive(Debug, Clone)]
300pub struct LightningClientContext {
301    federation_id: FederationId,
302    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
303    pub(crate) client_ctx: ClientContext<LightningClientModule>,
304}
305
306impl Context for LightningClientContext {
307    const KIND: Option<ModuleKind> = Some(KIND);
308}
309
310#[derive(Debug, Clone)]
311pub struct LightningClientModule {
312    federation_id: FederationId,
313    cfg: LightningClientConfig,
314    notifier: ModuleNotifier<LightningClientStateMachines>,
315    client_ctx: ClientContext<Self>,
316    module_api: DynModuleApi,
317    keypair: Keypair,
318    lnurl_keypair: Keypair,
319    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
320    #[allow(unused)] // The field is only used by the cli feature
321    admin_auth: Option<ApiAuth>,
322}
323
324#[apply(async_trait_maybe_send!)]
325impl ClientModule for LightningClientModule {
326    type Init = LightningClientInit;
327    type Common = LightningModuleTypes;
328    type Backup = NoModuleBackup;
329    type ModuleStateMachineContext = LightningClientContext;
330    type States = LightningClientStateMachines;
331
332    fn context(&self) -> Self::ModuleStateMachineContext {
333        LightningClientContext {
334            federation_id: self.federation_id,
335            gateway_conn: self.gateway_conn.clone(),
336            client_ctx: self.client_ctx.clone(),
337        }
338    }
339
340    fn input_fee(
341        &self,
342        amounts: &Amounts,
343        _input: &<Self::Common as ModuleCommon>::Input,
344    ) -> Option<Amounts> {
345        Some(Amounts::new_bitcoin(
346            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
347        ))
348    }
349
350    fn output_fee(
351        &self,
352        amounts: &Amounts,
353        _output: &<Self::Common as ModuleCommon>::Output,
354    ) -> Option<Amounts> {
355        Some(Amounts::new_bitcoin(
356            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
357        ))
358    }
359
360    #[cfg(feature = "cli")]
361    async fn handle_cli_command(
362        &self,
363        args: &[std::ffi::OsString],
364    ) -> anyhow::Result<serde_json::Value> {
365        cli::handle_cli_command(self, args).await
366    }
367}
368
369impl LightningClientModule {
370    #[allow(clippy::too_many_arguments)]
371    fn new(
372        federation_id: FederationId,
373        cfg: LightningClientConfig,
374        notifier: ModuleNotifier<LightningClientStateMachines>,
375        client_ctx: ClientContext<Self>,
376        module_api: DynModuleApi,
377        module_root_secret: &DerivableSecret,
378        gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
379        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
380        admin_auth: Option<ApiAuth>,
381        task_group: &TaskGroup,
382        client_span: &tracing::Span,
383    ) -> Self {
384        let module = Self {
385            federation_id,
386            cfg,
387            notifier,
388            client_ctx,
389            module_api,
390            keypair: module_root_secret
391                .child_key(ChildId(0))
392                .to_secp_key(SECP256K1),
393            lnurl_keypair: module_root_secret
394                .child_key(ChildId(1))
395                .to_secp_key(SECP256K1),
396            gateway_conn,
397            admin_auth,
398        };
399
400        module.spawn_receive_lnurl_task(custom_meta_fn, task_group, client_span);
401
402        module.spawn_gateway_map_update_task(task_group, client_span);
403
404        module
405    }
406
407    fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup, client_span: &tracing::Span) {
408        let module = self.clone();
409        let api = self.module_api.clone();
410
411        task_group.spawn_cancellable_with_span(
412            client_span.clone(),
413            "gateway_map_update_task",
414            async move {
415                api.wait_for_initialized_connections().await;
416                module.update_gateway_map().await;
417            },
418        );
419    }
420
421    async fn update_gateway_map(&self) {
422        // Update the mapping from lightning node public keys to gateway api
423        // endpoints maintained in the module database. When paying an invoice this
424        // enables the client to select the gateway that has created the invoice,
425        // if possible, such that the payment does not go over lightning, reducing
426        // fees and latency.
427
428        if let Ok(gateways) = self.module_api.gateways().await {
429            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
430
431            for gateway in gateways {
432                if let Ok(Some(routing_info)) = self
433                    .gateway_conn
434                    .routing_info(gateway.clone(), &self.federation_id)
435                    .await
436                {
437                    dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
438                        .await;
439                }
440            }
441
442            if let Err(e) = dbtx.commit_tx_result().await {
443                warn!("Failed to commit the updated gateway mapping to the database: {e}");
444            }
445        }
446    }
447
448    /// Selects an available gateway by querying the federation's registered
449    /// gateways, checking if one of them match the invoice's payee public
450    /// key, then queries the gateway for `RoutingInfo` to determine if it is
451    /// online.
452    pub async fn select_gateway(
453        &self,
454        invoice: Option<Bolt11Invoice>,
455    ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
456        let gateways = self
457            .module_api
458            .gateways()
459            .await
460            .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
461
462        if gateways.is_empty() {
463            return Err(SelectGatewayError::NoGatewaysAvailable);
464        }
465
466        if let Some(invoice) = invoice
467            && let Some(gateway) = self
468                .client_ctx
469                .module_db()
470                .begin_transaction_nc()
471                .await
472                .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
473                .await
474                .filter(|gateway| gateways.contains(gateway))
475            && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
476        {
477            return Ok((gateway, routing_info));
478        }
479
480        for gateway in gateways {
481            if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
482                return Ok((gateway, routing_info));
483            }
484        }
485
486        Err(SelectGatewayError::GatewaysUnresponsive)
487    }
488
489    /// Sends a request to each peer for their registered gateway list and
490    /// returns a `Vec<SafeUrl` of all registered gateways to the client.
491    pub async fn list_gateways(
492        &self,
493        peer: Option<PeerId>,
494    ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
495        if let Some(peer) = peer {
496            self.module_api
497                .gateways_from_peer(peer)
498                .await
499                .map_err(|_| ListGatewaysError::FailedToListGateways)
500        } else {
501            self.module_api
502                .gateways()
503                .await
504                .map_err(|_| ListGatewaysError::FailedToListGateways)
505        }
506    }
507
508    /// Requests the `RoutingInfo`, including fee information, from the gateway
509    /// available at the `SafeUrl`.
510    pub async fn routing_info(
511        &self,
512        gateway: &SafeUrl,
513    ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
514        self.gateway_conn
515            .routing_info(gateway.clone(), &self.federation_id)
516            .await
517            .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
518    }
519
520    /// Pay an invoice. For testing you can optionally specify a gateway to
521    /// route with, otherwise a gateway will be selected automatically. If the
522    /// invoice was created by a gateway connected to our federation, the same
523    /// gateway will be selected to allow for a direct ecash swap. Otherwise we
524    /// select a random online gateway.
525    ///
526    /// The fee for this payment may depend on the selected gateway but
527    /// will be limited to one and a half percent plus one hundred satoshis.
528    /// This fee accounts for the fee charged by the gateway as well as
529    /// the additional fee required to reliably route this payment over
530    /// lightning if necessary. Since the gateway has been vetted by at least
531    /// one guardian we trust it to set a reasonable fee and only enforce a
532    /// rather high limit.
533    ///
534    /// The absolute fee for a payment can be calculated from the operation meta
535    /// to be shown to the user in the transaction history.
536    #[allow(clippy::too_many_lines)]
537    pub async fn send(
538        &self,
539        invoice: Bolt11Invoice,
540        gateway: Option<SafeUrl>,
541        custom_meta: Value,
542    ) -> Result<OperationId, SendPaymentError> {
543        let amount = invoice
544            .amount_milli_satoshis()
545            .ok_or(SendPaymentError::InvoiceMissingAmount)?;
546
547        if invoice.is_expired() {
548            return Err(SendPaymentError::InvoiceExpired);
549        }
550
551        if self.cfg.network != invoice.currency().into() {
552            return Err(SendPaymentError::WrongCurrency {
553                invoice_currency: invoice.currency(),
554                federation_currency: self.cfg.network.into(),
555            });
556        }
557
558        let operation_id = self.get_next_operation_id(&invoice).await?;
559
560        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());
561
562        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
563            .expect("32 bytes, within curve order")
564            .keypair(secp256k1::SECP256K1);
565
566        let (gateway_api, routing_info) = match gateway {
567            Some(gateway_api) => (
568                gateway_api.clone(),
569                self.routing_info(&gateway_api)
570                    .await
571                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
572                    .ok_or(SendPaymentError::FederationNotSupported)?,
573            ),
574            None => self
575                .select_gateway(Some(invoice.clone()))
576                .await
577                .map_err(SendPaymentError::SelectGateway)?,
578        };
579
580        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);
581
582        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
583            return Err(SendPaymentError::GatewayFeeExceedsLimit);
584        }
585
586        if EXPIRATION_DELTA_LIMIT < expiration_delta {
587            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
588        }
589
590        let consensus_block_count = self
591            .module_api
592            .consensus_block_count()
593            .await
594            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;
595
596        let contract = OutgoingContract {
597            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
598            amount: send_fee.add_to(amount),
599            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
600            claim_pk: routing_info.module_public_key,
601            refund_pk: refund_keypair.public_key(),
602            ephemeral_pk,
603        };
604
605        let contract_clone = contract.clone();
606        let gateway_api_clone = gateway_api.clone();
607        let invoice_clone = invoice.clone();
608
609        let client_output = ClientOutput::<LightningOutput> {
610            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
611            amounts: Amounts::new_bitcoin(contract.amount),
612        };
613
614        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
615            state_machines: Arc::new(move |range: OutPointRange| {
616                vec![LightningClientStateMachines::Send(SendStateMachine {
617                    common: SendSMCommon {
618                        operation_id,
619                        outpoint: range.into_iter().next().unwrap(),
620                        contract: contract_clone.clone(),
621                        gateway_api: Some(gateway_api_clone.clone()),
622                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
623                        refund_keypair,
624                    },
625                    state: SendSMState::Funding,
626                })]
627            }),
628        };
629
630        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
631            vec![client_output],
632            vec![client_output_sm],
633        ));
634
635        let transaction = TransactionBuilder::new().with_outputs(client_output);
636
637        self.client_ctx
638            .finalize_and_submit_transaction(
639                operation_id,
640                LightningCommonInit::KIND.as_str(),
641                move |change_outpoint_range| {
642                    LightningOperationMeta::Send(SendOperationMeta {
643                        change_outpoint_range,
644                        gateway: gateway_api.clone(),
645                        contract: contract.clone(),
646                        invoice: LightningInvoice::Bolt11(invoice.clone()),
647                        custom_meta: custom_meta.clone(),
648                    })
649                },
650                transaction,
651            )
652            .await
653            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;
654
655        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
656
657        self.client_ctx
658            .log_event(
659                &mut dbtx,
660                SendPaymentEvent {
661                    operation_id,
662                    amount: send_fee.add_to(amount),
663                    fee: send_fee.fee(amount),
664                },
665            )
666            .await;
667
668        dbtx.commit_tx().await;
669
670        Ok(operation_id)
671    }
672
673    async fn get_next_operation_id(
674        &self,
675        invoice: &Bolt11Invoice,
676    ) -> Result<OperationId, SendPaymentError> {
677        for payment_attempt in 0..u64::MAX {
678            let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));
679
680            if !self.client_ctx.operation_exists(operation_id).await {
681                return Ok(operation_id);
682            }
683
684            if self.client_ctx.has_active_states(operation_id).await {
685                return Err(SendPaymentError::PaymentInProgress(operation_id));
686            }
687
688            let mut stream = self
689                .subscribe_send_operation_state_updates(operation_id)
690                .await
691                .expect("operation_id exists")
692                .into_stream();
693
694            // This will not block since we checked for active states and there were none,
695            // so by definition a final state has to have been assumed already.
696            while let Some(state) = stream.next().await {
697                if let SendOperationState::Success(_) = state {
698                    return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
699                }
700            }
701        }
702
703        panic!("We could not find an unused operation id for sending a lightning payment");
704    }
705
706    /// Subscribe to all state updates of the send operation.
707    pub async fn subscribe_send_operation_state_updates(
708        &self,
709        operation_id: OperationId,
710    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
711        let operation = self.client_ctx.get_operation(operation_id).await?;
712        let mut stream = self.notifier.subscribe(operation_id).await;
713        let client_ctx = self.client_ctx.clone();
714        let module_api = self.module_api.clone();
715
716        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
717            stream! {
718                loop {
719                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
720                        match state.state {
721                            SendSMState::Funding => yield SendOperationState::Funding,
722                            SendSMState::Funded => yield SendOperationState::Funded,
723                            SendSMState::Success(preimage) => {
724                                // the preimage has been verified by the state machine previously
725                                assert!(state.common.contract.verify_preimage(&preimage));
726
727                                yield SendOperationState::Success(preimage);
728                                return;
729                            },
730                            SendSMState::Refunding(out_points) => {
731                                yield SendOperationState::Refunding;
732
733                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
734                                    yield SendOperationState::Refunded;
735                                    return;
736                                }
737
738                                // The gateway may have incorrectly claimed the outgoing contract thereby causing
739                                // our refund transaction to be rejected. Therefore, we check one last time if
740                                // the preimage is available before we enter the failure state.
741                                if let Some(preimage) = module_api.await_preimage(
742                                    state.common.outpoint,
743                                    0
744                                ).await
745                                    && state.common.contract.verify_preimage(&preimage) {
746                                        yield SendOperationState::Success(preimage);
747                                        return;
748                                    }
749
750                                yield SendOperationState::Failure;
751                                return;
752                            },
753                            SendSMState::Rejected(..) => {
754                                yield SendOperationState::Failure;
755                                return;
756                            },
757                        }
758                    }
759                }
760            }
761        }))
762    }
763
764    /// Await the final state of the send operation.
765    pub async fn await_final_send_operation_state(
766        &self,
767        operation_id: OperationId,
768    ) -> anyhow::Result<FinalSendOperationState> {
769        let mut stream = self
770            .subscribe_send_operation_state_updates(operation_id)
771            .await?
772            .into_stream();
773
774        let mut final_state = None;
775
776        while let Some(state) = stream.next().await {
777            match state {
778                SendOperationState::Success(_) => {
779                    final_state = Some(FinalSendOperationState::Success);
780                }
781                SendOperationState::Refunded => {
782                    final_state = Some(FinalSendOperationState::Refunded);
783                }
784                SendOperationState::Failure => final_state = Some(FinalSendOperationState::Failure),
785                _ => {}
786            }
787        }
788
789        Ok(final_state.expect("Stream contains one final state"))
790    }
791
792    /// Request an invoice. For testing you can optionally specify a gateway to
793    /// generate the invoice, otherwise a random online gateway will be selected
794    /// automatically.
795    ///
796    /// The total fee for this payment may depend on the chosen gateway but
797    /// will be limited to half of one percent plus fifty satoshis. Since the
798    /// selected gateway has been vetted by at least one guardian we trust it to
799    /// set a reasonable fee and only enforce a rather high limit.
800    ///
801    /// The absolute fee for a payment can be calculated from the operation meta
802    /// to be shown to the user in the transaction history.
803    pub async fn receive(
804        &self,
805        amount: Amount,
806        expiry_secs: u32,
807        description: Bolt11InvoiceDescription,
808        gateway: Option<SafeUrl>,
809        custom_meta: Value,
810    ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
811        let (gateway, contract, invoice) = self
812            .create_contract_and_fetch_invoice(
813                self.keypair.public_key(),
814                amount,
815                expiry_secs,
816                description,
817                gateway,
818            )
819            .await?;
820
821        let operation_id = self
822            .receive_incoming_contract(
823                self.keypair.secret_key(),
824                contract.clone(),
825                LightningOperationMeta::Receive(ReceiveOperationMeta {
826                    gateway,
827                    contract,
828                    invoice: LightningInvoice::Bolt11(invoice.clone()),
829                    custom_meta,
830                }),
831            )
832            .await
833            .expect("The contract has been generated with our public key");
834
835        Ok((invoice, operation_id))
836    }
837
838    /// Create an incoming contract locked to a public key derived from the
839    /// recipient's static module public key and fetches the corresponding
840    /// invoice.
841    async fn create_contract_and_fetch_invoice(
842        &self,
843        recipient_static_pk: PublicKey,
844        amount: Amount,
845        expiry_secs: u32,
846        description: Bolt11InvoiceDescription,
847        gateway: Option<SafeUrl>,
848    ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
849        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);
850
851        let encryption_seed = ephemeral_tweak
852            .consensus_hash::<sha256::Hash>()
853            .to_byte_array();
854
855        let preimage = encryption_seed
856            .consensus_hash::<sha256::Hash>()
857            .to_byte_array();
858
859        let (gateway, routing_info) = match gateway {
860            Some(gateway) => (
861                gateway.clone(),
862                self.routing_info(&gateway)
863                    .await
864                    .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
865                    .ok_or(ReceiveError::FederationNotSupported)?,
866            ),
867            None => self
868                .select_gateway(None)
869                .await
870                .map_err(ReceiveError::SelectGateway)?,
871        };
872
873        if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
874            return Err(ReceiveError::GatewayFeeExceedsLimit);
875        }
876
877        let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
878
879        if contract_amount < MINIMUM_INCOMING_CONTRACT_AMOUNT {
880            return Err(ReceiveError::AmountTooSmall);
881        }
882
883        let expiration = duration_since_epoch()
884            .as_secs()
885            .saturating_add(u64::from(expiry_secs));
886
887        let claim_pk = recipient_static_pk
888            .mul_tweak(
889                secp256k1::SECP256K1,
890                &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
891            )
892            .expect("Tweak is valid");
893
894        let contract = IncomingContract::new(
895            self.cfg.tpe_agg_pk,
896            encryption_seed,
897            preimage,
898            PaymentImage::Hash(preimage.consensus_hash()),
899            contract_amount,
900            expiration,
901            claim_pk,
902            routing_info.module_public_key,
903            ephemeral_pk,
904        );
905
906        let invoice = self
907            .gateway_conn
908            .bolt11_invoice(
909                gateway.clone(),
910                self.federation_id,
911                contract.clone(),
912                amount,
913                description,
914                expiry_secs,
915            )
916            .await
917            .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;
918
919        if invoice.payment_hash() != &preimage.consensus_hash() {
920            return Err(ReceiveError::InvalidInvoice);
921        }
922
923        if invoice.amount_milli_satoshis() != Some(amount.msats) {
924            return Err(ReceiveError::IncorrectInvoiceAmount);
925        }
926
927        Ok((gateway, contract, invoice))
928    }
929
930    // Receive an incoming contract locked to a public key derived from our
931    // static module public key.
932    async fn receive_incoming_contract(
933        &self,
934        sk: SecretKey,
935        contract: IncomingContract,
936        operation_meta: LightningOperationMeta,
937    ) -> Option<OperationId> {
938        let operation_id = OperationId::from_encodable(&contract.clone());
939
940        let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
941
942        let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
943            common: ReceiveSMCommon {
944                operation_id,
945                contract: contract.clone(),
946                claim_keypair,
947                agg_decryption_key,
948            },
949            state: ReceiveSMState::Pending,
950        });
951
952        // this may only fail if the operation id is already in use, in which case we
953        // ignore the error such that the method is idempotent
954        self.client_ctx
955            .manual_operation_start(
956                operation_id,
957                LightningCommonInit::KIND.as_str(),
958                operation_meta,
959                vec![self.client_ctx.make_dyn_state(receive_sm)],
960            )
961            .await
962            .ok();
963
964        Some(operation_id)
965    }
966
967    fn recover_contract_keys(
968        &self,
969        sk: SecretKey,
970        contract: &IncomingContract,
971    ) -> Option<(Keypair, AggregateDecryptionKey)> {
972        let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
973
974        let encryption_seed = tweak
975            .secret_bytes()
976            .consensus_hash::<sha256::Hash>()
977            .to_byte_array();
978
979        let claim_keypair = sk
980            .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
981            .expect("Tweak is valid")
982            .keypair(secp256k1::SECP256K1);
983
984        if claim_keypair.public_key() != contract.commitment.claim_pk {
985            return None; // The claim key is not derived from our pk
986        }
987
988        let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
989
990        if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
991            return None; // The decryption key is not derived from our pk
992        }
993
994        contract.decrypt_preimage(&agg_decryption_key)?;
995
996        Some((claim_keypair, agg_decryption_key))
997    }
998
999    /// Subscribe to all state updates of the receive operation.
1000    pub async fn subscribe_receive_operation_state_updates(
1001        &self,
1002        operation_id: OperationId,
1003    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
1004        let operation = self.client_ctx.get_operation(operation_id).await?;
1005        let mut stream = self.notifier.subscribe(operation_id).await;
1006        let client_ctx = self.client_ctx.clone();
1007
1008        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
1009            stream! {
1010                loop {
1011                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
1012                        match state.state {
1013                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
1014                            ReceiveSMState::Claiming(out_points) => {
1015                                yield ReceiveOperationState::Claiming;
1016
1017                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
1018                                    yield ReceiveOperationState::Claimed;
1019                                } else {
1020                                    yield ReceiveOperationState::Failure;
1021                                }
1022                                return;
1023                            },
1024                            ReceiveSMState::Expired => {
1025                                yield ReceiveOperationState::Expired;
1026                                return;
1027                            }
1028                        }
1029                    }
1030                }
1031            }
1032        }))
1033    }
1034
1035    /// Await the final state of the receive operation.
1036    pub async fn await_final_receive_operation_state(
1037        &self,
1038        operation_id: OperationId,
1039    ) -> anyhow::Result<FinalReceiveOperationState> {
1040        let mut stream = self
1041            .subscribe_receive_operation_state_updates(operation_id)
1042            .await?
1043            .into_stream();
1044
1045        let mut final_state = None;
1046
1047        while let Some(state) = stream.next().await {
1048            match state {
1049                ReceiveOperationState::Expired => {
1050                    final_state = Some(FinalReceiveOperationState::Expired);
1051                }
1052                ReceiveOperationState::Claimed => {
1053                    final_state = Some(FinalReceiveOperationState::Claimed);
1054                }
1055                ReceiveOperationState::Failure => {
1056                    final_state = Some(FinalReceiveOperationState::Failure);
1057                }
1058                _ => {}
1059            }
1060        }
1061
1062        Ok(final_state.expect("Stream contains one final state"))
1063    }
1064
1065    /// Generate an lnurl for the client. You can optionally specify a gateway
1066    /// to use for testing purposes.
1067    pub async fn generate_lnurl(
1068        &self,
1069        recurringd: SafeUrl,
1070        gateway: Option<SafeUrl>,
1071    ) -> Result<String, GenerateLnurlError> {
1072        let gateways = if let Some(gateway) = gateway {
1073            vec![gateway]
1074        } else {
1075            let gateways = self
1076                .module_api
1077                .gateways()
1078                .await
1079                .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1080
1081            if gateways.is_empty() {
1082                return Err(GenerateLnurlError::NoGatewaysAvailable);
1083            }
1084
1085            gateways
1086        };
1087
1088        let payload = fedimint_core::base32::encode_prefixed(
1089            fedimint_core::base32::FEDIMINT_PREFIX,
1090            &lnurl::LnurlRequest {
1091                federation_id: self.federation_id,
1092                recipient_pk: self.lnurl_keypair.public_key(),
1093                aggregate_pk: self.cfg.tpe_agg_pk,
1094                gateways,
1095            },
1096        );
1097
1098        Ok(fedimint_lnurl::encode_lnurl(&format!(
1099            "{recurringd}pay/{payload}"
1100        )))
1101    }
1102
1103    fn spawn_receive_lnurl_task(
1104        &self,
1105        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1106        task_group: &TaskGroup,
1107        client_span: &tracing::Span,
1108    ) {
1109        let module = self.clone();
1110        let api = self.module_api.clone();
1111
1112        task_group.spawn_cancellable_with_span(
1113            client_span.clone(),
1114            "receive_lnurl_task",
1115            async move {
1116                api.wait_for_initialized_connections().await;
1117                loop {
1118                    module.receive_lnurl(custom_meta_fn()).await;
1119                }
1120            },
1121        );
1122    }
1123
1124    async fn receive_lnurl(&self, custom_meta: Value) {
1125        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1126
1127        let stream_index = dbtx
1128            .get_value(&IncomingContractStreamIndexKey)
1129            .await
1130            .unwrap_or(0);
1131
1132        let (contracts, next_index) = self
1133            .module_api
1134            .await_incoming_contracts(stream_index, 128)
1135            .await;
1136
1137        for contract in &contracts {
1138            if let Some(operation_id) = self
1139                .receive_incoming_contract(
1140                    self.lnurl_keypair.secret_key(),
1141                    contract.clone(),
1142                    LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
1143                        contract: contract.clone(),
1144                        custom_meta: custom_meta.clone(),
1145                    }),
1146                )
1147                .await
1148            {
1149                self.await_final_receive_operation_state(operation_id)
1150                    .await
1151                    .ok();
1152            }
1153        }
1154
1155        dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
1156            .await;
1157
1158        dbtx.commit_tx().await;
1159    }
1160}
1161
1162#[derive(Error, Debug, Clone, Eq, PartialEq)]
1163pub enum SelectGatewayError {
1164    #[error("Failed to request gateways")]
1165    FailedToRequestGateways(String),
1166    #[error("No gateways are available")]
1167    NoGatewaysAvailable,
1168    #[error("All gateways failed to respond")]
1169    GatewaysUnresponsive,
1170}
1171
1172#[derive(Error, Debug, Clone, Eq, PartialEq)]
1173pub enum SendPaymentError {
1174    #[error("Invoice is missing an amount")]
1175    InvoiceMissingAmount,
1176    #[error("Invoice has expired")]
1177    InvoiceExpired,
1178    #[error("A payment for this invoice is already in progress")]
1179    PaymentInProgress(OperationId),
1180    #[error("This invoice has already been paid")]
1181    InvoiceAlreadyPaid(OperationId),
1182    #[error(transparent)]
1183    SelectGateway(SelectGatewayError),
1184    #[error("Failed to connect to gateway")]
1185    FailedToConnectToGateway(String),
1186    #[error("Gateway does not support this federation")]
1187    FederationNotSupported,
1188    #[error("Gateway fee exceeds the allowed limit")]
1189    GatewayFeeExceedsLimit,
1190    #[error("Gateway expiration time exceeds the allowed limit")]
1191    GatewayExpirationExceedsLimit,
1192    #[error("Failed to request block count")]
1193    FailedToRequestBlockCount(String),
1194    #[error("Failed to fund the payment")]
1195    FailedToFundPayment(String),
1196    #[error("Invoice is for a different currency")]
1197    WrongCurrency {
1198        invoice_currency: Currency,
1199        federation_currency: Currency,
1200    },
1201}
1202
1203#[derive(Error, Debug, Clone, Eq, PartialEq)]
1204pub enum ReceiveError {
1205    #[error(transparent)]
1206    SelectGateway(SelectGatewayError),
1207    #[error("Failed to connect to gateway")]
1208    FailedToConnectToGateway(String),
1209    #[error("Gateway does not support this federation")]
1210    FederationNotSupported,
1211    #[error("Gateway fee exceeds the allowed limit")]
1212    GatewayFeeExceedsLimit,
1213    #[error("Amount is too small to cover fees")]
1214    AmountTooSmall,
1215    #[error("Gateway returned an invalid invoice")]
1216    InvalidInvoice,
1217    #[error("Gateway returned an invoice with incorrect amount")]
1218    IncorrectInvoiceAmount,
1219}
1220
1221#[derive(Error, Debug, Clone, Eq, PartialEq)]
1222pub enum GenerateLnurlError {
1223    #[error("No gateways are available")]
1224    NoGatewaysAvailable,
1225    #[error("Failed to request gateways")]
1226    FailedToRequestGateways(String),
1227}
1228
1229#[derive(Error, Debug, Clone, Eq, PartialEq)]
1230pub enum ListGatewaysError {
1231    #[error("Failed to request gateways")]
1232    FailedToListGateways,
1233}
1234
1235#[derive(Error, Debug, Clone, Eq, PartialEq)]
1236pub enum RoutingInfoError {
1237    #[error("Failed to request routing info")]
1238    FailedToRequestRoutingInfo,
1239}
1240
1241#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
1242pub enum LightningClientStateMachines {
1243    Send(SendStateMachine),
1244    Receive(ReceiveStateMachine),
1245}
1246
1247impl IntoDynInstance for LightningClientStateMachines {
1248    type DynType = DynState;
1249
1250    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
1251        DynState::from_typed(instance_id, self)
1252    }
1253}
1254
1255impl State for LightningClientStateMachines {
1256    type ModuleContext = LightningClientContext;
1257
1258    fn transitions(
1259        &self,
1260        context: &Self::ModuleContext,
1261        global_context: &DynGlobalClientContext,
1262    ) -> Vec<StateTransition<Self>> {
1263        match self {
1264            LightningClientStateMachines::Send(state) => {
1265                sm_enum_variant_translation!(
1266                    state.transitions(context, global_context),
1267                    LightningClientStateMachines::Send
1268                )
1269            }
1270            LightningClientStateMachines::Receive(state) => {
1271                sm_enum_variant_translation!(
1272                    state.transitions(context, global_context),
1273                    LightningClientStateMachines::Receive
1274                )
1275            }
1276        }
1277    }
1278
1279    fn operation_id(&self) -> OperationId {
1280        match self {
1281            LightningClientStateMachines::Send(state) => state.operation_id(),
1282            LightningClientStateMachines::Receive(state) => state.operation_id(),
1283        }
1284    }
1285}