Skip to main content

fedimint_lightning/
ldk.rs

1use std::collections::BTreeMap;
2use std::path::Path;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::{Duration, UNIX_EPOCH};
6
7use async_trait::async_trait;
8use bitcoin::hashes::{Hash, sha256};
9use bitcoin::{FeeRate, Network, OutPoint};
10use fedimint_bip39::Mnemonic;
11use fedimint_core::task::{TaskGroup, TaskHandle, block_in_place};
12use fedimint_core::util::{FmtCompact, SafeUrl};
13use fedimint_core::{Amount, BitcoinAmountOrAll, crit};
14use fedimint_gateway_common::{
15    ChainSource, GetInvoiceRequest, GetInvoiceResponse, ListTransactionsResponse,
16    SetChannelFeesRequest,
17};
18use fedimint_ln_common::contracts::Preimage;
19use fedimint_logging::LOG_LIGHTNING;
20use ldk_node::config::ChannelConfig;
21use ldk_node::lightning::ln::msgs::SocketAddress;
22use ldk_node::lightning::routing::gossip::{NodeAlias, NodeId};
23use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus, SendingParameters};
24use lightning::ln::channelmanager::PaymentId;
25use lightning::offers::offer::{Offer, OfferId};
26use lightning::types::payment::{PaymentHash, PaymentPreimage};
27use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
28use tokio::sync::mpsc::Sender;
29use tokio::sync::{RwLock, oneshot};
30use tokio_stream::wrappers::ReceiverStream;
31use tracing::{debug, error, info, warn};
32
33use super::{ChannelInfo, ILnRpcClient, LightningRpcError, ListChannelsResponse, RouteHtlcStream};
34use crate::{
35    CloseChannelsWithPeerRequest, CloseChannelsWithPeerResponse, CreateInvoiceRequest,
36    CreateInvoiceResponse, GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse,
37    GetRouteHintsResponse, InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription,
38    OpenChannelRequest, OpenChannelResponse, PayInvoiceResponse, PaymentAction, SendOnchainRequest,
39    SendOnchainResponse,
40};
41
42pub struct GatewayLdkClient {
43    /// The underlying lightning node.
44    node: Arc<ldk_node::Node>,
45
46    task_group: TaskGroup,
47
48    /// The HTLC stream, until it is taken by calling
49    /// `ILnRpcClient::route_htlcs`.
50    htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
51
52    /// Lock pool used to ensure that our implementation of `ILnRpcClient::pay`
53    /// doesn't allow for multiple simultaneous calls with the same invoice to
54    /// execute in parallel. This helps ensure that the function is idempotent.
55    outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
56
57    /// Lock pool used to ensure that our implementation of
58    /// `ILnRpcClient::pay_offer` doesn't allow for multiple simultaneous
59    /// calls with the same offer to execute in parallel. This helps ensure
60    /// that the function is idempotent.
61    outbound_offer_lock_pool: lockable::LockPool<LdkOfferId>,
62
63    /// A map keyed by the `UserChannelId` of a channel that is currently
64    /// opening. The `Sender` is used to communicate the `OutPoint` back to
65    /// the API handler from the event handler when the channel has been
66    /// opened and is now pending.
67    pending_channels:
68        Arc<RwLock<BTreeMap<UserChannelId, oneshot::Sender<anyhow::Result<OutPoint>>>>>,
69}
70
71impl std::fmt::Debug for GatewayLdkClient {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
74    }
75}
76
77impl GatewayLdkClient {
78    /// Creates a new `GatewayLdkClient` instance and starts the underlying
79    /// lightning node. All resources, including the lightning node, will be
80    /// cleaned up when the returned `GatewayLdkClient` instance is dropped.
81    /// There's no need to manually stop the node.
82    pub fn new(
83        data_dir: &Path,
84        chain_source: ChainSource,
85        network: Network,
86        lightning_port: u16,
87        alias: String,
88        mnemonic: Mnemonic,
89        runtime: Arc<tokio::runtime::Runtime>,
90    ) -> anyhow::Result<Self> {
91        let mut bytes = [0u8; 32];
92        let alias = if alias.is_empty() {
93            "LDK Gateway".to_string()
94        } else {
95            alias
96        };
97        let alias_bytes = alias.as_bytes();
98        let truncated = &alias_bytes[..alias_bytes.len().min(32)];
99        bytes[..truncated.len()].copy_from_slice(truncated);
100        let node_alias = Some(NodeAlias(bytes));
101
102        let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
103            network,
104            listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
105                addr: [0, 0, 0, 0],
106                port: lightning_port,
107            }]),
108            node_alias,
109            ..Default::default()
110        });
111
112        node_builder.set_entropy_bip39_mnemonic(mnemonic, None);
113
114        match chain_source.clone() {
115            ChainSource::Bitcoind {
116                username,
117                password,
118                server_url,
119            } => {
120                node_builder.set_chain_source_bitcoind_rpc(
121                    server_url
122                        .host_str()
123                        .expect("Could not retrieve host from bitcoind RPC url")
124                        .to_string(),
125                    server_url
126                        .port()
127                        .expect("Could not retrieve port from bitcoind RPC url"),
128                    username,
129                    password,
130                );
131            }
132            ChainSource::Esplora { server_url } => {
133                node_builder.set_chain_source_esplora(get_esplora_url(server_url)?, None);
134            }
135        };
136        let Some(data_dir_str) = data_dir.to_str() else {
137            return Err(anyhow::anyhow!("Invalid data dir path"));
138        };
139        node_builder.set_storage_dir_path(data_dir_str.to_string());
140
141        info!(chain_source = %chain_source, data_dir = %data_dir_str, alias = %alias, "Starting LDK Node...");
142        let node = Arc::new(node_builder.build()?);
143        node.start_with_runtime(runtime).map_err(|err| {
144            crit!(target: LOG_LIGHTNING, err = %err.fmt_compact(), "Failed to start LDK Node");
145            LightningRpcError::FailedToConnect
146        })?;
147
148        let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
149        let task_group = TaskGroup::new();
150
151        let node_clone = node.clone();
152        let pending_channels = Arc::new(RwLock::new(BTreeMap::new()));
153        let pending_channels_clone = pending_channels.clone();
154        task_group.spawn("ldk lightning node event handler", |handle| async move {
155            loop {
156                Self::handle_next_event(
157                    &node_clone,
158                    &htlc_stream_sender,
159                    &handle,
160                    pending_channels_clone.clone(),
161                )
162                .await;
163            }
164        });
165
166        info!("Successfully started LDK Gateway");
167        Ok(GatewayLdkClient {
168            node,
169            task_group,
170            htlc_stream_receiver_or: Some(htlc_stream_receiver),
171            outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
172            outbound_offer_lock_pool: lockable::LockPool::new(),
173            pending_channels,
174        })
175    }
176
177    async fn handle_next_event(
178        node: &ldk_node::Node,
179        htlc_stream_sender: &Sender<InterceptPaymentRequest>,
180        handle: &TaskHandle,
181        pending_channels: Arc<
182            RwLock<BTreeMap<UserChannelId, oneshot::Sender<anyhow::Result<OutPoint>>>>,
183        >,
184    ) {
185        // We manually check for task termination in case we receive a payment while the
186        // task is shutting down. In that case, we want to finish the payment
187        // before shutting this task down.
188        let event = tokio::select! {
189            event = node.next_event_async() => {
190                event
191            }
192            () = handle.make_shutdown_rx() => {
193                return;
194            }
195        };
196
197        match event {
198            ldk_node::Event::PaymentClaimable {
199                payment_id: _,
200                payment_hash,
201                claimable_amount_msat,
202                claim_deadline,
203                custom_records: _,
204            } => {
205                if let Err(err) = htlc_stream_sender
206                    .send(InterceptPaymentRequest {
207                        payment_hash: Hash::from_slice(&payment_hash.0)
208                            .expect("Failed to create Hash"),
209                        amount_msat: claimable_amount_msat,
210                        expiry: claim_deadline.unwrap_or_default(),
211                        short_channel_id: None,
212                        incoming_chan_id: 0,
213                        htlc_id: 0,
214                    })
215                    .await
216                {
217                    warn!(target: LOG_LIGHTNING, err = %err.fmt_compact(), "Failed send InterceptHtlcRequest to stream");
218                }
219            }
220            ldk_node::Event::ChannelPending {
221                channel_id,
222                user_channel_id,
223                former_temporary_channel_id: _,
224                counterparty_node_id: _,
225                funding_txo,
226            } => {
227                info!(target: LOG_LIGHTNING, %channel_id, "LDK Channel is pending");
228                let mut channels = pending_channels.write().await;
229                if let Some(sender) = channels.remove(&UserChannelId(user_channel_id)) {
230                    let _ = sender.send(Ok(funding_txo));
231                } else {
232                    debug!(
233                        ?user_channel_id,
234                        "No channel pending channel open for user channel id"
235                    );
236                }
237            }
238            ldk_node::Event::ChannelClosed {
239                channel_id,
240                user_channel_id,
241                counterparty_node_id: _,
242                reason,
243            } => {
244                info!(target: LOG_LIGHTNING, %channel_id, "LDK Channel is closed");
245                let mut channels = pending_channels.write().await;
246                if let Some(sender) = channels.remove(&UserChannelId(user_channel_id)) {
247                    let reason = if let Some(reason) = reason {
248                        reason.to_string()
249                    } else {
250                        "Channel has been closed".to_string()
251                    };
252                    let _ = sender.send(Err(anyhow::anyhow!(reason)));
253                } else {
254                    debug!(
255                        ?user_channel_id,
256                        "No channel pending channel open for user channel id"
257                    );
258                }
259            }
260            _ => {}
261        }
262
263        // `PaymentClaimable` and `ChannelPending` events are the only event types that
264        // we are interested in. We can safely ignore all other events.
265        if let Err(err) = node.event_handled() {
266            warn!(err = %err.fmt_compact(), "LDK could not mark event handled");
267        }
268    }
269}
270
271impl Drop for GatewayLdkClient {
272    fn drop(&mut self) {
273        self.task_group.shutdown();
274
275        info!(target: LOG_LIGHTNING, "Stopping LDK Node...");
276        match self.node.stop() {
277            Err(err) => {
278                warn!(target: LOG_LIGHTNING, err = %err.fmt_compact(), "Failed to stop LDK Node");
279            }
280            _ => {
281                info!(target: LOG_LIGHTNING, "LDK Node stopped.");
282            }
283        }
284    }
285}
286
287#[async_trait]
288impl ILnRpcClient for GatewayLdkClient {
289    async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
290        let node_status = self.node.status();
291        let ldk_block_height = node_status.current_best_block.height;
292        let onchain_sync = node_status.latest_onchain_wallet_sync_timestamp;
293        let lightning_sync = node_status.latest_lightning_wallet_sync_timestamp;
294        let is_running = node_status.is_running;
295        debug!(target: LOG_LIGHTNING, ?onchain_sync, ?lightning_sync, ?is_running, "LDK Sync Status");
296
297        Ok(GetNodeInfoResponse {
298            pub_key: self.node.node_id(),
299            alias: match self.node.node_alias() {
300                Some(alias) => alias.to_string(),
301                None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
302            },
303            network: self.node.config().network.to_string(),
304            block_height: ldk_block_height,
305            // `synced_to_chain` is used for determining if the Lightning node is ready, so we care
306            // about the `lightning_sync` status.
307            synced_to_chain: lightning_sync.is_some(),
308        })
309    }
310
311    async fn routehints(
312        &self,
313        _num_route_hints: usize,
314    ) -> Result<GetRouteHintsResponse, LightningRpcError> {
315        // `ILnRpcClient::routehints()` is currently only ever used for LNv1 payment
316        // receives and will be removed when we switch to LNv2. The LDK gateway will
317        // never support LNv1 payment receives, only LNv2 payment receives, which
318        // require that the gateway's lightning node generates invoices rather than the
319        // fedimint client, so it is able to insert the proper route hints on its own.
320        Ok(GetRouteHintsResponse {
321            route_hints: vec![],
322        })
323    }
324
325    async fn pay(
326        &self,
327        invoice: Bolt11Invoice,
328        max_delay: u64,
329        max_fee: Amount,
330    ) -> Result<PayInvoiceResponse, LightningRpcError> {
331        let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
332
333        // Lock by the payment hash to prevent multiple simultaneous calls with the same
334        // invoice from executing. This prevents `ldk-node::Bolt11Payment::send()` from
335        // being called multiple times with the same invoice. This is important because
336        // `ldk-node::Bolt11Payment::send()` is not idempotent, but this function must
337        // be idempotent.
338        let _payment_lock_guard = self
339            .outbound_lightning_payment_lock_pool
340            .async_lock(payment_id)
341            .await;
342
343        // If a payment is not known to the node we can initiate it, and if it is known
344        // we can skip calling `ldk-node::Bolt11Payment::send()` and wait for the
345        // payment to complete. The lock guard above guarantees that this block is only
346        // executed once at a time for a given payment hash, ensuring that there is no
347        // race condition between checking if a payment is known and initiating a new
348        // payment if it isn't.
349        if self.node.payment(&payment_id).is_none() {
350            assert_eq!(
351                self.node
352                    .bolt11_payment()
353                    .send(
354                        &invoice,
355                        Some(SendingParameters {
356                            max_total_routing_fee_msat: Some(Some(max_fee.msats)),
357                            max_total_cltv_expiry_delta: Some(max_delay as u32),
358                            max_path_count: None,
359                            max_channel_saturation_power_of_half: None,
360                        }),
361                    )
362                    // TODO: Investigate whether all error types returned by `Bolt11Payment::send()`
363                    // result in idempotency.
364                    .map_err(|e| LightningRpcError::FailedPayment {
365                        failure_reason: format!("LDK payment failed to initialize: {e:?}"),
366                    })?,
367                payment_id
368            );
369        }
370
371        // TODO: Find a way to avoid looping/polling to know when a payment is
372        // completed. `ldk-node` provides `PaymentSuccessful` and `PaymentFailed`
373        // events, but interacting with the node event queue here isn't
374        // straightforward.
375        loop {
376            if let Some(payment_details) = self.node.payment(&payment_id) {
377                match payment_details.status {
378                    PaymentStatus::Pending => {}
379                    PaymentStatus::Succeeded => {
380                        if let PaymentKind::Bolt11 {
381                            preimage: Some(preimage),
382                            ..
383                        } = payment_details.kind
384                        {
385                            return Ok(PayInvoiceResponse {
386                                preimage: Preimage(preimage.0),
387                            });
388                        }
389                    }
390                    PaymentStatus::Failed => {
391                        return Err(LightningRpcError::FailedPayment {
392                            failure_reason: "LDK payment failed".to_string(),
393                        });
394                    }
395                }
396            }
397            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
398        }
399    }
400
401    async fn route_htlcs<'a>(
402        mut self: Box<Self>,
403        _task_group: &TaskGroup,
404    ) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
405        let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
406            Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
407            None => Err(LightningRpcError::FailedToRouteHtlcs {
408                failure_reason:
409                    "Stream does not exist. Likely was already taken by calling `route_htlcs()`."
410                        .to_string(),
411            }),
412        }?;
413
414        Ok((route_htlc_stream, Arc::new(*self)))
415    }
416
417    async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
418        let InterceptPaymentResponse {
419            action,
420            payment_hash,
421            incoming_chan_id: _,
422            htlc_id: _,
423        } = htlc;
424
425        let ph = PaymentHash(*payment_hash.clone().as_byte_array());
426
427        // TODO: Get the actual amount from the LDK node. Probably makes the
428        // most sense to pipe it through the `InterceptHtlcResponse` struct.
429        // This value is only used by `ldk-node` to ensure that the amount
430        // claimed isn't less than the amount expected, but we've already
431        // verified that the amount is correct when we intercepted the payment.
432        let claimable_amount_msat = 999_999_999_999_999;
433
434        let ph_hex_str = hex::encode(payment_hash);
435
436        if let PaymentAction::Settle(preimage) = action {
437            self.node
438                .bolt11_payment()
439                .claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
440                .map_err(|_| LightningRpcError::FailedToCompleteHtlc {
441                    failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
442                })?;
443        } else {
444            warn!(target: LOG_LIGHTNING, payment_hash = %ph_hex_str, "Unwinding payment because the action was not `Settle`");
445            self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
446                LightningRpcError::FailedToCompleteHtlc {
447                    failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
448                }
449            })?;
450        }
451
452        return Ok(());
453    }
454
455    async fn create_invoice(
456        &self,
457        create_invoice_request: CreateInvoiceRequest,
458    ) -> Result<CreateInvoiceResponse, LightningRpcError> {
459        let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
460            let ph = PaymentHash(*payment_hash.as_byte_array());
461            Some(ph)
462        } else {
463            None
464        };
465
466        let description = match create_invoice_request.description {
467            Some(InvoiceDescription::Direct(desc)) => {
468                Bolt11InvoiceDescription::Direct(Description::new(desc).map_err(|_| {
469                    LightningRpcError::FailedToGetInvoice {
470                        failure_reason: "Invalid description".to_string(),
471                    }
472                })?)
473            }
474            Some(InvoiceDescription::Hash(hash)) => {
475                Bolt11InvoiceDescription::Hash(lightning_invoice::Sha256(hash))
476            }
477            None => Bolt11InvoiceDescription::Direct(Description::empty()),
478        };
479
480        let invoice = match payment_hash_or {
481            Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
482                create_invoice_request.amount_msat,
483                &description,
484                create_invoice_request.expiry_secs,
485                payment_hash,
486            ),
487            None => self.node.bolt11_payment().receive(
488                create_invoice_request.amount_msat,
489                &description,
490                create_invoice_request.expiry_secs,
491            ),
492        }
493        .map_err(|e| LightningRpcError::FailedToGetInvoice {
494            failure_reason: e.to_string(),
495        })?;
496
497        Ok(CreateInvoiceResponse {
498            invoice: invoice.to_string(),
499        })
500    }
501
502    async fn get_ln_onchain_address(
503        &self,
504    ) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
505        self.node
506            .onchain_payment()
507            .new_address()
508            .map(|address| GetLnOnchainAddressResponse {
509                address: address.to_string(),
510            })
511            .map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
512                failure_reason: e.to_string(),
513            })
514    }
515
516    async fn send_onchain(
517        &self,
518        SendOnchainRequest {
519            address,
520            amount,
521            fee_rate_sats_per_vbyte,
522        }: SendOnchainRequest,
523    ) -> Result<SendOnchainResponse, LightningRpcError> {
524        let onchain = self.node.onchain_payment();
525
526        let retain_reserves = false;
527        let txid = match amount {
528            BitcoinAmountOrAll::All => onchain.send_all_to_address(
529                &address.assume_checked(),
530                retain_reserves,
531                FeeRate::from_sat_per_vb(fee_rate_sats_per_vbyte),
532            ),
533            BitcoinAmountOrAll::Amount(amount_sats) => onchain.send_to_address(
534                &address.assume_checked(),
535                amount_sats.to_sat(),
536                FeeRate::from_sat_per_vb(fee_rate_sats_per_vbyte),
537            ),
538        }
539        .map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
540            failure_reason: e.to_string(),
541        })?;
542
543        Ok(SendOnchainResponse {
544            txid: txid.to_string(),
545        })
546    }
547
548    async fn open_channel(
549        &self,
550        OpenChannelRequest {
551            pubkey,
552            host,
553            channel_size_sats,
554            push_amount_sats,
555            fee_rate_sats_per_vbyte,
556            base_fee_msat,
557            parts_per_million,
558        }: OpenChannelRequest,
559    ) -> Result<OpenChannelResponse, LightningRpcError> {
560        let push_amount_msats_or = if push_amount_sats == 0 {
561            None
562        } else {
563            Some(push_amount_sats * 1000)
564        };
565
566        if fee_rate_sats_per_vbyte.is_some() {
567            // LDK manages its own fee estimation for funding transactions; the
568            // user-supplied rate cannot be applied here.
569            warn!(
570                target: LOG_LIGHTNING,
571                "Ignoring fee_rate_sats_per_vbyte on LDK channel open; LDK uses its built-in fee estimator"
572            );
573        }
574
575        let channel_config = match (base_fee_msat, parts_per_million) {
576            (None, None) => None,
577            (base, ppm) => {
578                let mut config = ChannelConfig::default();
579                if let Some(base) = base {
580                    config.forwarding_fee_base_msat = u32::try_from(base).map_err(|_| {
581                        LightningRpcError::FailedToOpenChannel {
582                            failure_reason: format!(
583                                "base_fee_msat {base} does not fit in u32 (LDK limit)"
584                            ),
585                        }
586                    })?;
587                }
588                if let Some(ppm) = ppm {
589                    config.forwarding_fee_proportional_millionths =
590                        u32::try_from(ppm).map_err(|_| LightningRpcError::FailedToOpenChannel {
591                            failure_reason: format!(
592                                "parts_per_million {ppm} does not fit in u32 (LDK limit)"
593                            ),
594                        })?;
595                }
596                Some(config)
597            }
598        };
599
600        let (tx, rx) = oneshot::channel::<anyhow::Result<OutPoint>>();
601
602        {
603            let mut channels = self.pending_channels.write().await;
604            let user_channel_id = self
605                .node
606                .open_announced_channel(
607                    pubkey,
608                    SocketAddress::from_str(&host).map_err(|e| {
609                        LightningRpcError::FailedToConnectToPeer {
610                            failure_reason: e.to_string(),
611                        }
612                    })?,
613                    channel_size_sats,
614                    push_amount_msats_or,
615                    channel_config,
616                )
617                .map_err(|e| LightningRpcError::FailedToOpenChannel {
618                    failure_reason: e.to_string(),
619                })?;
620
621            channels.insert(UserChannelId(user_channel_id), tx);
622        }
623
624        match rx
625            .await
626            .map_err(|err| LightningRpcError::FailedToOpenChannel {
627                failure_reason: err.to_string(),
628            })? {
629            Ok(outpoint) => {
630                let funding_txid = outpoint.txid;
631
632                Ok(OpenChannelResponse {
633                    funding_txid: funding_txid.to_string(),
634                })
635            }
636            Err(err) => Err(LightningRpcError::FailedToOpenChannel {
637                failure_reason: err.to_string(),
638            }),
639        }
640    }
641
642    async fn close_channels_with_peer(
643        &self,
644        CloseChannelsWithPeerRequest {
645            pubkey,
646            force,
647            sats_per_vbyte: _,
648        }: CloseChannelsWithPeerRequest,
649    ) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
650        let mut num_channels_closed = 0;
651
652        info!(%pubkey, "Closing all channels with peer");
653        for channel_with_peer in self
654            .node
655            .list_channels()
656            .iter()
657            .filter(|channel| channel.counterparty_node_id == pubkey)
658        {
659            if force {
660                match self.node.force_close_channel(
661                    &channel_with_peer.user_channel_id,
662                    pubkey,
663                    Some("User initiated force close".to_string()),
664                ) {
665                    Ok(()) => num_channels_closed += 1,
666                    Err(err) => {
667                        error!(%pubkey, err = %err.fmt_compact(), "Could not force close channel");
668                    }
669                }
670            } else {
671                match self
672                    .node
673                    .close_channel(&channel_with_peer.user_channel_id, pubkey)
674                {
675                    Ok(()) => {
676                        num_channels_closed += 1;
677                    }
678                    Err(err) => {
679                        error!(%pubkey, err = %err.fmt_compact(), "Could not close channel");
680                    }
681                }
682            }
683        }
684
685        Ok(CloseChannelsWithPeerResponse {
686            num_channels_closed,
687        })
688    }
689
690    async fn list_channels(&self) -> Result<ListChannelsResponse, LightningRpcError> {
691        let mut channels = Vec::new();
692        let network_graph = self.node.network_graph();
693
694        // Build a map of peer pubkey -> address from connected/known peers
695        let peer_addresses: std::collections::HashMap<_, _> = self
696            .node
697            .list_peers()
698            .into_iter()
699            .map(|peer| (peer.node_id, peer.address.to_string()))
700            .collect();
701
702        for channel_details in self.node.list_channels().iter() {
703            let node_id = NodeId::from_pubkey(&channel_details.counterparty_node_id);
704            let node_info = network_graph.node(&node_id);
705
706            // Look up peer alias from network graph
707            let remote_node_alias = node_info.as_ref().and_then(|info| {
708                info.announcement_info.as_ref().and_then(|announcement| {
709                    let alias = announcement.alias().to_string();
710                    if alias.is_empty() { None } else { Some(alias) }
711                })
712            });
713
714            let remote_address = peer_addresses
715                .get(&channel_details.counterparty_node_id)
716                .cloned();
717
718            channels.push(ChannelInfo {
719                remote_pubkey: channel_details.counterparty_node_id,
720                channel_size_sats: channel_details.channel_value_sats,
721                outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
722                inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
723                is_active: channel_details.is_usable,
724                funding_outpoint: channel_details.funding_txo,
725                remote_node_alias,
726                remote_address,
727                base_fee_msat: Some(u64::from(channel_details.config.forwarding_fee_base_msat)),
728                parts_per_million: Some(u64::from(
729                    channel_details
730                        .config
731                        .forwarding_fee_proportional_millionths,
732                )),
733            });
734        }
735
736        Ok(ListChannelsResponse { channels })
737    }
738
739    async fn set_channel_fees(
740        &self,
741        payload: SetChannelFeesRequest,
742    ) -> Result<(), LightningRpcError> {
743        // ldk-node's `update_channel_config` is keyed by `UserChannelId` +
744        // counterparty pubkey, so resolve the funding outpoint to those by
745        // scanning the live channel list.
746        let channel = self
747            .node
748            .list_channels()
749            .into_iter()
750            .find(|c| c.funding_txo == Some(payload.funding_outpoint))
751            .ok_or_else(|| LightningRpcError::FailedToSetChannelFees {
752                failure_reason: format!(
753                    "No channel found with funding outpoint {}",
754                    payload.funding_outpoint,
755                ),
756            })?;
757
758        let forwarding_fee_base_msat = u32::try_from(payload.base_fee_msat).map_err(|_| {
759            LightningRpcError::FailedToSetChannelFees {
760                failure_reason: format!(
761                    "base_fee_msat {} does not fit in u32 (LDK limit)",
762                    payload.base_fee_msat,
763                ),
764            }
765        })?;
766        let forwarding_fee_proportional_millionths = u32::try_from(payload.parts_per_million)
767            .map_err(|_| LightningRpcError::FailedToSetChannelFees {
768                failure_reason: format!(
769                    "parts_per_million {} does not fit in u32 (LDK limit)",
770                    payload.parts_per_million,
771                ),
772            })?;
773
774        // Copy the channel's current config so we only change the two fee
775        // fields and preserve cltv_expiry_delta, dust limits, etc.
776        let new_config = ChannelConfig {
777            forwarding_fee_base_msat,
778            forwarding_fee_proportional_millionths,
779            ..channel.config
780        };
781
782        self.node
783            .update_channel_config(
784                &channel.user_channel_id,
785                channel.counterparty_node_id,
786                new_config,
787            )
788            .map_err(|e| LightningRpcError::FailedToSetChannelFees {
789                failure_reason: e.to_string(),
790            })?;
791
792        Ok(())
793    }
794
795    async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
796        let balances = self.node.list_balances();
797        let channel_lists = self
798            .node
799            .list_channels()
800            .into_iter()
801            .filter(|chan| chan.is_usable)
802            .collect::<Vec<_>>();
803        // map and get the total inbound_capacity_msat in the channels
804        let total_inbound_liquidity_balance_msat: u64 = channel_lists
805            .iter()
806            .map(|channel| channel.inbound_capacity_msat)
807            .sum();
808
809        Ok(GetBalancesResponse {
810            onchain_balance_sats: balances.total_onchain_balance_sats,
811            lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
812            inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
813        })
814    }
815
816    async fn get_invoice(
817        &self,
818        get_invoice_request: GetInvoiceRequest,
819    ) -> Result<Option<GetInvoiceResponse>, LightningRpcError> {
820        let invoices = self
821            .node
822            .list_payments_with_filter(|details| {
823                details.direction == PaymentDirection::Inbound
824                    && details.id == PaymentId(get_invoice_request.payment_hash.to_byte_array())
825                    && !matches!(details.kind, PaymentKind::Onchain { .. })
826            })
827            .iter()
828            .map(|details| {
829                let (preimage, payment_hash, _) = get_preimage_and_payment_hash(&details.kind);
830                let status = match details.status {
831                    PaymentStatus::Failed => fedimint_gateway_common::PaymentStatus::Failed,
832                    PaymentStatus::Succeeded => fedimint_gateway_common::PaymentStatus::Succeeded,
833                    PaymentStatus::Pending => fedimint_gateway_common::PaymentStatus::Pending,
834                };
835                GetInvoiceResponse {
836                    preimage: preimage.map(|p| p.to_string()),
837                    payment_hash,
838                    amount: Amount::from_msats(
839                        details
840                            .amount_msat
841                            .expect("amountless invoices are not supported"),
842                    ),
843                    created_at: UNIX_EPOCH + Duration::from_secs(details.latest_update_timestamp),
844                    status,
845                }
846            })
847            .collect::<Vec<_>>();
848
849        Ok(invoices.first().cloned())
850    }
851
852    async fn list_transactions(
853        &self,
854        start_secs: u64,
855        end_secs: u64,
856    ) -> Result<ListTransactionsResponse, LightningRpcError> {
857        let transactions = self
858            .node
859            .list_payments_with_filter(|details| {
860                !matches!(details.kind, PaymentKind::Onchain { .. })
861                    && details.latest_update_timestamp >= start_secs
862                    && details.latest_update_timestamp < end_secs
863            })
864            .iter()
865            .map(|details| {
866                let (preimage, payment_hash, payment_kind) =
867                    get_preimage_and_payment_hash(&details.kind);
868                let direction = match details.direction {
869                    PaymentDirection::Outbound => {
870                        fedimint_gateway_common::PaymentDirection::Outbound
871                    }
872                    PaymentDirection::Inbound => fedimint_gateway_common::PaymentDirection::Inbound,
873                };
874                let status = match details.status {
875                    PaymentStatus::Failed => fedimint_gateway_common::PaymentStatus::Failed,
876                    PaymentStatus::Succeeded => fedimint_gateway_common::PaymentStatus::Succeeded,
877                    PaymentStatus::Pending => fedimint_gateway_common::PaymentStatus::Pending,
878                };
879                fedimint_gateway_common::PaymentDetails {
880                    payment_hash,
881                    preimage: preimage.map(|p| p.to_string()),
882                    payment_kind,
883                    amount: Amount::from_msats(
884                        details
885                            .amount_msat
886                            .expect("amountless invoices are not supported"),
887                    ),
888                    direction,
889                    status,
890                    timestamp_secs: details.latest_update_timestamp,
891                }
892            })
893            .collect::<Vec<_>>();
894        Ok(ListTransactionsResponse { transactions })
895    }
896
897    fn create_offer(
898        &self,
899        amount: Option<Amount>,
900        description: Option<String>,
901        expiry_secs: Option<u32>,
902        quantity: Option<u64>,
903    ) -> Result<String, LightningRpcError> {
904        let description = description.unwrap_or_default();
905        let offer = if let Some(amount) = amount {
906            self.node
907                .bolt12_payment()
908                .receive(amount.msats, &description, expiry_secs, quantity)
909                .map_err(|err| LightningRpcError::Bolt12Error {
910                    failure_reason: err.to_string(),
911                })?
912        } else {
913            self.node
914                .bolt12_payment()
915                .receive_variable_amount(&description, expiry_secs)
916                .map_err(|err| LightningRpcError::Bolt12Error {
917                    failure_reason: err.to_string(),
918                })?
919        };
920
921        Ok(offer.to_string())
922    }
923
924    async fn pay_offer(
925        &self,
926        offer: String,
927        quantity: Option<u64>,
928        amount: Option<Amount>,
929        payer_note: Option<String>,
930    ) -> Result<Preimage, LightningRpcError> {
931        let offer = Offer::from_str(&offer).map_err(|_| LightningRpcError::Bolt12Error {
932            failure_reason: "Failed to parse Bolt12 Offer".to_string(),
933        })?;
934
935        let _offer_lock_guard = self
936            .outbound_offer_lock_pool
937            .blocking_lock(LdkOfferId(offer.id()));
938
939        let payment_id = if let Some(amount) = amount {
940            self.node
941                .bolt12_payment()
942                .send_using_amount(&offer, amount.msats, quantity, payer_note)
943                .map_err(|err| LightningRpcError::Bolt12Error {
944                    failure_reason: err.to_string(),
945                })?
946        } else {
947            self.node
948                .bolt12_payment()
949                .send(&offer, quantity, payer_note)
950                .map_err(|err| LightningRpcError::Bolt12Error {
951                    failure_reason: err.to_string(),
952                })?
953        };
954
955        loop {
956            if let Some(payment_details) = self.node.payment(&payment_id) {
957                match payment_details.status {
958                    PaymentStatus::Pending => {}
959                    PaymentStatus::Succeeded => match payment_details.kind {
960                        PaymentKind::Bolt12Offer {
961                            preimage: Some(preimage),
962                            ..
963                        } => {
964                            info!(target: LOG_LIGHTNING, offer = %offer, payment_id = %payment_id, preimage = %preimage, "Successfully paid offer");
965                            return Ok(Preimage(preimage.0));
966                        }
967                        _ => {
968                            return Err(LightningRpcError::FailedPayment {
969                                failure_reason: "Unexpected payment kind".to_string(),
970                            });
971                        }
972                    },
973                    PaymentStatus::Failed => {
974                        return Err(LightningRpcError::FailedPayment {
975                            failure_reason: "Bolt12 payment failed".to_string(),
976                        });
977                    }
978                }
979            }
980            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
981        }
982    }
983
984    fn sync_wallet(&self) -> Result<(), LightningRpcError> {
985        block_in_place(|| {
986            let _ = self.node.sync_wallets();
987        });
988        Ok(())
989    }
990}
991
992/// Maps LDK's `PaymentKind` to an optional preimage and an optional payment
993/// hash depending on the type of payment.
994fn get_preimage_and_payment_hash(
995    kind: &PaymentKind,
996) -> (
997    Option<Preimage>,
998    Option<sha256::Hash>,
999    fedimint_gateway_common::PaymentKind,
1000) {
1001    match kind {
1002        PaymentKind::Bolt11 {
1003            hash,
1004            preimage,
1005            secret: _,
1006        } => (
1007            preimage.map(|p| Preimage(p.0)),
1008            Some(sha256::Hash::from_slice(&hash.0).expect("Failed to convert payment hash")),
1009            fedimint_gateway_common::PaymentKind::Bolt11,
1010        ),
1011        PaymentKind::Bolt11Jit {
1012            hash,
1013            preimage,
1014            secret: _,
1015            lsp_fee_limits: _,
1016            ..
1017        } => (
1018            preimage.map(|p| Preimage(p.0)),
1019            Some(sha256::Hash::from_slice(&hash.0).expect("Failed to convert payment hash")),
1020            fedimint_gateway_common::PaymentKind::Bolt11,
1021        ),
1022        PaymentKind::Bolt12Offer {
1023            hash,
1024            preimage,
1025            secret: _,
1026            offer_id: _,
1027            payer_note: _,
1028            quantity: _,
1029        } => (
1030            preimage.map(|p| Preimage(p.0)),
1031            hash.map(|h| sha256::Hash::from_slice(&h.0).expect("Failed to convert payment hash")),
1032            fedimint_gateway_common::PaymentKind::Bolt12Offer,
1033        ),
1034        PaymentKind::Bolt12Refund {
1035            hash,
1036            preimage,
1037            secret: _,
1038            payer_note: _,
1039            quantity: _,
1040        } => (
1041            preimage.map(|p| Preimage(p.0)),
1042            hash.map(|h| sha256::Hash::from_slice(&h.0).expect("Failed to convert payment hash")),
1043            fedimint_gateway_common::PaymentKind::Bolt12Refund,
1044        ),
1045        PaymentKind::Spontaneous { hash, preimage } => (
1046            preimage.map(|p| Preimage(p.0)),
1047            Some(sha256::Hash::from_slice(&hash.0).expect("Failed to convert payment hash")),
1048            fedimint_gateway_common::PaymentKind::Bolt11,
1049        ),
1050        PaymentKind::Onchain { .. } => (None, None, fedimint_gateway_common::PaymentKind::Onchain),
1051    }
1052}
1053
1054/// When a port is specified in the Esplora URL, the esplora client inside LDK
1055/// node cannot connect to the lightning node when there is a trailing slash.
1056/// The `SafeUrl::Display` function will always serialize the `SafeUrl` with a
1057/// trailing slash, which causes the connection to fail.
1058///
1059/// To handle this, we explicitly construct the esplora URL when a port is
1060/// specified.
1061fn get_esplora_url(server_url: SafeUrl) -> anyhow::Result<String> {
1062    // Esplora client cannot handle trailing slashes
1063    let host = server_url
1064        .host_str()
1065        .ok_or(anyhow::anyhow!("Missing esplora host"))?;
1066    let server_url = if let Some(port) = server_url.port() {
1067        format!("{}://{}:{}", server_url.scheme(), host, port)
1068    } else {
1069        server_url.to_string()
1070    };
1071    Ok(server_url)
1072}
1073
1074#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1075struct LdkOfferId(OfferId);
1076
1077impl std::hash::Hash for LdkOfferId {
1078    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1079        state.write(&self.0.0);
1080    }
1081}
1082
1083#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1084pub struct UserChannelId(pub ldk_node::UserChannelId);
1085
1086impl PartialOrd for UserChannelId {
1087    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1088        Some(self.cmp(other))
1089    }
1090}
1091
1092impl Ord for UserChannelId {
1093    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1094        self.0.0.cmp(&other.0.0)
1095    }
1096}
1097
1098#[cfg(test)]
1099mod tests;