fedimint_lnv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7mod api;
8#[cfg(feature = "cli")]
9mod cli;
10mod db;
11pub mod events;
12mod receive_sm;
13mod send_sm;
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::Arc;
17
18use async_stream::stream;
19use bitcoin::hashes::{Hash, sha256};
20use bitcoin::secp256k1;
21use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
22use fedimint_api_client::api::DynModuleApi;
23use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
24use fedimint_client_module::module::recovery::NoModuleBackup;
25use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
26use fedimint_client_module::oplog::UpdateStreamOrOutcome;
27use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
28use fedimint_client_module::transaction::{
29    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
30};
31use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
32use fedimint_core::config::FederationId;
33use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
34use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
35use fedimint_core::encoding::{Decodable, Encodable};
36use fedimint_core::module::{
37    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
38};
39use fedimint_core::secp256k1::SECP256K1;
40use fedimint_core::task::TaskGroup;
41use fedimint_core::time::duration_since_epoch;
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
44use fedimint_derive_secret::{ChildId, DerivableSecret};
45use fedimint_lnv2_common::config::LightningClientConfig;
46use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
47use fedimint_lnv2_common::gateway_api::{
48    GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
49};
50use fedimint_lnv2_common::{
51    Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
52    LightningModuleTypes, LightningOutput, LightningOutputV0, lnurl, tweak,
53};
54use futures::StreamExt;
55use lightning_invoice::{Bolt11Invoice, Currency};
56use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
57use serde::{Deserialize, Serialize};
58use serde_json::Value;
59use strum::IntoEnumIterator as _;
60use thiserror::Error;
61use tpe::{AggregateDecryptionKey, derive_agg_dk};
62use tracing::warn;
63
64use crate::api::LightningFederationApi;
65use crate::events::SendPaymentEvent;
66use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
67use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
68
/// Upper bound, in blocks, on the expiration delta a gateway may request for
/// an outgoing lightning contract. `send` refuses to fund a contract if the
/// gateway's delta exceeds this limit, which bounds how long the client has to
/// wait before it can refund the contract unilaterally.
const EXPIRATION_DELTA_LIMIT: u64 = 1440;

/// Number of blocks added on top of the gateway's expiration delta when the
/// expiration of an outgoing contract is computed in `send`. Intended as a two
/// hour buffer in case either the client or the gateway goes offline.
// NOTE(review): "two hours" presumably assumes ~10 minute blocks — confirm.
const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
75
/// Metadata attached to an operation of this module, with one variant per
/// kind of operation.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LightningOperationMeta {
    /// Metadata of an outgoing payment.
    Send(SendOperationMeta),
    /// Metadata of an incoming payment for which an invoice was requested.
    Receive(ReceiveOperationMeta),
    /// Metadata of an incoming payment received via LNURL.
    LnurlReceive(LnurlReceiveOperationMeta),
}
83
/// Metadata recorded for an outgoing payment when its funding transaction is
/// submitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendOperationMeta {
    /// Range of change outpoints of the funding transaction.
    pub change_outpoint_range: OutPointRange,
    /// API endpoint of the gateway the payment is routed through.
    pub gateway: SafeUrl,
    /// The outgoing contract that was funded for this payment.
    pub contract: OutgoingContract,
    /// The invoice that is being paid.
    pub invoice: LightningInvoice,
    /// Arbitrary caller-supplied metadata, stored verbatim.
    pub custom_meta: Value,
}
92
93impl SendOperationMeta {
94    /// Calculate the absolute fee paid to the gateway on success.
95    pub fn gateway_fee(&self) -> Amount {
96        match &self.invoice {
97            LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
98                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
99            ),
100        }
101    }
102}
103
/// Metadata recorded for an incoming payment for which an invoice was
/// requested from a gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiveOperationMeta {
    /// API endpoint of the gateway that issued the invoice.
    pub gateway: SafeUrl,
    /// The incoming contract backing the invoice.
    pub contract: IncomingContract,
    /// The invoice handed out to the payer.
    pub invoice: LightningInvoice,
    /// Arbitrary caller-supplied metadata, stored verbatim.
    pub custom_meta: Value,
}
111
112impl ReceiveOperationMeta {
113    /// Calculate the absolute fee paid to the gateway on success.
114    pub fn gateway_fee(&self) -> Amount {
115        match &self.invoice {
116            LightningInvoice::Bolt11(invoice) => {
117                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
118                    .saturating_sub(self.contract.commitment.amount)
119            }
120        }
121    }
122}
123
/// Metadata recorded for an incoming payment received via LNURL.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LnurlReceiveOperationMeta {
    /// The incoming contract that is being claimed.
    pub contract: IncomingContract,
    /// Arbitrary metadata attached to the operation, stored verbatim.
    pub custom_meta: Value,
}
129
#[cfg_attr(doc, aquamarine::aquamarine)]
/// The state of an operation sending a payment over lightning.
///
/// ```mermaid
/// graph LR
/// classDef virtual fill:#fff,stroke-dasharray: 5 5
///
///     Funding -- funding transaction is rejected --> Rejected
///     Funding -- funding transaction is accepted --> Funded
///     Funded -- payment is confirmed  --> Success
///     Funded -- payment attempt expires --> Refunding
///     Funded -- gateway cancels payment attempt --> Refunding
///     Refunding -- payment is confirmed --> Success
///     Refunding -- ecash is minted --> Refunded
///     Refunding -- minting ecash fails --> Failure
/// ```
/// The transition from Refunding to Success is only possible if the gateway
/// misbehaves.
///
/// Note that `Rejected` in the diagram is a state of the underlying state
/// machine only; this enum has no such variant and a rejected funding
/// transaction is reported to subscribers as [`SendOperationState::Failure`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SendOperationState {
    /// We are funding the contract to incentivize the gateway.
    Funding,
    /// We are waiting for the gateway to complete the payment.
    Funded,
    /// The payment was successful. Carries the 32 byte preimage, which has
    /// been verified against the contract.
    Success([u8; 32]),
    /// The payment has failed and we are refunding the contract.
    Refunding,
    /// The payment has been refunded.
    Refunded,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
163
/// The final state of an operation sending a payment over lightning.
///
/// This is the subset of [`SendOperationState`] in which an operation can
/// come to rest; the preimage carried by `SendOperationState::Success` is
/// dropped.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalSendOperationState {
    /// The payment was successful.
    Success,
    /// The payment has been refunded.
    Refunded,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
174
/// Outcome of initiating an outgoing payment: the id of the started operation
/// or the reason why the payment could not be started.
pub type SendResult = Result<OperationId, SendPaymentError>;
176
#[cfg_attr(doc, aquamarine::aquamarine)]
/// The state of an operation receiving a payment over lightning.
///
/// ```mermaid
/// graph LR
/// classDef virtual fill:#fff,stroke-dasharray: 5 5
///
///     Pending -- payment is confirmed --> Claiming
///     Pending -- invoice expires --> Expired
///     Claiming -- ecash is minted --> Claimed
///     Claiming -- minting ecash fails --> Failure
/// ```
/// `Expired`, `Claimed` and `Failure` are terminal; see
/// [`FinalReceiveOperationState`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum ReceiveOperationState {
    /// We are waiting for the payment.
    Pending,
    /// The payment request has expired.
    Expired,
    /// The payment has been confirmed and we are issuing the ecash.
    Claiming,
    /// The payment has been successful.
    Claimed,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
202
/// The final state of an operation receiving a payment over lightning.
///
/// This is the subset of [`ReceiveOperationState`] in which an operation can
/// come to rest.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// The payment request has expired.
    Expired,
    /// The payment has been successful.
    Claimed,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
213
/// Outcome of requesting an invoice: the invoice together with the id of the
/// operation tracking its payment, or the reason why no invoice was created.
pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
215
/// Initializer of the lightning client module.
#[derive(Clone)]
pub struct LightningClientInit {
    /// Optional gateway connection to use instead of the default one. If
    /// `None`, `init` constructs a `RealGatewayConnection`.
    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
    /// Callback producing a metadata value. It is handed to the LNURL receive
    /// task spawned when the module is constructed; the default returns
    /// `Value::Null`.
    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
}
221
222impl std::fmt::Debug for LightningClientInit {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        f.debug_struct("LightningClientInit")
225            .field("gateway_conn", &self.gateway_conn)
226            .field("custom_meta_fn", &"<function>")
227            .finish()
228    }
229}
230
231impl Default for LightningClientInit {
232    fn default() -> Self {
233        LightningClientInit {
234            gateway_conn: None,
235            custom_meta_fn: Arc::new(|| Value::Null),
236        }
237    }
238}
239
240impl ModuleInit for LightningClientInit {
241    type Common = LightningCommonInit;
242
243    async fn dump_database(
244        &self,
245        _dbtx: &mut DatabaseTransaction<'_>,
246        _prefix_names: Vec<String>,
247    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
248        Box::new(BTreeMap::new().into_iter())
249    }
250}
251
252#[apply(async_trait_maybe_send!)]
253impl ClientModuleInit for LightningClientInit {
254    type Module = LightningClientModule;
255
256    fn supported_api_versions(&self) -> MultiApiVersion {
257        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
258            .expect("no version conflicts")
259    }
260
261    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
262        let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
263            gateway_conn
264        } else {
265            let api = GatewayApi::new(None, args.connector_registry.clone());
266            Arc::new(RealGatewayConnection { api })
267        };
268        Ok(LightningClientModule::new(
269            *args.federation_id(),
270            args.cfg().clone(),
271            args.notifier().clone(),
272            args.context(),
273            args.module_api().clone(),
274            args.module_root_secret(),
275            gateway_conn,
276            self.custom_meta_fn.clone(),
277            args.admin_auth().cloned(),
278            args.task_group(),
279        ))
280    }
281
282    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
283        Some(
284            DbKeyPrefix::iter()
285                .map(|p| p as u8)
286                .chain(
287                    DbKeyPrefix::ExternalReservedStart as u8
288                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
289                )
290                .collect(),
291        )
292    }
293}
294
/// Context shared with the module's state machines. It is assembled from the
/// module's own fields in `ClientModule::context`.
#[derive(Debug, Clone)]
pub struct LightningClientContext {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    /// Client context of the lightning module.
    pub(crate) client_ctx: ClientContext<LightningClientModule>,
}
301
/// Marks [`LightningClientContext`] as a state machine context and tags it
/// with this module's kind.
impl Context for LightningClientContext {
    const KIND: Option<ModuleKind> = Some(KIND);
}
305
/// Client side of the lightning module: sends and receives lightning payments
/// through gateways registered with the federation.
#[derive(Debug, Clone)]
pub struct LightningClientModule {
    /// Id of the federation this client belongs to.
    federation_id: FederationId,
    /// Client configuration of the module (network, fee consensus, ...).
    cfg: LightningClientConfig,
    /// Notifier used to subscribe to state machine updates of an operation.
    notifier: ModuleNotifier<LightningClientStateMachines>,
    /// Client context used for operations, transactions and the module db.
    client_ctx: ClientContext<Self>,
    /// API of the federation's lightning module.
    module_api: DynModuleApi,
    /// Static module keypair, derived from the module root secret with
    /// `ChildId(0)`.
    keypair: Keypair,
    /// Keypair derived from the module root secret with `ChildId(1)`.
    // NOTE(review): presumably used for LNURL receives — its use is not
    // visible in this part of the file.
    lnurl_keypair: Keypair,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    #[allow(unused)] // The field is only used by the cli feature
    admin_auth: Option<ApiAuth>,
}
319
320#[apply(async_trait_maybe_send!)]
321impl ClientModule for LightningClientModule {
322    type Init = LightningClientInit;
323    type Common = LightningModuleTypes;
324    type Backup = NoModuleBackup;
325    type ModuleStateMachineContext = LightningClientContext;
326    type States = LightningClientStateMachines;
327
328    fn context(&self) -> Self::ModuleStateMachineContext {
329        LightningClientContext {
330            federation_id: self.federation_id,
331            gateway_conn: self.gateway_conn.clone(),
332            client_ctx: self.client_ctx.clone(),
333        }
334    }
335
336    fn input_fee(
337        &self,
338        amounts: &Amounts,
339        _input: &<Self::Common as ModuleCommon>::Input,
340    ) -> Option<Amounts> {
341        Some(Amounts::new_bitcoin(
342            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
343        ))
344    }
345
346    fn output_fee(
347        &self,
348        amounts: &Amounts,
349        _output: &<Self::Common as ModuleCommon>::Output,
350    ) -> Option<Amounts> {
351        Some(Amounts::new_bitcoin(
352            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
353        ))
354    }
355
356    #[cfg(feature = "cli")]
357    async fn handle_cli_command(
358        &self,
359        args: &[std::ffi::OsString],
360    ) -> anyhow::Result<serde_json::Value> {
361        cli::handle_cli_command(self, args).await
362    }
363}
364
365impl LightningClientModule {
366    #[allow(clippy::too_many_arguments)]
367    fn new(
368        federation_id: FederationId,
369        cfg: LightningClientConfig,
370        notifier: ModuleNotifier<LightningClientStateMachines>,
371        client_ctx: ClientContext<Self>,
372        module_api: DynModuleApi,
373        module_root_secret: &DerivableSecret,
374        gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
375        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
376        admin_auth: Option<ApiAuth>,
377        task_group: &TaskGroup,
378    ) -> Self {
379        let module = Self {
380            federation_id,
381            cfg,
382            notifier,
383            client_ctx,
384            module_api,
385            keypair: module_root_secret
386                .child_key(ChildId(0))
387                .to_secp_key(SECP256K1),
388            lnurl_keypair: module_root_secret
389                .child_key(ChildId(1))
390                .to_secp_key(SECP256K1),
391            gateway_conn,
392            admin_auth,
393        };
394
395        module.spawn_receive_lnurl_task(custom_meta_fn, task_group);
396
397        module.spawn_gateway_map_update_task(task_group);
398
399        module
400    }
401
402    fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup) {
403        let module = self.clone();
404
405        task_group.spawn_cancellable("gateway_map_update_task", async move {
406            module.update_gateway_map().await;
407        });
408    }
409
410    async fn update_gateway_map(&self) {
411        // Update the mapping from lightning node public keys to gateway api
412        // endpoints maintained in the module database. When paying an invoice this
413        // enables the client to select the gateway that has created the invoice,
414        // if possible, such that the payment does not go over lightning, reducing
415        // fees and latency.
416
417        if let Ok(gateways) = self.module_api.gateways().await {
418            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
419
420            for gateway in gateways {
421                if let Ok(Some(routing_info)) = self
422                    .gateway_conn
423                    .routing_info(gateway.clone(), &self.federation_id)
424                    .await
425                {
426                    dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
427                        .await;
428                }
429            }
430
431            if let Err(e) = dbtx.commit_tx_result().await {
432                warn!("Failed to commit the updated gateway mapping to the database: {e}");
433            }
434        }
435    }
436
437    /// Selects an available gateway by querying the federation's registered
438    /// gateways, checking if one of them match the invoice's payee public
439    /// key, then queries the gateway for `RoutingInfo` to determine if it is
440    /// online.
441    pub async fn select_gateway(
442        &self,
443        invoice: Option<Bolt11Invoice>,
444    ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
445        let gateways = self
446            .module_api
447            .gateways()
448            .await
449            .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
450
451        if gateways.is_empty() {
452            return Err(SelectGatewayError::NoGatewaysAvailable);
453        }
454
455        if let Some(invoice) = invoice
456            && let Some(gateway) = self
457                .client_ctx
458                .module_db()
459                .begin_transaction_nc()
460                .await
461                .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
462                .await
463                .filter(|gateway| gateways.contains(gateway))
464            && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
465        {
466            return Ok((gateway, routing_info));
467        }
468
469        for gateway in gateways {
470            if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
471                return Ok((gateway, routing_info));
472            }
473        }
474
475        Err(SelectGatewayError::GatewaysUnresponsive)
476    }
477
478    /// Sends a request to each peer for their registered gateway list and
479    /// returns a `Vec<SafeUrl` of all registered gateways to the client.
480    pub async fn list_gateways(
481        &self,
482        peer: Option<PeerId>,
483    ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
484        if let Some(peer) = peer {
485            self.module_api
486                .gateways_from_peer(peer)
487                .await
488                .map_err(|_| ListGatewaysError::FailedToListGateways)
489        } else {
490            self.module_api
491                .gateways()
492                .await
493                .map_err(|_| ListGatewaysError::FailedToListGateways)
494        }
495    }
496
497    /// Requests the `RoutingInfo`, including fee information, from the gateway
498    /// available at the `SafeUrl`.
499    pub async fn routing_info(
500        &self,
501        gateway: &SafeUrl,
502    ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
503        self.gateway_conn
504            .routing_info(gateway.clone(), &self.federation_id)
505            .await
506            .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
507    }
508
    /// Pay an invoice. For testing you can optionally specify a gateway to
    /// route with, otherwise a gateway will be selected automatically. If the
    /// invoice was created by a gateway connected to our federation, the same
    /// gateway will be selected to allow for a direct ecash swap. Otherwise we
    /// select the first registered gateway that responds with routing info.
    ///
    /// The fee for this payment may depend on the selected gateway but
    /// will be limited to one and a half percent plus one hundred satoshis.
    /// This fee accounts for the fee charged by the gateway as well as
    /// the additional fee required to reliably route this payment over
    /// lightning if necessary. Since the gateway has been vetted by at least
    /// one guardian we trust it to set a reasonable fee and only enforce a
    /// rather high limit.
    ///
    /// The absolute fee for a payment can be calculated from the operation meta
    /// to be shown to the user in the transaction history.
    ///
    /// # Errors
    ///
    /// Returns an error without funding anything if the invoice has no
    /// amount, is expired or is for a different network, if a previous
    /// attempt for the same invoice is still in progress or has succeeded, if
    /// no usable gateway can be reached, if the gateway's fee or expiration
    /// delta exceed the client's limits, or if the consensus block count
    /// cannot be requested. Returns `FailedToFundPayment` if the funding
    /// transaction cannot be finalized and submitted.
    #[allow(clippy::too_many_lines)]
    pub async fn send(
        &self,
        invoice: Bolt11Invoice,
        gateway: Option<SafeUrl>,
        custom_meta: Value,
    ) -> Result<OperationId, SendPaymentError> {
        // Validate the invoice before doing any network requests.
        let amount = invoice
            .amount_milli_satoshis()
            .ok_or(SendPaymentError::InvoiceMissingAmount)?;

        if invoice.is_expired() {
            return Err(SendPaymentError::InvoiceExpired);
        }

        if self.cfg.network != invoice.currency().into() {
            return Err(SendPaymentError::WrongCurrency {
                invoice_currency: invoice.currency(),
                federation_currency: self.cfg.network.into(),
            });
        }

        // Fails if an earlier attempt for this invoice is still active or has
        // already succeeded.
        let operation_id = self.get_next_operation_id(&invoice).await?;

        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());

        // The ephemeral tweak doubles as the secret key of the refund keypair.
        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
            .expect("32 bytes, within curve order")
            .keypair(secp256k1::SECP256K1);

        // Use the explicitly requested gateway if there is one, otherwise
        // select one automatically.
        let (gateway_api, routing_info) = match gateway {
            Some(gateway_api) => (
                gateway_api.clone(),
                self.routing_info(&gateway_api)
                    .await
                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
                    .ok_or(SendPaymentError::FederationNotSupported)?,
            ),
            None => self
                .select_gateway(Some(invoice.clone()))
                .await
                .map_err(SendPaymentError::SelectGateway)?,
        };

        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);

        // Enforce the client side limits on what the gateway may ask for.
        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
            return Err(SendPaymentError::GatewayFeeExceedsLimit);
        }

        if EXPIRATION_DELTA_LIMIT < expiration_delta {
            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
        }

        let consensus_block_count = self
            .module_api
            .consensus_block_count()
            .await
            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;

        // The contract locks the invoice amount plus the gateway fee and
        // expires relative to the current consensus block count.
        let contract = OutgoingContract {
            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
            amount: send_fee.add_to(amount),
            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
            claim_pk: routing_info.module_public_key,
            refund_pk: refund_keypair.public_key(),
            ephemeral_pk,
        };

        // Clones that are moved into the state machine closure below; the
        // originals are still needed for the operation meta.
        let contract_clone = contract.clone();
        let gateway_api_clone = gateway_api.clone();
        let invoice_clone = invoice.clone();

        let client_output = ClientOutput::<LightningOutput> {
            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
            amounts: Amounts::new_bitcoin(contract.amount),
        };

        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
            state_machines: Arc::new(move |range: OutPointRange| {
                vec![LightningClientStateMachines::Send(SendStateMachine {
                    common: SendSMCommon {
                        operation_id,
                        // The contract is the only output of this bundle, so
                        // its outpoint is the first one in the range.
                        outpoint: range.into_iter().next().unwrap(),
                        contract: contract_clone.clone(),
                        gateway_api: Some(gateway_api_clone.clone()),
                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
                        refund_keypair,
                    },
                    state: SendSMState::Funding,
                })]
            }),
        };

        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
            vec![client_output],
            vec![client_output_sm],
        ));

        let transaction = TransactionBuilder::new().with_outputs(client_output);

        self.client_ctx
            .finalize_and_submit_transaction(
                operation_id,
                LightningCommonInit::KIND.as_str(),
                move |change_outpoint_range| {
                    LightningOperationMeta::Send(SendOperationMeta {
                        change_outpoint_range,
                        gateway: gateway_api.clone(),
                        contract: contract.clone(),
                        invoice: LightningInvoice::Bolt11(invoice.clone()),
                        custom_meta: custom_meta.clone(),
                    })
                },
                transaction,
            )
            .await
            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;

        // Log the payment event only after the funding transaction has been
        // submitted successfully.
        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;

        self.client_ctx
            .log_event(
                &mut dbtx,
                SendPaymentEvent {
                    operation_id,
                    amount: send_fee.add_to(amount),
                    fee: Some(send_fee.fee(amount)),
                },
            )
            .await;

        dbtx.commit_tx().await;

        Ok(operation_id)
    }
661
662    async fn get_next_operation_id(
663        &self,
664        invoice: &Bolt11Invoice,
665    ) -> Result<OperationId, SendPaymentError> {
666        for payment_attempt in 0..u64::MAX {
667            let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));
668
669            if !self.client_ctx.operation_exists(operation_id).await {
670                return Ok(operation_id);
671            }
672
673            if self.client_ctx.has_active_states(operation_id).await {
674                return Err(SendPaymentError::PaymentInProgress(operation_id));
675            }
676
677            let mut stream = self
678                .subscribe_send_operation_state_updates(operation_id)
679                .await
680                .expect("operation_id exists")
681                .into_stream();
682
683            // This will not block since we checked for active states and there were none,
684            // so by definition a final state has to have been assumed already.
685            while let Some(state) = stream.next().await {
686                if let SendOperationState::Success(_) = state {
687                    return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
688                }
689            }
690        }
691
692        panic!("We could not find an unused operation id for sending a lightning payment");
693    }
694
    /// Subscribe to all state updates of the send operation.
    ///
    /// Translates the updates of the send state machine into
    /// [`SendOperationState`]s. The stream ends after yielding one of the
    /// final states `Success`, `Refunded` or `Failure`. Returns an error if
    /// the operation cannot be found.
    ///
    /// # Panics
    ///
    /// The stream panics if the state machine reports a preimage that does not
    /// verify against the contract.
    pub async fn subscribe_send_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();
        let module_api = self.module_api.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                // NOTE(review): if the notifier stream ever yields `None` this
                // loop keeps polling it without yielding — presumably the
                // notifier stream never terminates; confirm.
                loop {
                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
                        match state.state {
                            SendSMState::Funding => yield SendOperationState::Funding,
                            SendSMState::Funded => yield SendOperationState::Funded,
                            SendSMState::Success(preimage) => {
                                // the preimage has been verified by the state machine previously
                                assert!(state.common.contract.verify_preimage(&preimage));

                                yield SendOperationState::Success(preimage);
                                return;
                            },
                            SendSMState::Refunding(out_points) => {
                                yield SendOperationState::Refunding;

                                // The refund is complete once the outputs of the refund
                                // transaction have been awaited successfully.
                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
                                    yield SendOperationState::Refunded;
                                    return;
                                }

                                // The gateway may have incorrectly claimed the outgoing contract thereby causing
                                // our refund transaction to be rejected. Therefore, we check one last time if
                                // the preimage is available before we enter the failure state.
                                if let Some(preimage) = module_api.await_preimage(
                                    state.common.outpoint,
                                    0
                                ).await
                                    && state.common.contract.verify_preimage(&preimage) {
                                        yield SendOperationState::Success(preimage);
                                        return;
                                    }

                                yield SendOperationState::Failure;
                                return;
                            },
                            // A rejected funding transaction is surfaced as a failure.
                            SendSMState::Rejected(..) => {
                                yield SendOperationState::Failure;
                                return;
                            },
                        }
                    }
                }
            }
        }))
    }
752
753    /// Await the final state of the send operation.
754    pub async fn await_final_send_operation_state(
755        &self,
756        operation_id: OperationId,
757    ) -> anyhow::Result<FinalSendOperationState> {
758        let state = self
759            .subscribe_send_operation_state_updates(operation_id)
760            .await?
761            .into_stream()
762            .filter_map(|state| {
763                futures::future::ready(match state {
764                    SendOperationState::Success(_) => Some(FinalSendOperationState::Success),
765                    SendOperationState::Refunded => Some(FinalSendOperationState::Refunded),
766                    SendOperationState::Failure => Some(FinalSendOperationState::Failure),
767                    _ => None,
768                })
769            })
770            .next()
771            .await
772            .expect("Stream contains one final state");
773
774        Ok(state)
775    }
776
777    /// Request an invoice. For testing you can optionally specify a gateway to
778    /// generate the invoice, otherwise a random online gateway will be selected
779    /// automatically.
780    ///
781    /// The total fee for this payment may depend on the chosen gateway but
782    /// will be limited to half of one percent plus fifty satoshis. Since the
783    /// selected gateway has been vetted by at least one guardian we trust it to
784    /// set a reasonable fee and only enforce a rather high limit.
785    ///
786    /// The absolute fee for a payment can be calculated from the operation meta
787    /// to be shown to the user in the transaction history.
788    pub async fn receive(
789        &self,
790        amount: Amount,
791        expiry_secs: u32,
792        description: Bolt11InvoiceDescription,
793        gateway: Option<SafeUrl>,
794        custom_meta: Value,
795    ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
796        let (gateway, contract, invoice) = self
797            .create_contract_and_fetch_invoice(
798                self.keypair.public_key(),
799                amount,
800                expiry_secs,
801                description,
802                gateway,
803            )
804            .await?;
805
806        let operation_id = self
807            .receive_incoming_contract(
808                self.keypair.secret_key(),
809                contract.clone(),
810                LightningOperationMeta::Receive(ReceiveOperationMeta {
811                    gateway,
812                    contract,
813                    invoice: LightningInvoice::Bolt11(invoice.clone()),
814                    custom_meta,
815                }),
816            )
817            .await
818            .expect("The contract has been generated with our public key");
819
820        Ok((invoice, operation_id))
821    }
822
823    /// Create an incoming contract locked to a public key derived from the
824    /// recipient's static module public key and fetches the corresponding
825    /// invoice.
826    async fn create_contract_and_fetch_invoice(
827        &self,
828        recipient_static_pk: PublicKey,
829        amount: Amount,
830        expiry_secs: u32,
831        description: Bolt11InvoiceDescription,
832        gateway: Option<SafeUrl>,
833    ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
834        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);
835
836        let encryption_seed = ephemeral_tweak
837            .consensus_hash::<sha256::Hash>()
838            .to_byte_array();
839
840        let preimage = encryption_seed
841            .consensus_hash::<sha256::Hash>()
842            .to_byte_array();
843
844        let (gateway, routing_info) = match gateway {
845            Some(gateway) => (
846                gateway.clone(),
847                self.routing_info(&gateway)
848                    .await
849                    .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
850                    .ok_or(ReceiveError::FederationNotSupported)?,
851            ),
852            None => self
853                .select_gateway(None)
854                .await
855                .map_err(ReceiveError::SelectGateway)?,
856        };
857
858        if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
859            return Err(ReceiveError::GatewayFeeExceedsLimit);
860        }
861
862        let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
863
864        // The dust limit ensures that the incoming contract can be claimed without
865        // additional funds as the contracts amount is sufficient to cover the fees
866        if contract_amount < Amount::from_sats(5) {
867            return Err(ReceiveError::AmountTooSmall);
868        }
869
870        let expiration = duration_since_epoch()
871            .as_secs()
872            .saturating_add(u64::from(expiry_secs));
873
874        let claim_pk = recipient_static_pk
875            .mul_tweak(
876                secp256k1::SECP256K1,
877                &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
878            )
879            .expect("Tweak is valid");
880
881        let contract = IncomingContract::new(
882            self.cfg.tpe_agg_pk,
883            encryption_seed,
884            preimage,
885            PaymentImage::Hash(preimage.consensus_hash()),
886            contract_amount,
887            expiration,
888            claim_pk,
889            routing_info.module_public_key,
890            ephemeral_pk,
891        );
892
893        let invoice = self
894            .gateway_conn
895            .bolt11_invoice(
896                gateway.clone(),
897                self.federation_id,
898                contract.clone(),
899                amount,
900                description,
901                expiry_secs,
902            )
903            .await
904            .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;
905
906        if invoice.payment_hash() != &preimage.consensus_hash() {
907            return Err(ReceiveError::InvalidInvoice);
908        }
909
910        if invoice.amount_milli_satoshis() != Some(amount.msats) {
911            return Err(ReceiveError::IncorrectInvoiceAmount);
912        }
913
914        Ok((gateway, contract, invoice))
915    }
916
917    // Receive an incoming contract locked to a public key derived from our
918    // static module public key.
919    async fn receive_incoming_contract(
920        &self,
921        sk: SecretKey,
922        contract: IncomingContract,
923        operation_meta: LightningOperationMeta,
924    ) -> Option<OperationId> {
925        let operation_id = OperationId::from_encodable(&contract.clone());
926
927        let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
928
929        let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
930            common: ReceiveSMCommon {
931                operation_id,
932                contract: contract.clone(),
933                claim_keypair,
934                agg_decryption_key,
935            },
936            state: ReceiveSMState::Pending,
937        });
938
939        // this may only fail if the operation id is already in use, in which case we
940        // ignore the error such that the method is idempotent
941        self.client_ctx
942            .manual_operation_start(
943                operation_id,
944                LightningCommonInit::KIND.as_str(),
945                operation_meta,
946                vec![self.client_ctx.make_dyn_state(receive_sm)],
947            )
948            .await
949            .ok();
950
951        Some(operation_id)
952    }
953
954    fn recover_contract_keys(
955        &self,
956        sk: SecretKey,
957        contract: &IncomingContract,
958    ) -> Option<(Keypair, AggregateDecryptionKey)> {
959        let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
960
961        let encryption_seed = tweak
962            .secret_bytes()
963            .consensus_hash::<sha256::Hash>()
964            .to_byte_array();
965
966        let claim_keypair = sk
967            .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
968            .expect("Tweak is valid")
969            .keypair(secp256k1::SECP256K1);
970
971        if claim_keypair.public_key() != contract.commitment.claim_pk {
972            return None; // The claim key is not derived from our pk
973        }
974
975        let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
976
977        if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
978            return None; // The decryption key is not derived from our pk
979        }
980
981        contract.decrypt_preimage(&agg_decryption_key)?;
982
983        Some((claim_keypair, agg_decryption_key))
984    }
985
986    /// Subscribe to all state updates of the receive operation.
987    pub async fn subscribe_receive_operation_state_updates(
988        &self,
989        operation_id: OperationId,
990    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
991        let operation = self.client_ctx.get_operation(operation_id).await?;
992        let mut stream = self.notifier.subscribe(operation_id).await;
993        let client_ctx = self.client_ctx.clone();
994
995        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
996            stream! {
997                loop {
998                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
999                        match state.state {
1000                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
1001                            ReceiveSMState::Claiming(out_points) => {
1002                                yield ReceiveOperationState::Claiming;
1003
1004                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
1005                                    yield ReceiveOperationState::Claimed;
1006                                } else {
1007                                    yield ReceiveOperationState::Failure;
1008                                }
1009                                return;
1010                            },
1011                            ReceiveSMState::Expired => {
1012                                yield ReceiveOperationState::Expired;
1013                                return;
1014                            }
1015                        }
1016                    }
1017                }
1018            }
1019        }))
1020    }
1021
1022    /// Await the final state of the receive operation.
1023    pub async fn await_final_receive_operation_state(
1024        &self,
1025        operation_id: OperationId,
1026    ) -> anyhow::Result<FinalReceiveOperationState> {
1027        let state = self
1028            .subscribe_receive_operation_state_updates(operation_id)
1029            .await?
1030            .into_stream()
1031            .filter_map(|state| {
1032                futures::future::ready(match state {
1033                    ReceiveOperationState::Expired => Some(FinalReceiveOperationState::Expired),
1034                    ReceiveOperationState::Claimed => Some(FinalReceiveOperationState::Claimed),
1035                    ReceiveOperationState::Failure => Some(FinalReceiveOperationState::Failure),
1036                    _ => None,
1037                })
1038            })
1039            .next()
1040            .await
1041            .expect("Stream contains one final state");
1042
1043        Ok(state)
1044    }
1045
1046    /// Generate an lnurl for the client. You can optionally specify a gateway
1047    /// to use for testing purposes.
1048    pub async fn generate_lnurl(
1049        &self,
1050        recurringd: SafeUrl,
1051        gateway: Option<SafeUrl>,
1052    ) -> Result<String, GenerateLnurlError> {
1053        let gateways = if let Some(gateway) = gateway {
1054            vec![gateway]
1055        } else {
1056            let gateways = self
1057                .module_api
1058                .gateways()
1059                .await
1060                .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1061
1062            if gateways.is_empty() {
1063                return Err(GenerateLnurlError::NoGatewaysAvailable);
1064            }
1065
1066            gateways
1067        };
1068
1069        let lnurl = lnurl::generate_lnurl(
1070            recurringd,
1071            self.federation_id,
1072            self.lnurl_keypair.public_key(),
1073            self.cfg.tpe_agg_pk,
1074            gateways,
1075        )
1076        .await
1077        .map_err(|e| GenerateLnurlError::FailedToRequestLnurl(e.to_string()))?;
1078
1079        Ok(lnurl)
1080    }
1081
1082    fn spawn_receive_lnurl_task(
1083        &self,
1084        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1085        task_group: &TaskGroup,
1086    ) {
1087        let module = self.clone();
1088
1089        task_group.spawn_cancellable("receive_lnurl_task", async move {
1090            loop {
1091                module.receive_lnurl(custom_meta_fn()).await;
1092            }
1093        });
1094    }
1095
1096    async fn receive_lnurl(&self, custom_meta: Value) {
1097        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1098
1099        let stream_index = dbtx
1100            .get_value(&IncomingContractStreamIndexKey)
1101            .await
1102            .unwrap_or(0);
1103
1104        let (contracts, next_index) = self
1105            .module_api
1106            .await_incoming_contracts(stream_index, 128)
1107            .await;
1108
1109        for contract in &contracts {
1110            if let Some(operation_id) = self
1111                .receive_incoming_contract(
1112                    self.lnurl_keypair.secret_key(),
1113                    contract.clone(),
1114                    LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
1115                        contract: contract.clone(),
1116                        custom_meta: custom_meta.clone(),
1117                    }),
1118                )
1119                .await
1120            {
1121                self.await_final_receive_operation_state(operation_id)
1122                    .await
1123                    .ok();
1124            }
1125        }
1126
1127        dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
1128            .await;
1129
1130        dbtx.commit_tx().await;
1131    }
1132}
1133
/// Errors that can occur while selecting a gateway.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SelectGatewayError {
    /// The list of gateways could not be requested. The `String` payload is
    /// not part of the rendered error message.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    /// No gateways are available.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// Every gateway failed to respond.
    #[error("All gateways failed to respond")]
    GatewaysUnresponsive,
}
1143
/// Errors that can occur when sending a payment.
///
/// The `String` payloads of the tuple variants are not part of the rendered
/// error messages.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendPaymentError {
    /// The invoice does not specify an amount.
    #[error("Invoice is missing an amount")]
    InvoiceMissingAmount,
    /// The invoice has expired.
    #[error("Invoice has expired")]
    InvoiceExpired,
    /// A payment for this invoice is already in progress.
    // NOTE(review): the payload is presumably the id of the operation that is
    // already in progress - confirm at the construction site.
    #[error("A payment for this invoice is already in progress")]
    PaymentInProgress(OperationId),
    /// The invoice has already been paid.
    // NOTE(review): the payload is presumably the id of the operation that paid
    // the invoice - confirm at the construction site.
    #[error("This invoice has already been paid")]
    InvoiceAlreadyPaid(OperationId),
    /// Gateway selection failed; the message of the inner error is shown
    /// unchanged.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// The gateway could not be reached.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The gateway does not support this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's fee exceeds the allowed limit.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// The gateway's expiration time exceeds the allowed limit.
    #[error("Gateway expiration time exceeds the allowed limit")]
    GatewayExpirationExceedsLimit,
    /// The block count could not be requested.
    #[error("Failed to request block count")]
    FailedToRequestBlockCount(String),
    /// The payment could not be funded.
    #[error("Failed to fund the payment")]
    FailedToFundPayment(String),
    /// The invoice is for a different currency than the federation's.
    #[error("Invoice is for a different currency")]
    WrongCurrency {
        /// Currency of the invoice.
        invoice_currency: Currency,
        /// Currency of the federation.
        federation_currency: Currency,
    },
}
1174
/// Errors that can occur when requesting an invoice to receive a payment.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveError {
    /// Gateway selection failed; the message of the inner error is shown
    /// unchanged.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// A request to the gateway failed. The payload holds the stringified
    /// underlying error and is not part of the rendered error message.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    /// The gateway returned no routing info for this federation.
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    /// The gateway's receive fee exceeds `PaymentFee::RECEIVE_FEE_LIMIT`.
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    /// The amount remaining after subtracting the gateway's receive fee is
    /// below the dust limit of five satoshis.
    #[error("Amount is too small to cover fees")]
    AmountTooSmall,
    /// The payment hash of the returned invoice does not match the hash of the
    /// contract's preimage.
    #[error("Gateway returned an invalid invoice")]
    InvalidInvoice,
    /// The amount of the returned invoice does not match the requested amount.
    #[error("Gateway returned an invoice with incorrect amount")]
    IncorrectInvoiceAmount,
}
1192
/// Errors that can occur when generating an lnurl.
///
/// The `String` payloads hold the stringified underlying error and are not
/// part of the rendered error messages.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum GenerateLnurlError {
    /// The module api returned an empty list of gateways.
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// The list of gateways could not be requested from the module api.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    /// The lnurl request itself failed.
    #[error("Failed to request LNURL")]
    FailedToRequestLnurl(String),
}
1202
/// Error reported when the list of gateways cannot be obtained.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ListGatewaysError {
    /// The request for the gateway list failed.
    #[error("Failed to request gateways")]
    FailedToListGateways,
}
1208
/// Error reported when routing info cannot be obtained.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum RoutingInfoError {
    /// The request for routing info failed.
    #[error("Failed to request routing info")]
    FailedToRequestRoutingInfo,
}
1214
/// The state machines of the lightning client module, wrapped in a single enum
/// so that they share one `State` implementation.
// NOTE(review): `Encodable`/`Decodable` are derived, so the variant order
// presumably affects the encoding - confirm before reordering variants.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum LightningClientStateMachines {
    /// State machine of a send operation.
    Send(SendStateMachine),
    /// State machine of a receive operation.
    Receive(ReceiveStateMachine),
}
1220
/// Conversion of the typed state machine enum into a `DynState`.
impl IntoDynInstance for LightningClientStateMachines {
    type DynType = DynState;

    /// Wrap `self` in a `DynState` for the module instance `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1228
1229impl State for LightningClientStateMachines {
1230    type ModuleContext = LightningClientContext;
1231
1232    fn transitions(
1233        &self,
1234        context: &Self::ModuleContext,
1235        global_context: &DynGlobalClientContext,
1236    ) -> Vec<StateTransition<Self>> {
1237        match self {
1238            LightningClientStateMachines::Send(state) => {
1239                sm_enum_variant_translation!(
1240                    state.transitions(context, global_context),
1241                    LightningClientStateMachines::Send
1242                )
1243            }
1244            LightningClientStateMachines::Receive(state) => {
1245                sm_enum_variant_translation!(
1246                    state.transitions(context, global_context),
1247                    LightningClientStateMachines::Receive
1248                )
1249            }
1250        }
1251    }
1252
1253    fn operation_id(&self) -> OperationId {
1254        match self {
1255            LightningClientStateMachines::Send(state) => state.operation_id(),
1256            LightningClientStateMachines::Receive(state) => state.operation_id(),
1257        }
1258    }
1259}