fedimint_lnv2_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::module_name_repetitions)]
5#![allow(clippy::must_use_candidate)]
6
7mod api;
8#[cfg(feature = "cli")]
9mod cli;
10mod db;
11pub mod events;
12mod receive_sm;
13mod send_sm;
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::sync::Arc;
17
18use async_stream::stream;
19use bitcoin::hashes::{Hash, sha256};
20use bitcoin::secp256k1;
21use db::{DbKeyPrefix, GatewayKey, IncomingContractStreamIndexKey};
22use fedimint_api_client::api::DynModuleApi;
23use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
24use fedimint_client_module::module::recovery::NoModuleBackup;
25use fedimint_client_module::module::{ClientContext, ClientModule, OutPointRange};
26use fedimint_client_module::oplog::UpdateStreamOrOutcome;
27use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
28use fedimint_client_module::transaction::{
29    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
30};
31use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
32use fedimint_core::config::FederationId;
33use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
34use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
35use fedimint_core::encoding::{Decodable, Encodable};
36use fedimint_core::module::{
37    Amounts, ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
38};
39use fedimint_core::secp256k1::SECP256K1;
40use fedimint_core::task::TaskGroup;
41use fedimint_core::time::duration_since_epoch;
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send};
44use fedimint_derive_secret::{ChildId, DerivableSecret};
45use fedimint_lnv2_common::config::LightningClientConfig;
46use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract, PaymentImage};
47use fedimint_lnv2_common::gateway_api::{
48    GatewayConnection, PaymentFee, RealGatewayConnection, RoutingInfo,
49};
50use fedimint_lnv2_common::{
51    Bolt11InvoiceDescription, GatewayApi, KIND, LightningCommonInit, LightningInvoice,
52    LightningModuleTypes, LightningOutput, LightningOutputV0, MINIMUM_INCOMING_CONTRACT_AMOUNT,
53    lnurl, tweak,
54};
55use futures::StreamExt;
56use lightning_invoice::{Bolt11Invoice, Currency};
57use secp256k1::{Keypair, PublicKey, Scalar, SecretKey, ecdh};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60use strum::IntoEnumIterator as _;
61use thiserror::Error;
62use tpe::{AggregateDecryptionKey, derive_agg_dk};
63use tracing::warn;
64
65use crate::api::LightningFederationApi;
66use crate::events::SendPaymentEvent;
67use crate::receive_sm::{ReceiveSMCommon, ReceiveSMState, ReceiveStateMachine};
68use crate::send_sm::{SendSMCommon, SendSMState, SendStateMachine};
69
/// Maximum expiration delta, in blocks, that `send` accepts from a gateway
/// for an outgoing lightning contract; larger values are rejected with
/// `SendPaymentError::GatewayExpirationExceedsLimit`. Once the contract has
/// timed out the client can refund it unilaterally.
const EXPIRATION_DELTA_LIMIT: u64 = 1440;
73
/// Number of blocks added on top of the gateway's expiration delta when the
/// expiration of an outgoing contract is computed in `send`. This is a
/// two-hour buffer in case either the client or the gateway goes offline.
const CONTRACT_CONFIRMATION_BUFFER: u64 = 12;
76
/// Metadata attached to an operation created by this module, with one variant
/// per kind of operation.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LightningOperationMeta {
    /// Metadata of an outgoing payment, created by `send`.
    Send(SendOperationMeta),
    /// Metadata of an incoming payment, created by `receive`.
    Receive(ReceiveOperationMeta),
    /// Metadata of an LNURL receive operation.
    LnurlReceive(LnurlReceiveOperationMeta),
}
84
/// Operation metadata of an outgoing lightning payment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendOperationMeta {
    /// Outpoint range passed to the metadata closure when the funding
    /// transaction is finalized; presumably the change outputs of that
    /// transaction (TODO confirm against `finalize_and_submit_transaction`).
    pub change_outpoint_range: OutPointRange,
    /// API endpoint of the gateway used for this payment.
    pub gateway: SafeUrl,
    /// The outgoing contract funded for this payment.
    pub contract: OutgoingContract,
    /// The invoice being paid.
    pub invoice: LightningInvoice,
    /// Caller-supplied JSON value passed to `send`.
    pub custom_meta: Value,
}
93
94impl SendOperationMeta {
95    /// Calculate the absolute fee paid to the gateway on success.
96    pub fn gateway_fee(&self) -> Amount {
97        match &self.invoice {
98            LightningInvoice::Bolt11(invoice) => self.contract.amount.saturating_sub(
99                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount")),
100            ),
101        }
102    }
103}
104
/// Operation metadata of an incoming lightning payment created by `receive`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiveOperationMeta {
    /// API endpoint of the gateway that issued the invoice.
    pub gateway: SafeUrl,
    /// The incoming contract backing the invoice.
    pub contract: IncomingContract,
    /// The invoice handed out to the payer.
    pub invoice: LightningInvoice,
    /// Caller-supplied JSON value passed to `receive`.
    pub custom_meta: Value,
}
112
113impl ReceiveOperationMeta {
114    /// Calculate the absolute fee paid to the gateway on success.
115    pub fn gateway_fee(&self) -> Amount {
116        match &self.invoice {
117            LightningInvoice::Bolt11(invoice) => {
118                Amount::from_msats(invoice.amount_milli_satoshis().expect("Invoice has amount"))
119                    .saturating_sub(self.contract.commitment.amount)
120            }
121        }
122    }
123}
124
/// Operation metadata of an LNURL receive operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LnurlReceiveOperationMeta {
    /// The incoming contract associated with the operation.
    pub contract: IncomingContract,
    /// Custom JSON metadata stored with the operation.
    pub custom_meta: Value,
}
130
#[cfg_attr(doc, aquamarine::aquamarine)]
/// The state of an operation sending a payment over lightning.
///
/// ```mermaid
/// graph LR
/// classDef virtual fill:#fff,stroke-dasharray: 5 5
///
///     Funding -- funding transaction is rejected --> Failure
///     Funding -- funding transaction is accepted --> Funded
///     Funded -- payment is confirmed  --> Success
///     Funded -- payment attempt expires --> Refunding
///     Funded -- gateway cancels payment attempt --> Refunding
///     Refunding -- payment is confirmed --> Success
///     Refunding -- ecash is minted --> Refunded
///     Refunding -- minting ecash fails --> Failure
/// ```
/// The transition from Refunding to Success is only possible if the gateway
/// misbehaves.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum SendOperationState {
    /// We are funding the contract to incentivize the gateway.
    Funding,
    /// We are waiting for the gateway to complete the payment.
    Funded,
    /// The payment was successful. Carries the preimage, which has been
    /// verified against the contract.
    Success([u8; 32]),
    /// The payment has failed and we are refunding the contract.
    Refunding,
    /// The payment has been refunded.
    Refunded,
    /// Either a programming error has occurred or the federation is malicious.
    /// A rejected funding transaction is also reported as this state.
    Failure,
}
164
/// The final state of an operation sending a payment over lightning, as
/// returned by `await_final_send_operation_state`. Each variant corresponds to
/// the `SendOperationState` variant of the same name; the preimage carried by
/// `SendOperationState::Success` is dropped.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalSendOperationState {
    /// The payment was successful.
    Success,
    /// The payment has been refunded.
    Refunded,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
175
/// Result of initiating a send operation: the id of the new operation, or the
/// reason the payment could not be started.
pub type SendResult = Result<OperationId, SendPaymentError>;
177
#[cfg_attr(doc, aquamarine::aquamarine)]
/// The state of an operation receiving a payment over lightning.
///
/// ```mermaid
/// graph LR
/// classDef virtual fill:#fff,stroke-dasharray: 5 5
///
///     Pending -- payment is confirmed --> Claiming
///     Pending -- invoice expires --> Expired
///     Claiming -- ecash is minted --> Claimed
///     Claiming -- minting ecash fails --> Failure
/// ```
/// `Expired`, `Claimed` and `Failure` have no outgoing transitions in the
/// diagram above.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum ReceiveOperationState {
    /// We are waiting for the payment.
    Pending,
    /// The payment request has expired.
    Expired,
    /// The payment has been confirmed and we are issuing the ecash.
    Claiming,
    /// The payment has been successful.
    Claimed,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
203
/// The final state of an operation receiving a payment over lightning. The
/// variants mirror the `ReceiveOperationState` variants of the same name.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum FinalReceiveOperationState {
    /// The payment request has expired.
    Expired,
    /// The payment has been successful.
    Claimed,
    /// Either a programming error has occurred or the federation is malicious.
    Failure,
}
214
/// Result of initiating a receive operation: the invoice to hand to the payer
/// together with the id of the new operation, or the reason no invoice could
/// be obtained.
pub type ReceiveResult = Result<(Bolt11Invoice, OperationId), ReceiveError>;
216
/// Initializer for the lightning client module.
#[derive(Clone)]
pub struct LightningClientInit {
    /// Optional gateway connection override. When `None`, `init` builds a
    /// `RealGatewayConnection` from the client's connector registry.
    pub gateway_conn: Option<Arc<dyn GatewayConnection + Send + Sync>>,
    /// Callback producing custom metadata; it is handed to the module's LNURL
    /// receive task on construction. The `Default` impl returns `Value::Null`.
    pub custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
}
222
223impl std::fmt::Debug for LightningClientInit {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        f.debug_struct("LightningClientInit")
226            .field("gateway_conn", &self.gateway_conn)
227            .field("custom_meta_fn", &"<function>")
228            .finish()
229    }
230}
231
232impl Default for LightningClientInit {
233    fn default() -> Self {
234        LightningClientInit {
235            gateway_conn: None,
236            custom_meta_fn: Arc::new(|| Value::Null),
237        }
238    }
239}
240
241impl ModuleInit for LightningClientInit {
242    type Common = LightningCommonInit;
243
244    async fn dump_database(
245        &self,
246        _dbtx: &mut DatabaseTransaction<'_>,
247        _prefix_names: Vec<String>,
248    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
249        Box::new(BTreeMap::new().into_iter())
250    }
251}
252
253#[apply(async_trait_maybe_send!)]
254impl ClientModuleInit for LightningClientInit {
255    type Module = LightningClientModule;
256
257    fn supported_api_versions(&self) -> MultiApiVersion {
258        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
259            .expect("no version conflicts")
260    }
261
262    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
263        let gateway_conn = if let Some(gateway_conn) = self.gateway_conn.clone() {
264            gateway_conn
265        } else {
266            let api = GatewayApi::new(None, args.connector_registry.clone());
267            Arc::new(RealGatewayConnection { api })
268        };
269        Ok(LightningClientModule::new(
270            *args.federation_id(),
271            args.cfg().clone(),
272            args.notifier().clone(),
273            args.context(),
274            args.module_api().clone(),
275            args.module_root_secret(),
276            gateway_conn,
277            self.custom_meta_fn.clone(),
278            args.admin_auth().cloned(),
279            args.task_group(),
280        ))
281    }
282
283    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
284        Some(
285            DbKeyPrefix::iter()
286                .map(|p| p as u8)
287                .chain(
288                    DbKeyPrefix::ExternalReservedStart as u8
289                        ..=DbKeyPrefix::CoreInternalReservedEnd as u8,
290                )
291                .collect(),
292        )
293    }
294}
295
/// Context handed to the module's state machines; built from the module's own
/// fields in `LightningClientModule::context`.
#[derive(Debug, Clone)]
pub struct LightningClientContext {
    /// Id of the federation the client belongs to.
    federation_id: FederationId,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    /// Client context of the lightning module.
    pub(crate) client_ctx: ClientContext<LightningClientModule>,
}
302
impl Context for LightningClientContext {
    /// Tags the context with the lightning module kind.
    const KIND: Option<ModuleKind> = Some(KIND);
}
306
/// Client side of the lightning module.
#[derive(Debug, Clone)]
pub struct LightningClientModule {
    /// Id of the federation the client belongs to.
    federation_id: FederationId,
    /// Module configuration; provides the network and the consensus fees.
    cfg: LightningClientConfig,
    /// Notifier used to subscribe to state machine updates of an operation.
    notifier: ModuleNotifier<LightningClientStateMachines>,
    /// Client context of this module (operations, database, event log).
    client_ctx: ClientContext<Self>,
    /// API of the federation's lightning module.
    module_api: DynModuleApi,
    /// Static module keypair, derived from child 0 of the module root secret.
    keypair: Keypair,
    /// Keypair derived from child 1 of the module root secret.
    lnurl_keypair: Keypair,
    /// Connection used to talk to gateways.
    gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
    #[allow(unused)] // The field is only used by the cli feature
    admin_auth: Option<ApiAuth>,
}
320
321#[apply(async_trait_maybe_send!)]
322impl ClientModule for LightningClientModule {
323    type Init = LightningClientInit;
324    type Common = LightningModuleTypes;
325    type Backup = NoModuleBackup;
326    type ModuleStateMachineContext = LightningClientContext;
327    type States = LightningClientStateMachines;
328
329    fn context(&self) -> Self::ModuleStateMachineContext {
330        LightningClientContext {
331            federation_id: self.federation_id,
332            gateway_conn: self.gateway_conn.clone(),
333            client_ctx: self.client_ctx.clone(),
334        }
335    }
336
337    fn input_fee(
338        &self,
339        amounts: &Amounts,
340        _input: &<Self::Common as ModuleCommon>::Input,
341    ) -> Option<Amounts> {
342        Some(Amounts::new_bitcoin(
343            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
344        ))
345    }
346
347    fn output_fee(
348        &self,
349        amounts: &Amounts,
350        _output: &<Self::Common as ModuleCommon>::Output,
351    ) -> Option<Amounts> {
352        Some(Amounts::new_bitcoin(
353            self.cfg.fee_consensus.fee(amounts.expect_only_bitcoin()),
354        ))
355    }
356
357    #[cfg(feature = "cli")]
358    async fn handle_cli_command(
359        &self,
360        args: &[std::ffi::OsString],
361    ) -> anyhow::Result<serde_json::Value> {
362        cli::handle_cli_command(self, args).await
363    }
364}
365
366impl LightningClientModule {
367    #[allow(clippy::too_many_arguments)]
368    fn new(
369        federation_id: FederationId,
370        cfg: LightningClientConfig,
371        notifier: ModuleNotifier<LightningClientStateMachines>,
372        client_ctx: ClientContext<Self>,
373        module_api: DynModuleApi,
374        module_root_secret: &DerivableSecret,
375        gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
376        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
377        admin_auth: Option<ApiAuth>,
378        task_group: &TaskGroup,
379    ) -> Self {
380        let module = Self {
381            federation_id,
382            cfg,
383            notifier,
384            client_ctx,
385            module_api,
386            keypair: module_root_secret
387                .child_key(ChildId(0))
388                .to_secp_key(SECP256K1),
389            lnurl_keypair: module_root_secret
390                .child_key(ChildId(1))
391                .to_secp_key(SECP256K1),
392            gateway_conn,
393            admin_auth,
394        };
395
396        module.spawn_receive_lnurl_task(custom_meta_fn, task_group);
397
398        module.spawn_gateway_map_update_task(task_group);
399
400        module
401    }
402
403    fn spawn_gateway_map_update_task(&self, task_group: &TaskGroup) {
404        let module = self.clone();
405        let api = self.module_api.clone();
406
407        task_group.spawn_cancellable("gateway_map_update_task", async move {
408            api.wait_for_initialized_connections().await;
409            module.update_gateway_map().await;
410        });
411    }
412
413    async fn update_gateway_map(&self) {
414        // Update the mapping from lightning node public keys to gateway api
415        // endpoints maintained in the module database. When paying an invoice this
416        // enables the client to select the gateway that has created the invoice,
417        // if possible, such that the payment does not go over lightning, reducing
418        // fees and latency.
419
420        if let Ok(gateways) = self.module_api.gateways().await {
421            let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
422
423            for gateway in gateways {
424                if let Ok(Some(routing_info)) = self
425                    .gateway_conn
426                    .routing_info(gateway.clone(), &self.federation_id)
427                    .await
428                {
429                    dbtx.insert_entry(&GatewayKey(routing_info.lightning_public_key), &gateway)
430                        .await;
431                }
432            }
433
434            if let Err(e) = dbtx.commit_tx_result().await {
435                warn!("Failed to commit the updated gateway mapping to the database: {e}");
436            }
437        }
438    }
439
440    /// Selects an available gateway by querying the federation's registered
441    /// gateways, checking if one of them match the invoice's payee public
442    /// key, then queries the gateway for `RoutingInfo` to determine if it is
443    /// online.
444    pub async fn select_gateway(
445        &self,
446        invoice: Option<Bolt11Invoice>,
447    ) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
448        let gateways = self
449            .module_api
450            .gateways()
451            .await
452            .map_err(|e| SelectGatewayError::FailedToRequestGateways(e.to_string()))?;
453
454        if gateways.is_empty() {
455            return Err(SelectGatewayError::NoGatewaysAvailable);
456        }
457
458        if let Some(invoice) = invoice
459            && let Some(gateway) = self
460                .client_ctx
461                .module_db()
462                .begin_transaction_nc()
463                .await
464                .get_value(&GatewayKey(invoice.recover_payee_pub_key()))
465                .await
466                .filter(|gateway| gateways.contains(gateway))
467            && let Ok(Some(routing_info)) = self.routing_info(&gateway).await
468        {
469            return Ok((gateway, routing_info));
470        }
471
472        for gateway in gateways {
473            if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
474                return Ok((gateway, routing_info));
475            }
476        }
477
478        Err(SelectGatewayError::GatewaysUnresponsive)
479    }
480
481    /// Sends a request to each peer for their registered gateway list and
482    /// returns a `Vec<SafeUrl` of all registered gateways to the client.
483    pub async fn list_gateways(
484        &self,
485        peer: Option<PeerId>,
486    ) -> Result<Vec<SafeUrl>, ListGatewaysError> {
487        if let Some(peer) = peer {
488            self.module_api
489                .gateways_from_peer(peer)
490                .await
491                .map_err(|_| ListGatewaysError::FailedToListGateways)
492        } else {
493            self.module_api
494                .gateways()
495                .await
496                .map_err(|_| ListGatewaysError::FailedToListGateways)
497        }
498    }
499
500    /// Requests the `RoutingInfo`, including fee information, from the gateway
501    /// available at the `SafeUrl`.
502    pub async fn routing_info(
503        &self,
504        gateway: &SafeUrl,
505    ) -> Result<Option<RoutingInfo>, RoutingInfoError> {
506        self.gateway_conn
507            .routing_info(gateway.clone(), &self.federation_id)
508            .await
509            .map_err(|_| RoutingInfoError::FailedToRequestRoutingInfo)
510    }
511
    /// Pay an invoice. For testing you can optionally specify a gateway to
    /// route with, otherwise a gateway will be selected automatically. If the
    /// invoice was created by a gateway connected to our federation, the same
    /// gateway will be selected to allow for a direct ecash swap. Otherwise we
    /// select the first registered gateway that answers a routing info
    /// request.
    ///
    /// The fee for this payment may depend on the selected gateway but
    /// will be limited to one and a half percent plus one hundred satoshis.
    /// This fee accounts for the fee charged by the gateway as well as
    /// the additional fee required to reliably route this payment over
    /// lightning if necessary. Since the gateway has been vetted by at least
    /// one guardian we trust it to set a reasonable fee and only enforce a
    /// rather high limit.
    ///
    /// The absolute fee for a payment can be calculated from the operation meta
    /// to be shown to the user in the transaction history.
    ///
    /// Returns the id of the new operation. The call fails without funding
    /// anything if the invoice has no amount, is expired, or is for a
    /// different network, if the same invoice is already being paid or has
    /// been paid, if no usable gateway is found, or if the gateway's fee or
    /// expiration delta exceeds the enforced limits.
    #[allow(clippy::too_many_lines)]
    pub async fn send(
        &self,
        invoice: Bolt11Invoice,
        gateway: Option<SafeUrl>,
        custom_meta: Value,
    ) -> Result<OperationId, SendPaymentError> {
        // Invoices without an amount cannot be paid.
        let amount = invoice
            .amount_milli_satoshis()
            .ok_or(SendPaymentError::InvoiceMissingAmount)?;

        if invoice.is_expired() {
            return Err(SendPaymentError::InvoiceExpired);
        }

        // The invoice must be for the network the federation is configured for.
        if self.cfg.network != invoice.currency().into() {
            return Err(SendPaymentError::WrongCurrency {
                invoice_currency: invoice.currency(),
                federation_currency: self.cfg.network.into(),
            });
        }

        // Fails if a previous attempt for this invoice is still in progress or
        // has already succeeded.
        let operation_id = self.get_next_operation_id(&invoice).await?;

        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(self.keypair.public_key());

        // The ephemeral tweak doubles as the secret key of the refund keypair.
        let refund_keypair = SecretKey::from_slice(&ephemeral_tweak)
            .expect("32 bytes, within curve order")
            .keypair(secp256k1::SECP256K1);

        // An explicitly requested gateway must support our federation;
        // otherwise a gateway is selected automatically.
        let (gateway_api, routing_info) = match gateway {
            Some(gateway_api) => (
                gateway_api.clone(),
                self.routing_info(&gateway_api)
                    .await
                    .map_err(|e| SendPaymentError::FailedToConnectToGateway(e.to_string()))?
                    .ok_or(SendPaymentError::FederationNotSupported)?,
            ),
            None => self
                .select_gateway(Some(invoice.clone()))
                .await
                .map_err(SendPaymentError::SelectGateway)?,
        };

        let (send_fee, expiration_delta) = routing_info.send_parameters(&invoice);

        // Reject gateways asking for more than the enforced fee limit.
        if !send_fee.le(&PaymentFee::SEND_FEE_LIMIT) {
            return Err(SendPaymentError::GatewayFeeExceedsLimit);
        }

        // Reject gateways asking for a longer timeout than we accept.
        if EXPIRATION_DELTA_LIMIT < expiration_delta {
            return Err(SendPaymentError::GatewayExpirationExceedsLimit);
        }

        let consensus_block_count = self
            .module_api
            .consensus_block_count()
            .await
            .map_err(|e| SendPaymentError::FailedToRequestBlockCount(e.to_string()))?;

        // The contract locks the invoice amount plus the gateway fee and
        // expires at the current consensus height plus the gateway's delta
        // plus our confirmation buffer.
        let contract = OutgoingContract {
            payment_image: PaymentImage::Hash(*invoice.payment_hash()),
            amount: send_fee.add_to(amount),
            expiration: consensus_block_count + expiration_delta + CONTRACT_CONFIRMATION_BUFFER,
            claim_pk: routing_info.module_public_key,
            refund_pk: refund_keypair.public_key(),
            ephemeral_pk,
        };

        // Copies that are moved into the state machine closure below; the
        // originals are still needed for the operation meta.
        let contract_clone = contract.clone();
        let gateway_api_clone = gateway_api.clone();
        let invoice_clone = invoice.clone();

        let client_output = ClientOutput::<LightningOutput> {
            output: LightningOutput::V0(LightningOutputV0::Outgoing(contract.clone())),
            amounts: Amounts::new_bitcoin(contract.amount),
        };

        // The send state machine starts in `Funding` and tracks the first
        // outpoint of the range it is given.
        let client_output_sm = ClientOutputSM::<LightningClientStateMachines> {
            state_machines: Arc::new(move |range: OutPointRange| {
                vec![LightningClientStateMachines::Send(SendStateMachine {
                    common: SendSMCommon {
                        operation_id,
                        outpoint: range.into_iter().next().unwrap(),
                        contract: contract_clone.clone(),
                        gateway_api: Some(gateway_api_clone.clone()),
                        invoice: Some(LightningInvoice::Bolt11(invoice_clone.clone())),
                        refund_keypair,
                    },
                    state: SendSMState::Funding,
                })]
            }),
        };

        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
            vec![client_output],
            vec![client_output_sm],
        ));

        let transaction = TransactionBuilder::new().with_outputs(client_output);

        self.client_ctx
            .finalize_and_submit_transaction(
                operation_id,
                LightningCommonInit::KIND.as_str(),
                move |change_outpoint_range| {
                    LightningOperationMeta::Send(SendOperationMeta {
                        change_outpoint_range,
                        gateway: gateway_api.clone(),
                        contract: contract.clone(),
                        invoice: LightningInvoice::Bolt11(invoice.clone()),
                        custom_meta: custom_meta.clone(),
                    })
                },
                transaction,
            )
            .await
            .map_err(|e| SendPaymentError::FailedToFundPayment(e.to_string()))?;

        // The payment event is logged in its own database transaction after
        // the funding transaction has been submitted.
        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;

        self.client_ctx
            .log_event(
                &mut dbtx,
                SendPaymentEvent {
                    operation_id,
                    amount: send_fee.add_to(amount),
                    fee: Some(send_fee.fee(amount)),
                },
            )
            .await;

        dbtx.commit_tx().await;

        Ok(operation_id)
    }
664
665    async fn get_next_operation_id(
666        &self,
667        invoice: &Bolt11Invoice,
668    ) -> Result<OperationId, SendPaymentError> {
669        for payment_attempt in 0..u64::MAX {
670            let operation_id = OperationId::from_encodable(&(invoice.clone(), payment_attempt));
671
672            if !self.client_ctx.operation_exists(operation_id).await {
673                return Ok(operation_id);
674            }
675
676            if self.client_ctx.has_active_states(operation_id).await {
677                return Err(SendPaymentError::PaymentInProgress(operation_id));
678            }
679
680            let mut stream = self
681                .subscribe_send_operation_state_updates(operation_id)
682                .await
683                .expect("operation_id exists")
684                .into_stream();
685
686            // This will not block since we checked for active states and there were none,
687            // so by definition a final state has to have been assumed already.
688            while let Some(state) = stream.next().await {
689                if let SendOperationState::Success(_) = state {
690                    return Err(SendPaymentError::InvoiceAlreadyPaid(operation_id));
691                }
692            }
693        }
694
695        panic!("We could not find an unused operation id for sending a lightning payment");
696    }
697
    /// Subscribe to all state updates of the send operation.
    ///
    /// State machine updates are translated into `SendOperationState`s. The
    /// stream ends after yielding `Success`, `Refunded` or `Failure`. Fails
    /// if the operation cannot be looked up.
    pub async fn subscribe_send_operation_state_updates(
        &self,
        operation_id: OperationId,
    ) -> anyhow::Result<UpdateStreamOrOutcome<SendOperationState>> {
        let operation = self.client_ctx.get_operation(operation_id).await?;
        let mut stream = self.notifier.subscribe(operation_id).await;
        let client_ctx = self.client_ctx.clone();
        let module_api = self.module_api.clone();

        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
            stream! {
                // NOTE(review): updates that are not send state machines are
                // skipped, and if the notifier stream ever returns `None` this
                // loop repeats without pause - confirm the stream cannot end.
                loop {
                    if let Some(LightningClientStateMachines::Send(state)) = stream.next().await {
                        match state.state {
                            SendSMState::Funding => yield SendOperationState::Funding,
                            SendSMState::Funded => yield SendOperationState::Funded,
                            SendSMState::Success(preimage) => {
                                // the preimage has been verified by the state machine previously
                                assert!(state.common.contract.verify_preimage(&preimage));

                                yield SendOperationState::Success(preimage);
                                return;
                            },
                            SendSMState::Refunding(out_points) => {
                                yield SendOperationState::Refunding;

                                // The refund is complete once the primary module outputs
                                // of the refund have been awaited successfully.
                                if client_ctx.await_primary_module_outputs(operation_id, out_points.clone()).await.is_ok() {
                                    yield SendOperationState::Refunded;
                                    return;
                                }

                                // The gateway may have incorrectly claimed the outgoing contract thereby causing
                                // our refund transaction to be rejected. Therefore, we check one last time if
                                // the preimage is available before we enter the failure state.
                                if let Some(preimage) = module_api.await_preimage(
                                    state.common.outpoint,
                                    0
                                ).await
                                    && state.common.contract.verify_preimage(&preimage) {
                                        yield SendOperationState::Success(preimage);
                                        return;
                                    }

                                yield SendOperationState::Failure;
                                return;
                            },
                            // A rejected funding transaction is reported as a failure.
                            SendSMState::Rejected(..) => {
                                yield SendOperationState::Failure;
                                return;
                            },
                        }
                    }
                }
            }
        }))
    }
755
756    /// Await the final state of the send operation.
757    pub async fn await_final_send_operation_state(
758        &self,
759        operation_id: OperationId,
760    ) -> anyhow::Result<FinalSendOperationState> {
761        let state = self
762            .subscribe_send_operation_state_updates(operation_id)
763            .await?
764            .into_stream()
765            .filter_map(|state| {
766                futures::future::ready(match state {
767                    SendOperationState::Success(_) => Some(FinalSendOperationState::Success),
768                    SendOperationState::Refunded => Some(FinalSendOperationState::Refunded),
769                    SendOperationState::Failure => Some(FinalSendOperationState::Failure),
770                    _ => None,
771                })
772            })
773            .next()
774            .await
775            .expect("Stream contains one final state");
776
777        Ok(state)
778    }
779
780    /// Request an invoice. For testing you can optionally specify a gateway to
781    /// generate the invoice, otherwise a random online gateway will be selected
782    /// automatically.
783    ///
784    /// The total fee for this payment may depend on the chosen gateway but
785    /// will be limited to half of one percent plus fifty satoshis. Since the
786    /// selected gateway has been vetted by at least one guardian we trust it to
787    /// set a reasonable fee and only enforce a rather high limit.
788    ///
789    /// The absolute fee for a payment can be calculated from the operation meta
790    /// to be shown to the user in the transaction history.
791    pub async fn receive(
792        &self,
793        amount: Amount,
794        expiry_secs: u32,
795        description: Bolt11InvoiceDescription,
796        gateway: Option<SafeUrl>,
797        custom_meta: Value,
798    ) -> Result<(Bolt11Invoice, OperationId), ReceiveError> {
799        let (gateway, contract, invoice) = self
800            .create_contract_and_fetch_invoice(
801                self.keypair.public_key(),
802                amount,
803                expiry_secs,
804                description,
805                gateway,
806            )
807            .await?;
808
809        let operation_id = self
810            .receive_incoming_contract(
811                self.keypair.secret_key(),
812                contract.clone(),
813                LightningOperationMeta::Receive(ReceiveOperationMeta {
814                    gateway,
815                    contract,
816                    invoice: LightningInvoice::Bolt11(invoice.clone()),
817                    custom_meta,
818                }),
819            )
820            .await
821            .expect("The contract has been generated with our public key");
822
823        Ok((invoice, operation_id))
824    }
825
826    /// Create an incoming contract locked to a public key derived from the
827    /// recipient's static module public key and fetches the corresponding
828    /// invoice.
829    async fn create_contract_and_fetch_invoice(
830        &self,
831        recipient_static_pk: PublicKey,
832        amount: Amount,
833        expiry_secs: u32,
834        description: Bolt11InvoiceDescription,
835        gateway: Option<SafeUrl>,
836    ) -> Result<(SafeUrl, IncomingContract, Bolt11Invoice), ReceiveError> {
837        let (ephemeral_tweak, ephemeral_pk) = tweak::generate(recipient_static_pk);
838
839        let encryption_seed = ephemeral_tweak
840            .consensus_hash::<sha256::Hash>()
841            .to_byte_array();
842
843        let preimage = encryption_seed
844            .consensus_hash::<sha256::Hash>()
845            .to_byte_array();
846
847        let (gateway, routing_info) = match gateway {
848            Some(gateway) => (
849                gateway.clone(),
850                self.routing_info(&gateway)
851                    .await
852                    .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?
853                    .ok_or(ReceiveError::FederationNotSupported)?,
854            ),
855            None => self
856                .select_gateway(None)
857                .await
858                .map_err(ReceiveError::SelectGateway)?,
859        };
860
861        if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
862            return Err(ReceiveError::GatewayFeeExceedsLimit);
863        }
864
865        let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
866
867        if contract_amount < MINIMUM_INCOMING_CONTRACT_AMOUNT {
868            return Err(ReceiveError::AmountTooSmall);
869        }
870
871        let expiration = duration_since_epoch()
872            .as_secs()
873            .saturating_add(u64::from(expiry_secs));
874
875        let claim_pk = recipient_static_pk
876            .mul_tweak(
877                secp256k1::SECP256K1,
878                &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
879            )
880            .expect("Tweak is valid");
881
882        let contract = IncomingContract::new(
883            self.cfg.tpe_agg_pk,
884            encryption_seed,
885            preimage,
886            PaymentImage::Hash(preimage.consensus_hash()),
887            contract_amount,
888            expiration,
889            claim_pk,
890            routing_info.module_public_key,
891            ephemeral_pk,
892        );
893
894        let invoice = self
895            .gateway_conn
896            .bolt11_invoice(
897                gateway.clone(),
898                self.federation_id,
899                contract.clone(),
900                amount,
901                description,
902                expiry_secs,
903            )
904            .await
905            .map_err(|e| ReceiveError::FailedToConnectToGateway(e.to_string()))?;
906
907        if invoice.payment_hash() != &preimage.consensus_hash() {
908            return Err(ReceiveError::InvalidInvoice);
909        }
910
911        if invoice.amount_milli_satoshis() != Some(amount.msats) {
912            return Err(ReceiveError::IncorrectInvoiceAmount);
913        }
914
915        Ok((gateway, contract, invoice))
916    }
917
918    // Receive an incoming contract locked to a public key derived from our
919    // static module public key.
920    async fn receive_incoming_contract(
921        &self,
922        sk: SecretKey,
923        contract: IncomingContract,
924        operation_meta: LightningOperationMeta,
925    ) -> Option<OperationId> {
926        let operation_id = OperationId::from_encodable(&contract.clone());
927
928        let (claim_keypair, agg_decryption_key) = self.recover_contract_keys(sk, &contract)?;
929
930        let receive_sm = LightningClientStateMachines::Receive(ReceiveStateMachine {
931            common: ReceiveSMCommon {
932                operation_id,
933                contract: contract.clone(),
934                claim_keypair,
935                agg_decryption_key,
936            },
937            state: ReceiveSMState::Pending,
938        });
939
940        // this may only fail if the operation id is already in use, in which case we
941        // ignore the error such that the method is idempotent
942        self.client_ctx
943            .manual_operation_start(
944                operation_id,
945                LightningCommonInit::KIND.as_str(),
946                operation_meta,
947                vec![self.client_ctx.make_dyn_state(receive_sm)],
948            )
949            .await
950            .ok();
951
952        Some(operation_id)
953    }
954
955    fn recover_contract_keys(
956        &self,
957        sk: SecretKey,
958        contract: &IncomingContract,
959    ) -> Option<(Keypair, AggregateDecryptionKey)> {
960        let tweak = ecdh::SharedSecret::new(&contract.commitment.ephemeral_pk, &sk);
961
962        let encryption_seed = tweak
963            .secret_bytes()
964            .consensus_hash::<sha256::Hash>()
965            .to_byte_array();
966
967        let claim_keypair = sk
968            .mul_tweak(&Scalar::from_be_bytes(tweak.secret_bytes()).expect("Within curve order"))
969            .expect("Tweak is valid")
970            .keypair(secp256k1::SECP256K1);
971
972        if claim_keypair.public_key() != contract.commitment.claim_pk {
973            return None; // The claim key is not derived from our pk
974        }
975
976        let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
977
978        if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
979            return None; // The decryption key is not derived from our pk
980        }
981
982        contract.decrypt_preimage(&agg_decryption_key)?;
983
984        Some((claim_keypair, agg_decryption_key))
985    }
986
987    /// Subscribe to all state updates of the receive operation.
988    pub async fn subscribe_receive_operation_state_updates(
989        &self,
990        operation_id: OperationId,
991    ) -> anyhow::Result<UpdateStreamOrOutcome<ReceiveOperationState>> {
992        let operation = self.client_ctx.get_operation(operation_id).await?;
993        let mut stream = self.notifier.subscribe(operation_id).await;
994        let client_ctx = self.client_ctx.clone();
995
996        Ok(self.client_ctx.outcome_or_updates(operation, operation_id, move || {
997            stream! {
998                loop {
999                    if let Some(LightningClientStateMachines::Receive(state)) = stream.next().await {
1000                        match state.state {
1001                            ReceiveSMState::Pending => yield ReceiveOperationState::Pending,
1002                            ReceiveSMState::Claiming(out_points) => {
1003                                yield ReceiveOperationState::Claiming;
1004
1005                                if client_ctx.await_primary_module_outputs(operation_id, out_points).await.is_ok() {
1006                                    yield ReceiveOperationState::Claimed;
1007                                } else {
1008                                    yield ReceiveOperationState::Failure;
1009                                }
1010                                return;
1011                            },
1012                            ReceiveSMState::Expired => {
1013                                yield ReceiveOperationState::Expired;
1014                                return;
1015                            }
1016                        }
1017                    }
1018                }
1019            }
1020        }))
1021    }
1022
1023    /// Await the final state of the receive operation.
1024    pub async fn await_final_receive_operation_state(
1025        &self,
1026        operation_id: OperationId,
1027    ) -> anyhow::Result<FinalReceiveOperationState> {
1028        let state = self
1029            .subscribe_receive_operation_state_updates(operation_id)
1030            .await?
1031            .into_stream()
1032            .filter_map(|state| {
1033                futures::future::ready(match state {
1034                    ReceiveOperationState::Expired => Some(FinalReceiveOperationState::Expired),
1035                    ReceiveOperationState::Claimed => Some(FinalReceiveOperationState::Claimed),
1036                    ReceiveOperationState::Failure => Some(FinalReceiveOperationState::Failure),
1037                    _ => None,
1038                })
1039            })
1040            .next()
1041            .await
1042            .expect("Stream contains one final state");
1043
1044        Ok(state)
1045    }
1046
1047    /// Generate an lnurl for the client. You can optionally specify a gateway
1048    /// to use for testing purposes.
1049    pub async fn generate_lnurl(
1050        &self,
1051        recurringd: SafeUrl,
1052        gateway: Option<SafeUrl>,
1053    ) -> Result<String, GenerateLnurlError> {
1054        let gateways = if let Some(gateway) = gateway {
1055            vec![gateway]
1056        } else {
1057            let gateways = self
1058                .module_api
1059                .gateways()
1060                .await
1061                .map_err(|e| GenerateLnurlError::FailedToRequestGateways(e.to_string()))?;
1062
1063            if gateways.is_empty() {
1064                return Err(GenerateLnurlError::NoGatewaysAvailable);
1065            }
1066
1067            gateways
1068        };
1069
1070        let lnurl = lnurl::generate_lnurl(
1071            &recurringd,
1072            self.federation_id,
1073            self.lnurl_keypair.public_key(),
1074            self.cfg.tpe_agg_pk,
1075            gateways,
1076        );
1077
1078        Ok(lnurl)
1079    }
1080
1081    fn spawn_receive_lnurl_task(
1082        &self,
1083        custom_meta_fn: Arc<dyn Fn() -> Value + Send + Sync>,
1084        task_group: &TaskGroup,
1085    ) {
1086        let module = self.clone();
1087        let api = self.module_api.clone();
1088
1089        task_group.spawn_cancellable("receive_lnurl_task", async move {
1090            api.wait_for_initialized_connections().await;
1091            loop {
1092                module.receive_lnurl(custom_meta_fn()).await;
1093            }
1094        });
1095    }
1096
1097    async fn receive_lnurl(&self, custom_meta: Value) {
1098        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
1099
1100        let stream_index = dbtx
1101            .get_value(&IncomingContractStreamIndexKey)
1102            .await
1103            .unwrap_or(0);
1104
1105        let (contracts, next_index) = self
1106            .module_api
1107            .await_incoming_contracts(stream_index, 128)
1108            .await;
1109
1110        for contract in &contracts {
1111            if let Some(operation_id) = self
1112                .receive_incoming_contract(
1113                    self.lnurl_keypair.secret_key(),
1114                    contract.clone(),
1115                    LightningOperationMeta::LnurlReceive(LnurlReceiveOperationMeta {
1116                        contract: contract.clone(),
1117                        custom_meta: custom_meta.clone(),
1118                    }),
1119                )
1120                .await
1121            {
1122                self.await_final_receive_operation_state(operation_id)
1123                    .await
1124                    .ok();
1125            }
1126        }
1127
1128        dbtx.insert_entry(&IncomingContractStreamIndexKey, &next_index)
1129            .await;
1130
1131        dbtx.commit_tx().await;
1132    }
1133}
1134
/// Errors that can occur while selecting a gateway.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SelectGatewayError {
    // NOTE(review): the payload is presumably the stringified underlying
    // error; it is not part of the `Display` message.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    #[error("All gateways failed to respond")]
    GatewaysUnresponsive,
}
1144
/// Errors that can occur when sending a payment.
///
/// The `String` payloads are not part of the `Display` messages.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SendPaymentError {
    #[error("Invoice is missing an amount")]
    InvoiceMissingAmount,
    #[error("Invoice has expired")]
    InvoiceExpired,
    // NOTE(review): presumably carries the id of the operation that is
    // already paying this invoice - confirm against the send path.
    #[error("A payment for this invoice is already in progress")]
    PaymentInProgress(OperationId),
    // NOTE(review): presumably carries the id of the operation that paid
    // this invoice - confirm against the send path.
    #[error("This invoice has already been paid")]
    InvoiceAlreadyPaid(OperationId),
    /// Forwards the message of the wrapped gateway selection error.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    #[error("Gateway expiration time exceeds the allowed limit")]
    GatewayExpirationExceedsLimit,
    #[error("Failed to request block count")]
    FailedToRequestBlockCount(String),
    #[error("Failed to fund the payment")]
    FailedToFundPayment(String),
    #[error("Invoice is for a different currency")]
    WrongCurrency {
        invoice_currency: Currency,
        federation_currency: Currency,
    },
}
1175
/// Errors that can occur when requesting an invoice to receive a payment.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ReceiveError {
    /// Forwards the message of the wrapped gateway selection error.
    #[error(transparent)]
    SelectGateway(SelectGatewayError),
    /// Carries the stringified underlying error, which is not part of the
    /// `Display` message.
    #[error("Failed to connect to gateway")]
    FailedToConnectToGateway(String),
    #[error("Gateway does not support this federation")]
    FederationNotSupported,
    #[error("Gateway fee exceeds the allowed limit")]
    GatewayFeeExceedsLimit,
    #[error("Amount is too small to cover fees")]
    AmountTooSmall,
    #[error("Gateway returned an invalid invoice")]
    InvalidInvoice,
    #[error("Gateway returned an invoice with incorrect amount")]
    IncorrectInvoiceAmount,
}
1193
/// Errors that can occur when generating an lnurl.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum GenerateLnurlError {
    #[error("No gateways are available")]
    NoGatewaysAvailable,
    /// Carries the stringified underlying error, which is not part of the
    /// `Display` message.
    #[error("Failed to request gateways")]
    FailedToRequestGateways(String),
}
1201
/// Error returned when the list of gateways cannot be obtained.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum ListGatewaysError {
    #[error("Failed to request gateways")]
    FailedToListGateways,
}
1207
/// Error returned when the routing info cannot be obtained.
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum RoutingInfoError {
    #[error("Failed to request routing info")]
    FailedToRequestRoutingInfo,
}
1213
/// The state machines of the lightning client module, one variant for
/// outgoing and one for incoming payments.
// NOTE(review): the enum derives `Encodable`/`Decodable`; presumably the
// variant order is part of the encoding - confirm before reordering.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
pub enum LightningClientStateMachines {
    Send(SendStateMachine),
    Receive(ReceiveStateMachine),
}
1219
/// Allows the typed state machine enum to be converted into a `DynState`.
impl IntoDynInstance for LightningClientStateMachines {
    type DynType = DynState;

    /// Wrap the state machine into a `DynState` tagged with `instance_id`.
    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
        DynState::from_typed(instance_id, self)
    }
}
1227
1228impl State for LightningClientStateMachines {
1229    type ModuleContext = LightningClientContext;
1230
1231    fn transitions(
1232        &self,
1233        context: &Self::ModuleContext,
1234        global_context: &DynGlobalClientContext,
1235    ) -> Vec<StateTransition<Self>> {
1236        match self {
1237            LightningClientStateMachines::Send(state) => {
1238                sm_enum_variant_translation!(
1239                    state.transitions(context, global_context),
1240                    LightningClientStateMachines::Send
1241                )
1242            }
1243            LightningClientStateMachines::Receive(state) => {
1244                sm_enum_variant_translation!(
1245                    state.transitions(context, global_context),
1246                    LightningClientStateMachines::Receive
1247                )
1248            }
1249        }
1250    }
1251
1252    fn operation_id(&self) -> OperationId {
1253        match self {
1254            LightningClientStateMachines::Send(state) => state.operation_id(),
1255            LightningClientStateMachines::Receive(state) => state.operation_id(),
1256        }
1257    }
1258}