fedimint_gwv2_client/
lib.rs

1mod api;
2mod complete_sm;
3pub mod events;
4mod receive_sm;
5mod send_sm;
6
7use std::collections::BTreeMap;
8use std::fmt;
9use std::fmt::Debug;
10use std::sync::Arc;
11
12use anyhow::{anyhow, ensure};
13use async_trait::async_trait;
14use bitcoin::hashes::sha256;
15use bitcoin::secp256k1::Message;
16use events::{IncomingPaymentStarted, OutgoingPaymentStarted};
17use fedimint_api_client::api::DynModuleApi;
18use fedimint_client::ClientHandleArc;
19use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
20use fedimint_client_module::module::recovery::NoModuleBackup;
21use fedimint_client_module::module::{ClientContext, ClientModule, IClientModule, OutPointRange};
22use fedimint_client_module::sm::util::MapStateTransitions;
23use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
24use fedimint_client_module::transaction::{
25    ClientOutput, ClientOutputBundle, ClientOutputSM, TransactionBuilder,
26};
27use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
28use fedimint_core::config::FederationId;
29use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
30use fedimint_core::db::DatabaseTransaction;
31use fedimint_core::encoding::{Decodable, Encodable};
32use fedimint_core::module::{
33    ApiVersion, CommonModuleInit, ModuleCommon, ModuleInit, MultiApiVersion,
34};
35use fedimint_core::secp256k1::Keypair;
36use fedimint_core::time::now;
37use fedimint_core::{Amount, PeerId, apply, async_trait_maybe_send, secp256k1};
38use fedimint_lightning::{InterceptPaymentResponse, LightningRpcError};
39use fedimint_lnv2_common::config::LightningClientConfig;
40use fedimint_lnv2_common::contracts::{IncomingContract, PaymentImage};
41use fedimint_lnv2_common::gateway_api::SendPaymentPayload;
42use fedimint_lnv2_common::{
43    LightningCommonInit, LightningInvoice, LightningModuleTypes, LightningOutput, LightningOutputV0,
44};
45use futures::StreamExt;
46use lightning_invoice::Bolt11Invoice;
47use receive_sm::{ReceiveSMState, ReceiveStateMachine};
48use secp256k1::schnorr::Signature;
49use send_sm::{SendSMState, SendStateMachine};
50use serde::{Deserialize, Serialize};
51use tpe::{AggregatePublicKey, PublicKeyShare};
52use tracing::{info, warn};
53
54use crate::api::GatewayFederationApi;
55use crate::complete_sm::{CompleteSMCommon, CompleteSMState, CompleteStateMachine};
56use crate::receive_sm::ReceiveSMCommon;
57use crate::send_sm::SendSMCommon;
58
/// LNv2 CLTV delta, measured in blocks.
///
/// When an outgoing payment is started, this margin is subtracted (saturating
/// at zero) from the contract expiration reported by the federation to obtain
/// the maximum delay allowed for the payment.
pub const EXPIRATION_DELTA_MINIMUM_V2: u64 = 144;
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct GatewayOperationMetaV2;
64
65#[derive(Debug, Clone)]
66pub struct GatewayClientInitV2 {
67    pub gateway: Arc<dyn IGatewayClientV2>,
68}
69
70impl ModuleInit for GatewayClientInitV2 {
71    type Common = LightningCommonInit;
72
73    async fn dump_database(
74        &self,
75        _dbtx: &mut DatabaseTransaction<'_>,
76        _prefix_names: Vec<String>,
77    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
78        Box::new(vec![].into_iter())
79    }
80}
81
82#[apply(async_trait_maybe_send!)]
83impl ClientModuleInit for GatewayClientInitV2 {
84    type Module = GatewayClientModuleV2;
85
86    fn supported_api_versions(&self) -> MultiApiVersion {
87        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
88            .expect("no version conflicts")
89    }
90
91    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
92        Ok(GatewayClientModuleV2 {
93            federation_id: *args.federation_id(),
94            cfg: args.cfg().clone(),
95            notifier: args.notifier().clone(),
96            client_ctx: args.context(),
97            module_api: args.module_api().clone(),
98            keypair: args
99                .module_root_secret()
100                .clone()
101                .to_secp_key(fedimint_core::secp256k1::SECP256K1),
102            gateway: self.gateway.clone(),
103        })
104    }
105}
106
107#[derive(Debug, Clone)]
108pub struct GatewayClientModuleV2 {
109    pub federation_id: FederationId,
110    pub cfg: LightningClientConfig,
111    pub notifier: ModuleNotifier<GatewayClientStateMachinesV2>,
112    pub client_ctx: ClientContext<Self>,
113    pub module_api: DynModuleApi,
114    pub keypair: Keypair,
115    pub gateway: Arc<dyn IGatewayClientV2>,
116}
117
118#[derive(Debug, Clone)]
119pub struct GatewayClientContextV2 {
120    pub module: GatewayClientModuleV2,
121    pub decoder: Decoder,
122    pub tpe_agg_pk: AggregatePublicKey,
123    pub tpe_pks: BTreeMap<PeerId, PublicKeyShare>,
124    pub gateway: Arc<dyn IGatewayClientV2>,
125}
126
127impl Context for GatewayClientContextV2 {
128    const KIND: Option<ModuleKind> = Some(fedimint_lnv2_common::KIND);
129}
130
131impl ClientModule for GatewayClientModuleV2 {
132    type Init = GatewayClientInitV2;
133    type Common = LightningModuleTypes;
134    type Backup = NoModuleBackup;
135    type ModuleStateMachineContext = GatewayClientContextV2;
136    type States = GatewayClientStateMachinesV2;
137
138    fn context(&self) -> Self::ModuleStateMachineContext {
139        GatewayClientContextV2 {
140            module: self.clone(),
141            decoder: self.decoder(),
142            tpe_agg_pk: self.cfg.tpe_agg_pk,
143            tpe_pks: self.cfg.tpe_pks.clone(),
144            gateway: self.gateway.clone(),
145        }
146    }
147    fn input_fee(
148        &self,
149        amount: Amount,
150        _input: &<Self::Common as ModuleCommon>::Input,
151    ) -> Option<Amount> {
152        Some(self.cfg.fee_consensus.fee(amount))
153    }
154
155    fn output_fee(
156        &self,
157        _amount: Amount,
158        output: &<Self::Common as ModuleCommon>::Output,
159    ) -> Option<Amount> {
160        let amount = match output.ensure_v0_ref().ok()? {
161            LightningOutputV0::Outgoing(contract) => contract.amount,
162            LightningOutputV0::Incoming(contract) => contract.commitment.amount,
163        };
164
165        Some(self.cfg.fee_consensus.fee(amount))
166    }
167}
168
169#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
170pub enum GatewayClientStateMachinesV2 {
171    Send(SendStateMachine),
172    Receive(ReceiveStateMachine),
173    Complete(CompleteStateMachine),
174}
175
176impl fmt::Display for GatewayClientStateMachinesV2 {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        match self {
179            GatewayClientStateMachinesV2::Send(send) => {
180                write!(f, "{send}")
181            }
182            GatewayClientStateMachinesV2::Receive(receive) => {
183                write!(f, "{receive}")
184            }
185            GatewayClientStateMachinesV2::Complete(complete) => {
186                write!(f, "{complete}")
187            }
188        }
189    }
190}
191
192impl IntoDynInstance for GatewayClientStateMachinesV2 {
193    type DynType = DynState;
194
195    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
196        DynState::from_typed(instance_id, self)
197    }
198}
199
200impl State for GatewayClientStateMachinesV2 {
201    type ModuleContext = GatewayClientContextV2;
202
203    fn transitions(
204        &self,
205        context: &Self::ModuleContext,
206        global_context: &DynGlobalClientContext,
207    ) -> Vec<StateTransition<Self>> {
208        match self {
209            GatewayClientStateMachinesV2::Send(state) => {
210                sm_enum_variant_translation!(
211                    state.transitions(context, global_context),
212                    GatewayClientStateMachinesV2::Send
213                )
214            }
215            GatewayClientStateMachinesV2::Receive(state) => {
216                sm_enum_variant_translation!(
217                    state.transitions(context, global_context),
218                    GatewayClientStateMachinesV2::Receive
219                )
220            }
221            GatewayClientStateMachinesV2::Complete(state) => {
222                sm_enum_variant_translation!(
223                    state.transitions(context, global_context),
224                    GatewayClientStateMachinesV2::Complete
225                )
226            }
227        }
228    }
229
230    fn operation_id(&self) -> OperationId {
231        match self {
232            GatewayClientStateMachinesV2::Send(state) => state.operation_id(),
233            GatewayClientStateMachinesV2::Receive(state) => state.operation_id(),
234            GatewayClientStateMachinesV2::Complete(state) => state.operation_id(),
235        }
236    }
237}
238
239#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Decodable, Encodable)]
240pub enum FinalReceiveState {
241    Rejected,
242    Success([u8; 32]),
243    Refunded,
244    Failure,
245}
246
247impl GatewayClientModuleV2 {
248    pub async fn send_payment(
249        &self,
250        payload: SendPaymentPayload,
251    ) -> anyhow::Result<Result<[u8; 32], Signature>> {
252        let operation_start = now();
253
254        // The operation id is equal to the contract id which also doubles as the
255        // message signed by the gateway via the forfeit signature to forfeit
256        // the gateways claim to a contract in case of cancellation. We only create a
257        // forfeit signature after we have started the send state machine to
258        // prevent replay attacks with a previously cancelled outgoing contract
259        let operation_id = OperationId::from_encodable(&payload.contract.clone());
260
261        if self.client_ctx.operation_exists(operation_id).await {
262            return Ok(self.subscribe_send(operation_id).await);
263        }
264
265        // Since the following four checks may only fail due to client side
266        // programming error we do not have to enable cancellation and can check
267        // them before we start the state machine.
268        ensure!(
269            payload.contract.claim_pk == self.keypair.public_key(),
270            "The outgoing contract is keyed to another gateway"
271        );
272
273        // This prevents DOS attacks where an attacker submits a different invoice.
274        ensure!(
275            secp256k1::SECP256K1
276                .verify_schnorr(
277                    &payload.auth,
278                    &Message::from_digest(
279                        *payload.invoice.consensus_hash::<sha256::Hash>().as_ref()
280                    ),
281                    &payload.contract.refund_pk.x_only_public_key().0,
282                )
283                .is_ok(),
284            "Invalid auth signature for the invoice data"
285        );
286
287        // We need to check that the contract has been confirmed by the federation
288        // before we start the state machine to prevent DOS attacks.
289        let (contract_id, expiration) = self
290            .module_api
291            .outgoing_contract_expiration(payload.outpoint)
292            .await
293            .map_err(|_| anyhow!("The gateway can not reach the federation"))?
294            .ok_or(anyhow!("The outgoing contract has not yet been confirmed"))?;
295
296        ensure!(
297            contract_id == payload.contract.contract_id(),
298            "Contract Id returned by the federation does not match contract in request"
299        );
300
301        let (payment_hash, amount) = match &payload.invoice {
302            LightningInvoice::Bolt11(invoice) => (
303                invoice.payment_hash(),
304                invoice
305                    .amount_milli_satoshis()
306                    .ok_or(anyhow!("Invoice is missing amount"))?,
307            ),
308        };
309
310        ensure!(
311            PaymentImage::Hash(*payment_hash) == payload.contract.payment_image,
312            "The invoices payment hash does not match the contracts payment hash"
313        );
314
315        let min_contract_amount = self
316            .gateway
317            .min_contract_amount(&payload.federation_id, amount)
318            .await?;
319
320        let send_sm = GatewayClientStateMachinesV2::Send(SendStateMachine {
321            common: SendSMCommon {
322                operation_id,
323                outpoint: payload.outpoint,
324                contract: payload.contract.clone(),
325                max_delay: expiration.saturating_sub(EXPIRATION_DELTA_MINIMUM_V2),
326                min_contract_amount,
327                invoice: payload.invoice,
328                claim_keypair: self.keypair,
329            },
330            state: SendSMState::Sending,
331        });
332
333        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
334        self.client_ctx
335            .manual_operation_start_dbtx(
336                &mut dbtx.to_ref_nc(),
337                operation_id,
338                LightningCommonInit::KIND.as_str(),
339                GatewayOperationMetaV2,
340                vec![self.client_ctx.make_dyn_state(send_sm)],
341            )
342            .await
343            .ok();
344
345        self.client_ctx
346            .log_event(
347                &mut dbtx,
348                OutgoingPaymentStarted {
349                    operation_start,
350                    outgoing_contract: payload.contract.clone(),
351                    min_contract_amount,
352                    invoice_amount: Amount::from_msats(amount),
353                    max_delay: expiration.saturating_sub(EXPIRATION_DELTA_MINIMUM_V2),
354                },
355            )
356            .await;
357        dbtx.commit_tx().await;
358
359        Ok(self.subscribe_send(operation_id).await)
360    }
361
362    pub async fn subscribe_send(&self, operation_id: OperationId) -> Result<[u8; 32], Signature> {
363        let mut stream = self.notifier.subscribe(operation_id).await;
364
365        loop {
366            if let Some(GatewayClientStateMachinesV2::Send(state)) = stream.next().await {
367                match state.state {
368                    SendSMState::Sending => {}
369                    SendSMState::Claiming(claiming) => {
370                        // This increases latency by one ordering and may eventually be removed;
371                        // however, at the current stage of lnv2 we prioritize the verification of
372                        // correctness above minimum latency.
373                        assert!(
374                            self.client_ctx
375                                .await_primary_module_outputs(operation_id, claiming.outpoints)
376                                .await
377                                .is_ok(),
378                            "Gateway Module V2 failed to claim outgoing contract with preimage"
379                        );
380
381                        return Ok(claiming.preimage);
382                    }
383                    SendSMState::Cancelled(cancelled) => {
384                        warn!("Outgoing lightning payment is cancelled {:?}", cancelled);
385
386                        let signature = self
387                            .keypair
388                            .sign_schnorr(state.common.contract.forfeit_message());
389
390                        assert!(state.common.contract.verify_forfeit_signature(&signature));
391
392                        return Err(signature);
393                    }
394                }
395            }
396        }
397    }
398
399    pub async fn relay_incoming_htlc(
400        &self,
401        payment_hash: sha256::Hash,
402        incoming_chan_id: u64,
403        htlc_id: u64,
404        contract: IncomingContract,
405        amount_msat: u64,
406    ) -> anyhow::Result<()> {
407        let operation_start = now();
408
409        let operation_id = OperationId::from_encodable(&contract);
410
411        if self.client_ctx.operation_exists(operation_id).await {
412            return Ok(());
413        }
414
415        let refund_keypair = self.keypair;
416
417        let client_output = ClientOutput::<LightningOutput> {
418            output: LightningOutput::V0(LightningOutputV0::Incoming(contract.clone())),
419            amount: contract.commitment.amount,
420        };
421        let commitment = contract.commitment.clone();
422        let client_output_sm = ClientOutputSM::<GatewayClientStateMachinesV2> {
423            state_machines: Arc::new(move |range: OutPointRange| {
424                assert_eq!(range.count(), 1);
425
426                vec![
427                    GatewayClientStateMachinesV2::Receive(ReceiveStateMachine {
428                        common: ReceiveSMCommon {
429                            operation_id,
430                            contract: contract.clone(),
431                            outpoint: range.into_iter().next().unwrap(),
432                            refund_keypair,
433                        },
434                        state: ReceiveSMState::Funding,
435                    }),
436                    GatewayClientStateMachinesV2::Complete(CompleteStateMachine {
437                        common: CompleteSMCommon {
438                            operation_id,
439                            payment_hash,
440                            incoming_chan_id,
441                            htlc_id,
442                        },
443                        state: CompleteSMState::Pending,
444                    }),
445                ]
446            }),
447        };
448
449        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
450            vec![client_output],
451            vec![client_output_sm],
452        ));
453        let transaction = TransactionBuilder::new().with_outputs(client_output);
454
455        self.client_ctx
456            .finalize_and_submit_transaction(
457                operation_id,
458                LightningCommonInit::KIND.as_str(),
459                |_| GatewayOperationMetaV2,
460                transaction,
461            )
462            .await?;
463
464        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
465        self.client_ctx
466            .log_event(
467                &mut dbtx,
468                IncomingPaymentStarted {
469                    operation_start,
470                    incoming_contract_commitment: commitment,
471                    invoice_amount: Amount::from_msats(amount_msat),
472                },
473            )
474            .await;
475        dbtx.commit_tx().await;
476
477        Ok(())
478    }
479
480    pub async fn relay_direct_swap(
481        &self,
482        contract: IncomingContract,
483        amount_msat: u64,
484    ) -> anyhow::Result<FinalReceiveState> {
485        let operation_start = now();
486
487        let operation_id = OperationId::from_encodable(&contract);
488
489        if self.client_ctx.operation_exists(operation_id).await {
490            return Ok(self.await_receive(operation_id).await);
491        }
492
493        let refund_keypair = self.keypair;
494
495        let client_output = ClientOutput::<LightningOutput> {
496            output: LightningOutput::V0(LightningOutputV0::Incoming(contract.clone())),
497            amount: contract.commitment.amount,
498        };
499        let commitment = contract.commitment.clone();
500        let client_output_sm = ClientOutputSM::<GatewayClientStateMachinesV2> {
501            state_machines: Arc::new(move |range| {
502                assert_eq!(range.count(), 1);
503
504                vec![GatewayClientStateMachinesV2::Receive(ReceiveStateMachine {
505                    common: ReceiveSMCommon {
506                        operation_id,
507                        contract: contract.clone(),
508                        outpoint: range.into_iter().next().unwrap(),
509                        refund_keypair,
510                    },
511                    state: ReceiveSMState::Funding,
512                })]
513            }),
514        };
515
516        let client_output = self.client_ctx.make_client_outputs(ClientOutputBundle::new(
517            vec![client_output],
518            vec![client_output_sm],
519        ));
520
521        let transaction = TransactionBuilder::new().with_outputs(client_output);
522
523        self.client_ctx
524            .finalize_and_submit_transaction(
525                operation_id,
526                LightningCommonInit::KIND.as_str(),
527                |_| GatewayOperationMetaV2,
528                transaction,
529            )
530            .await?;
531
532        let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
533        self.client_ctx
534            .log_event(
535                &mut dbtx,
536                IncomingPaymentStarted {
537                    operation_start,
538                    incoming_contract_commitment: commitment,
539                    invoice_amount: Amount::from_msats(amount_msat),
540                },
541            )
542            .await;
543        dbtx.commit_tx().await;
544
545        Ok(self.await_receive(operation_id).await)
546    }
547
548    async fn await_receive(&self, operation_id: OperationId) -> FinalReceiveState {
549        let mut stream = self.notifier.subscribe(operation_id).await;
550
551        loop {
552            if let Some(GatewayClientStateMachinesV2::Receive(state)) = stream.next().await {
553                match state.state {
554                    ReceiveSMState::Funding => {}
555                    ReceiveSMState::Rejected(..) => return FinalReceiveState::Rejected,
556                    ReceiveSMState::Success(preimage) => {
557                        return FinalReceiveState::Success(preimage);
558                    }
559                    ReceiveSMState::Refunding(out_points) => {
560                        if self
561                            .client_ctx
562                            .await_primary_module_outputs(operation_id, out_points)
563                            .await
564                            .is_err()
565                        {
566                            return FinalReceiveState::Failure;
567                        }
568
569                        return FinalReceiveState::Refunded;
570                    }
571                    ReceiveSMState::Failure => return FinalReceiveState::Failure,
572                }
573            }
574        }
575    }
576
577    /// For the given `OperationId`, this function will wait until the Complete
578    /// state machine has finished or failed.
579    pub async fn await_completion(&self, operation_id: OperationId) {
580        let mut stream = self.notifier.subscribe(operation_id).await;
581
582        loop {
583            match stream.next().await {
584                Some(GatewayClientStateMachinesV2::Complete(state)) => {
585                    if state.state == CompleteSMState::Completed {
586                        info!(%state, "LNv2 completion state machine finished");
587                        return;
588                    }
589
590                    info!(%state, "Waiting for LNv2 completion state machine");
591                }
592                Some(GatewayClientStateMachinesV2::Receive(state)) => {
593                    info!(%state, "Waiting for LNv2 completion state machine");
594                    continue;
595                }
596                Some(state) => {
597                    warn!(%state, "Operation is not an LNv2 completion state machine");
598                    return;
599                }
600                None => return,
601            }
602        }
603    }
604}
605
606/// An interface between module implementation and the general `Gateway`
607///
608/// To abstract away and decouple the core gateway from the modules, the
609/// interface between the is expressed as a trait. The core gateway handles
610/// LNv2 operations that require access to the database or lightning node.
611#[async_trait]
612pub trait IGatewayClientV2: Debug + Send + Sync {
613    /// Use the gateway's lightning node to complete a payment
614    async fn complete_htlc(&self, htlc_response: InterceptPaymentResponse);
615
616    /// Determines if the payment can be completed using a direct swap to
617    /// another federation.
618    ///
619    /// A direct swap is determined by checking the gateway's connected
620    /// lightning node against the invoice's payee lightning node. If they
621    /// are the same, then the gateway can use another client to complete
622    /// the payment be swapping ecash instead of a payment over the
623    /// Lightning network.
624    async fn is_direct_swap(
625        &self,
626        invoice: &Bolt11Invoice,
627    ) -> anyhow::Result<Option<(IncomingContract, ClientHandleArc)>>;
628
629    /// Initiates a payment over the Lightning network.
630    async fn pay(
631        &self,
632        invoice: Bolt11Invoice,
633        max_delay: u64,
634        max_fee: Amount,
635    ) -> Result<[u8; 32], LightningRpcError>;
636
637    /// Computes the minimum contract amount necessary for making an outgoing
638    /// payment.
639    ///
640    /// The minimum contract amount must contain transaction fees to cover the
641    /// gateway's transaction fee and optionally additional fee to cover the
642    /// gateway's Lightning fee if the payment goes over the Lightning
643    /// network.
644    async fn min_contract_amount(
645        &self,
646        federation_id: &FederationId,
647        amount: u64,
648    ) -> anyhow::Result<Amount>;
649}