use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use bitcoin::hashes::Hash;
use bitcoin::{Network, OutPoint};
use fedimint_bip39::Mnemonic;
use fedimint_bitcoind::{create_bitcoind, DynBitcoindRpc};
use fedimint_core::envs::{is_env_var_set, BitcoinRpcConfig};
use fedimint_core::task::{block_in_place, TaskGroup, TaskHandle};
use fedimint_core::util::SafeUrl;
use fedimint_core::{Amount, BitcoinAmountOrAll};
use fedimint_ln_common::contracts::Preimage;
use ldk_node::lightning::ln::msgs::SocketAddress;
use ldk_node::lightning::ln::PaymentHash;
use ldk_node::lightning::routing::gossip::NodeAlias;
use ldk_node::payment::{PaymentKind, PaymentStatus, SendingParameters};
use lightning::ln::channelmanager::PaymentId;
use lightning::ln::PaymentPreimage;
use lightning::util::scid_utils::scid_from_parts;
use lightning_invoice::Bolt11Invoice;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, info};
use super::{
ChannelInfo, ILnRpcClient, LightningRpcError, ListActiveChannelsResponse, RouteHtlcStream,
};
use crate::{
CloseChannelsWithPeerRequest, CloseChannelsWithPeerResponse, CreateInvoiceRequest,
CreateInvoiceResponse, GetBalancesResponse, GetLnOnchainAddressResponse, GetNodeInfoResponse,
GetRouteHintsResponse, InterceptPaymentRequest, InterceptPaymentResponse, InvoiceDescription,
OpenChannelRequest, OpenChannelResponse, PayInvoiceResponse, PaymentAction, SendOnchainRequest,
SendOnchainResponse,
};
pub enum GatewayLdkChainSourceConfig {
Bitcoind { server_url: SafeUrl },
Esplora { server_url: SafeUrl },
}
impl GatewayLdkChainSourceConfig {
fn bitcoin_rpc_config(&self) -> BitcoinRpcConfig {
match self {
Self::Bitcoind { server_url } => BitcoinRpcConfig {
kind: "bitcoind".to_string(),
url: server_url.clone(),
},
Self::Esplora { server_url } => BitcoinRpcConfig {
kind: "esplora".to_string(),
url: server_url.clone(),
},
}
}
}
pub struct GatewayLdkClient {
node: Arc<ldk_node::Node>,
bitcoind_rpc: DynBitcoindRpc,
task_group: TaskGroup,
htlc_stream_receiver_or: Option<tokio::sync::mpsc::Receiver<InterceptPaymentRequest>>,
outbound_lightning_payment_lock_pool: lockable::LockPool<PaymentId>,
}
impl std::fmt::Debug for GatewayLdkClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GatewayLdkClient").finish_non_exhaustive()
}
}
impl GatewayLdkClient {
pub fn new(
data_dir: &Path,
chain_source_config: GatewayLdkChainSourceConfig,
network: Network,
lightning_port: u16,
mnemonic: Mnemonic,
runtime: Arc<tokio::runtime::Runtime>,
) -> anyhow::Result<Self> {
let node_alias = if network == Network::Bitcoin {
None
} else {
let alias = format!("{network} LDK Gateway");
let mut bytes = [0u8; 32];
bytes[..alias.as_bytes().len()].copy_from_slice(alias.as_bytes());
Some(NodeAlias(bytes))
};
let mut node_builder = ldk_node::Builder::from_config(ldk_node::config::Config {
network,
listening_addresses: Some(vec![SocketAddress::TcpIpV4 {
addr: [0, 0, 0, 0],
port: lightning_port,
}]),
node_alias,
..Default::default()
});
node_builder.set_entropy_bip39_mnemonic(mnemonic, None);
let bitcoind_rpc = create_bitcoind(&chain_source_config.bitcoin_rpc_config())?;
match chain_source_config {
GatewayLdkChainSourceConfig::Bitcoind { server_url } => {
node_builder.set_chain_source_bitcoind_rpc(
server_url
.host_str()
.expect("Could not retrieve host from bitcoind RPC url")
.to_string(),
server_url
.port()
.expect("Could not retrieve port from bitcoind RPC url"),
server_url.username().to_string(),
server_url.password().unwrap_or_default().to_string(),
);
}
GatewayLdkChainSourceConfig::Esplora { server_url } => {
node_builder.set_chain_source_esplora(server_url.to_string(), None);
}
};
let Some(data_dir_str) = data_dir.to_str() else {
return Err(anyhow::anyhow!("Invalid data dir path"));
};
node_builder.set_storage_dir_path(data_dir_str.to_string());
let node = Arc::new(node_builder.build()?);
node.start_with_runtime(runtime).map_err(|e| {
error!(?e, "Failed to start LDK Node");
LightningRpcError::FailedToConnect
})?;
let (htlc_stream_sender, htlc_stream_receiver) = tokio::sync::mpsc::channel(1024);
let task_group = TaskGroup::new();
let node_clone = node.clone();
task_group.spawn("ldk lightning node event handler", |handle| async move {
loop {
Self::handle_next_event(&node_clone, &htlc_stream_sender, &handle).await;
}
});
Ok(GatewayLdkClient {
node,
bitcoind_rpc,
task_group,
htlc_stream_receiver_or: Some(htlc_stream_receiver),
outbound_lightning_payment_lock_pool: lockable::LockPool::new(),
})
}
async fn handle_next_event(
node: &ldk_node::Node,
htlc_stream_sender: &Sender<InterceptPaymentRequest>,
handle: &TaskHandle,
) {
let event = tokio::select! {
event = node.next_event_async() => {
event
}
() = handle.make_shutdown_rx() => {
return;
}
};
if let ldk_node::Event::PaymentClaimable {
payment_id: _,
payment_hash,
claimable_amount_msat,
claim_deadline,
} = event
{
if let Err(e) = htlc_stream_sender
.send(InterceptPaymentRequest {
payment_hash: Hash::from_slice(&payment_hash.0).expect("Failed to create Hash"),
amount_msat: claimable_amount_msat,
expiry: claim_deadline.unwrap_or_default(),
short_channel_id: None,
incoming_chan_id: 0,
htlc_id: 0,
})
.await
{
error!(?e, "Failed send InterceptHtlcRequest to stream");
}
}
node.event_handled();
}
async fn outpoint_to_scid(&self, funding_txo: OutPoint) -> anyhow::Result<u64> {
let block_hash = self
.bitcoind_rpc
.get_txout_proof(funding_txo.txid)
.await?
.block_header
.block_hash();
let block_height = self
.bitcoind_rpc
.get_tx_block_height(&funding_txo.txid)
.await?
.ok_or(anyhow::anyhow!("Failed to get block height"))?;
let block = self.bitcoind_rpc.get_block(&block_hash).await?;
let tx_index = block
.txdata
.iter()
.enumerate()
.find(|(_, tx)| tx.compute_txid() == funding_txo.txid)
.ok_or(anyhow::anyhow!("Failed to find transaction"))?
.0 as u32;
let output_index = funding_txo.vout;
scid_from_parts(block_height, u64::from(tx_index), u64::from(output_index))
.map_err(|e| anyhow::anyhow!("Failed to convert to short channel ID: {e:?}"))
}
}
impl Drop for GatewayLdkClient {
fn drop(&mut self) {
self.task_group.shutdown();
info!("Stopping LDK Node...");
if let Err(e) = self.node.stop() {
error!(?e, "Failed to stop LDK Node");
} else {
info!("LDK Node stopped.");
}
}
}
#[async_trait]
impl ILnRpcClient for GatewayLdkClient {
async fn info(&self) -> Result<GetNodeInfoResponse, LightningRpcError> {
if is_env_var_set("FM_IN_DEVIMINT") {
block_in_place(|| {
let _ = self.node.sync_wallets();
});
}
let node_status = self.node.status();
let chain_tip_block_height =
u32::try_from(self.bitcoind_rpc.get_block_count().await.map_err(|e| {
LightningRpcError::FailedToGetNodeInfo {
failure_reason: format!("Failed to get block count from chain source: {e}"),
}
})?)
.expect("Failed to convert block count to u32")
- 1;
let ldk_block_height = node_status.current_best_block.height;
let synced_to_chain = chain_tip_block_height == ldk_block_height;
assert!(
chain_tip_block_height >= ldk_block_height,
"LDK Block Height is in the future"
);
Ok(GetNodeInfoResponse {
pub_key: self.node.node_id(),
alias: match self.node.node_alias() {
Some(alias) => alias.to_string(),
None => format!("LDK Fedimint Gateway Node {}", self.node.node_id()),
},
network: self.node.config().network.to_string(),
block_height: ldk_block_height,
synced_to_chain,
})
}
async fn routehints(
&self,
_num_route_hints: usize,
) -> Result<GetRouteHintsResponse, LightningRpcError> {
Ok(GetRouteHintsResponse {
route_hints: vec![],
})
}
async fn pay(
&self,
invoice: Bolt11Invoice,
max_delay: u64,
max_fee: Amount,
) -> Result<PayInvoiceResponse, LightningRpcError> {
let payment_id = PaymentId(*invoice.payment_hash().as_byte_array());
let _payment_lock_guard = self
.outbound_lightning_payment_lock_pool
.async_lock(payment_id)
.await;
if self.node.payment(&payment_id).is_none() {
assert_eq!(
self.node
.bolt11_payment()
.send(
&invoice,
Some(SendingParameters {
max_total_routing_fee_msat: Some(Some(max_fee.msats)),
max_total_cltv_expiry_delta: Some(max_delay as u32),
max_path_count: None,
max_channel_saturation_power_of_half: None,
}),
)
.map_err(|e| LightningRpcError::FailedPayment {
failure_reason: format!("LDK payment failed to initialize: {e:?}"),
})?,
payment_id
);
}
loop {
if let Some(payment_details) = self.node.payment(&payment_id) {
match payment_details.status {
PaymentStatus::Pending => {}
PaymentStatus::Succeeded => {
if let PaymentKind::Bolt11 {
preimage: Some(preimage),
..
} = payment_details.kind
{
return Ok(PayInvoiceResponse {
preimage: Preimage(preimage.0),
});
}
}
PaymentStatus::Failed => {
return Err(LightningRpcError::FailedPayment {
failure_reason: "LDK payment failed".to_string(),
});
}
}
}
fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
}
}
async fn route_htlcs<'a>(
mut self: Box<Self>,
_task_group: &TaskGroup,
) -> Result<(RouteHtlcStream<'a>, Arc<dyn ILnRpcClient>), LightningRpcError> {
let route_htlc_stream = match self.htlc_stream_receiver_or.take() {
Some(stream) => Ok(Box::pin(ReceiverStream::new(stream))),
None => Err(LightningRpcError::FailedToRouteHtlcs {
failure_reason:
"Stream does not exist. Likely was already taken by calling `route_htlcs()`."
.to_string(),
}),
}?;
Ok((route_htlc_stream, Arc::new(*self)))
}
async fn complete_htlc(&self, htlc: InterceptPaymentResponse) -> Result<(), LightningRpcError> {
let InterceptPaymentResponse {
action,
payment_hash,
incoming_chan_id: _,
htlc_id: _,
} = htlc;
let ph = PaymentHash(*payment_hash.clone().as_byte_array());
let claimable_amount_msat = 999_999_999_999_999;
let ph_hex_str = hex::encode(payment_hash);
if let PaymentAction::Settle(preimage) = action {
self.node
.bolt11_payment()
.claim_for_hash(ph, claimable_amount_msat, PaymentPreimage(preimage.0))
.map_err(|_| LightningRpcError::FailedToCompleteHtlc {
failure_reason: format!("Failed to claim LDK payment with hash {ph_hex_str}"),
})?;
} else {
error!("Unwinding payment with hash {ph_hex_str} because the action was not `Settle`");
self.node.bolt11_payment().fail_for_hash(ph).map_err(|_| {
LightningRpcError::FailedToCompleteHtlc {
failure_reason: format!("Failed to unwind LDK payment with hash {ph_hex_str}"),
}
})?;
}
return Ok(());
}
async fn create_invoice(
&self,
create_invoice_request: CreateInvoiceRequest,
) -> Result<CreateInvoiceResponse, LightningRpcError> {
let payment_hash_or = if let Some(payment_hash) = create_invoice_request.payment_hash {
let ph = PaymentHash(*payment_hash.as_byte_array());
Some(ph)
} else {
None
};
let description_str = match create_invoice_request.description {
Some(InvoiceDescription::Direct(desc)) => desc,
_ => String::new(),
};
let invoice = match payment_hash_or {
Some(payment_hash) => self.node.bolt11_payment().receive_for_hash(
create_invoice_request.amount_msat,
description_str.as_str(),
create_invoice_request.expiry_secs,
payment_hash,
),
None => self.node.bolt11_payment().receive(
create_invoice_request.amount_msat,
description_str.as_str(),
create_invoice_request.expiry_secs,
),
}
.map_err(|e| LightningRpcError::FailedToGetInvoice {
failure_reason: e.to_string(),
})?;
Ok(CreateInvoiceResponse {
invoice: invoice.to_string(),
})
}
async fn get_ln_onchain_address(
&self,
) -> Result<GetLnOnchainAddressResponse, LightningRpcError> {
self.node
.onchain_payment()
.new_address()
.map(|address| GetLnOnchainAddressResponse {
address: address.to_string(),
})
.map_err(|e| LightningRpcError::FailedToGetLnOnchainAddress {
failure_reason: e.to_string(),
})
}
async fn send_onchain(
&self,
SendOnchainRequest {
address,
amount,
fee_rate_sats_per_vbyte: _,
}: SendOnchainRequest,
) -> Result<SendOnchainResponse, LightningRpcError> {
let onchain = self.node.onchain_payment();
let txid = match amount {
BitcoinAmountOrAll::All => onchain.send_all_to_address(&address.assume_checked()),
BitcoinAmountOrAll::Amount(amount_sats) => {
onchain.send_to_address(&address.assume_checked(), amount_sats.to_sat())
}
}
.map_err(|e| LightningRpcError::FailedToWithdrawOnchain {
failure_reason: e.to_string(),
})?;
Ok(SendOnchainResponse {
txid: txid.to_string(),
})
}
async fn open_channel(
&self,
OpenChannelRequest {
pubkey,
host,
channel_size_sats,
push_amount_sats,
}: OpenChannelRequest,
) -> Result<OpenChannelResponse, LightningRpcError> {
let push_amount_msats_or = if push_amount_sats == 0 {
None
} else {
Some(push_amount_sats * 1000)
};
let user_channel_id = self
.node
.open_announced_channel(
pubkey,
SocketAddress::from_str(&host).map_err(|e| {
LightningRpcError::FailedToConnectToPeer {
failure_reason: e.to_string(),
}
})?,
channel_size_sats,
push_amount_msats_or,
None,
)
.map_err(|e| LightningRpcError::FailedToOpenChannel {
failure_reason: e.to_string(),
})?;
for _ in 0..10 {
let funding_txid_or = self
.node
.list_channels()
.iter()
.find(|channel| channel.user_channel_id == user_channel_id)
.and_then(|channel| channel.funding_txo)
.map(|funding_txo| funding_txo.txid);
if let Some(funding_txid) = funding_txid_or {
return Ok(OpenChannelResponse {
funding_txid: funding_txid.to_string(),
});
}
fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
}
Err(LightningRpcError::FailedToOpenChannel {
failure_reason: "Channel could not be opened".to_string(),
})
}
async fn close_channels_with_peer(
&self,
CloseChannelsWithPeerRequest { pubkey }: CloseChannelsWithPeerRequest,
) -> Result<CloseChannelsWithPeerResponse, LightningRpcError> {
let mut num_channels_closed = 0;
for channel_with_peer in self
.node
.list_channels()
.iter()
.filter(|channel| channel.counterparty_node_id == pubkey)
{
if self
.node
.close_channel(&channel_with_peer.user_channel_id, pubkey)
.is_ok()
{
num_channels_closed += 1;
}
}
Ok(CloseChannelsWithPeerResponse {
num_channels_closed,
})
}
async fn list_active_channels(&self) -> Result<ListActiveChannelsResponse, LightningRpcError> {
let mut channels = Vec::new();
for channel_details in self
.node
.list_channels()
.iter()
.filter(|channel| channel.is_usable)
{
channels.push(ChannelInfo {
remote_pubkey: channel_details.counterparty_node_id,
channel_size_sats: channel_details.channel_value_sats,
outbound_liquidity_sats: channel_details.outbound_capacity_msat / 1000,
inbound_liquidity_sats: channel_details.inbound_capacity_msat / 1000,
short_channel_id: match channel_details.funding_txo {
Some(funding_txo) => self.outpoint_to_scid(funding_txo).await.unwrap_or(0),
None => 0,
},
});
}
Ok(ListActiveChannelsResponse { channels })
}
async fn get_balances(&self) -> Result<GetBalancesResponse, LightningRpcError> {
let balances = self.node.list_balances();
let channel_lists = self
.node
.list_channels()
.into_iter()
.filter(|chan| chan.is_usable)
.collect::<Vec<_>>();
let total_inbound_liquidity_balance_msat: u64 = channel_lists
.iter()
.map(|channel| channel.inbound_capacity_msat)
.sum();
Ok(GetBalancesResponse {
onchain_balance_sats: balances.total_onchain_balance_sats,
lightning_balance_msats: balances.total_lightning_balance_sats * 1000,
inbound_lightning_liquidity_msats: total_inbound_liquidity_balance_msat,
})
}
}