1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash, default_esplora_server};
15use fedimint_gateway_common::{
16 ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
17 ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
18 PaymentSummaryResponse, V1_API_ENDPOINT,
19};
20use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
21use fedimint_lnv2_common::gateway_api::PaymentFee;
22use fedimint_logging::LOG_DEVIMINT;
23use fedimint_testing_core::node_type::LightningNodeType;
24use semver::Version;
25use tracing::{debug, info};
26
27use crate::cmd;
28use crate::envs::{
29 FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
30};
31use crate::external::{Bitcoind, LightningNode};
32use crate::federation::Federation;
33use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
34use crate::vars::utf8;
35use crate::version_constants::{VERSION_0_7_0_ALPHA, VERSION_0_9_0_ALPHA};
36
37#[derive(Clone)]
38pub enum LdkChainSource {
39 Bitcoind,
40 Esplora,
41}
42
43#[derive(Clone)]
44pub struct Gatewayd {
45 pub(crate) process: ProcessHandle,
46 pub ln: LightningNode,
47 pub addr: String,
48 pub(crate) lightning_node_addr: String,
49 pub gatewayd_version: Version,
50 pub gw_name: String,
51 pub log_path: PathBuf,
52 pub gw_port: u16,
53 pub ldk_port: u16,
54}
55
56impl Gatewayd {
57 pub async fn new(process_mgr: &ProcessManager, ln: LightningNode) -> Result<Self> {
58 let ln_type = ln.ln_type();
59 let (gw_name, port, lightning_node_port) = match &ln {
60 LightningNode::Lnd(_) => (
61 "gatewayd-lnd".to_string(),
62 process_mgr.globals.FM_PORT_GW_LND,
63 process_mgr.globals.FM_PORT_LND_LISTEN,
64 ),
65 LightningNode::Ldk {
66 name,
67 gw_port,
68 ldk_port,
69 chain_source: _,
70 } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
71 };
72 let test_dir = &process_mgr.globals.FM_TEST_DIR;
73 let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
74 let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
75
76 let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
77 (
78 FM_GATEWAY_DATA_DIR_ENV.to_owned(),
79 format!("{}/{gw_name}", utf8(test_dir)),
80 ),
81 (
82 FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
83 format!("127.0.0.1:{port}"),
84 ),
85 (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
86 (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
87 ]);
88 if !supports_lnv2() {
89 info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
90 gateway_env.insert(
91 "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
92 "LNv1".to_string(),
93 );
94 }
95 if ln_type == LightningNodeType::Ldk {
96 gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
97 Self::set_ldk_chain_source(&ln, &mut gateway_env, process_mgr);
98 }
99 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
100 let process = process_mgr
101 .spawn_daemon(
102 &gw_name,
103 Gatewayd::start_gatewayd(&ln_type, &gatewayd_version).envs(gateway_env),
104 )
105 .await?;
106
107 let log_path = process_mgr
108 .globals
109 .FM_LOGS_DIR
110 .join(format!("{gw_name}.log"));
111 let gatewayd = Self {
112 process,
113 ln,
114 addr,
115 lightning_node_addr,
116 gatewayd_version,
117 gw_name,
118 log_path,
119 gw_port: port,
120 ldk_port: lightning_node_port,
121 };
122 poll(
123 "waiting for gateway to be ready to respond to rpc",
124 || async { gatewayd.gateway_id().await.map_err(ControlFlow::Continue) },
125 )
126 .await?;
127 Ok(gatewayd)
128 }
129
130 fn set_ldk_chain_source(
131 ln: &LightningNode,
132 gateway_env: &mut HashMap<String, String>,
133 process_mgr: &ProcessManager,
134 ) {
135 let network = if let Ok(network) = std::env::var("FM_GATEWAY_NETWORK") {
136 bitcoin::Network::from_str(&network).expect("Invalid network specified")
137 } else {
138 bitcoin::Network::Regtest
139 };
140 if let LightningNode::Ldk {
141 name: _,
142 gw_port: _,
143 ldk_port: _,
144 chain_source,
145 } = ln
146 {
147 match chain_source {
148 LdkChainSource::Bitcoind => {
149 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
150 gateway_env.insert(
151 "FM_LDK_BITCOIND_RPC_URL".to_owned(),
152 format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
153 );
154 }
155 LdkChainSource::Esplora => {
156 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA.to_string();
157 gateway_env.insert(
158 "FM_LDK_ESPLORA_SERVER_URL".to_owned(),
159 default_esplora_server(network, Some(esplora_port))
160 .url
161 .to_string(),
162 );
163 }
164 }
165 }
166 }
167
168 fn is_forced_current(&self) -> bool {
169 self.ln.ln_type() == LightningNodeType::Ldk && self.gatewayd_version < *VERSION_0_7_0_ALPHA
170 }
171
172 fn start_gatewayd(ln_type: &LightningNodeType, gatewayd_version: &Version) -> Command {
173 if *ln_type == LightningNodeType::Ldk && *gatewayd_version < *VERSION_0_7_0_ALPHA {
176 cmd!("gatewayd", ln_type)
177 } else {
178 cmd!(crate::util::Gatewayd, ln_type)
179 }
180 }
181
182 pub async fn terminate(self) -> Result<()> {
183 self.process.terminate().await
184 }
185
186 pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
187 self.ln = ln_node;
188 }
189
190 pub async fn stop_lightning_node(&mut self) -> Result<()> {
191 info!(target: LOG_DEVIMINT, "Stopping lightning node");
192 match self.ln.clone() {
193 LightningNode::Lnd(lnd) => lnd.terminate().await,
194 LightningNode::Ldk {
195 name: _,
196 gw_port: _,
197 ldk_port: _,
198 chain_source: _,
199 } => {
200 unimplemented!("LDK node termination not implemented")
203 }
204 }
205 }
206
207 pub async fn restart_with_bin(
210 &mut self,
211 process_mgr: &ProcessManager,
212 gatewayd_path: &PathBuf,
213 gateway_cli_path: &PathBuf,
214 ) -> Result<()> {
215 let ln = self.ln.clone();
216
217 self.process.terminate().await?;
218 unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
220 unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
222
223 if supports_lnv2() {
224 info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
225 unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
227 }
228
229 let new_ln = ln;
230 let new_gw = Self::new(process_mgr, new_ln.clone()).await?;
231 self.process = new_gw.process;
232 self.set_lightning_node(new_ln);
233 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
234 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
235 info!(
236 target: LOG_DEVIMINT,
237 ?gatewayd_version,
238 ?gateway_cli_version,
239 "upgraded gatewayd and gateway-cli"
240 );
241 Ok(())
242 }
243
244 pub fn cmd(&self) -> Command {
245 if self.is_forced_current() {
246 cmd!(
247 "gateway-cli",
248 "--rpcpassword=theresnosecondbest",
249 "-a",
250 &self.addr
251 )
252 } else {
253 cmd!(
254 crate::util::get_gateway_cli_path(),
255 "--rpcpassword=theresnosecondbest",
256 "-a",
257 &self.addr
258 )
259 }
260 }
261
262 pub async fn get_info(&self) -> Result<serde_json::Value> {
263 retry(
264 "Getting gateway info via gateway-cli info",
265 backoff_util::aggressive_backoff(),
266 || async { cmd!(self, "info").out_json().await },
267 )
268 .await
269 .context("Getting gateway info via gateway-cli info")
270 }
271
272 pub async fn gateway_id(&self) -> Result<String> {
273 let info = self.get_info().await?;
274 let gateway_id = info["gateway_id"]
275 .as_str()
276 .context("gateway_id must be a string")?
277 .to_owned();
278 Ok(gateway_id)
279 }
280
281 pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
282 let info = self.get_info().await?;
283 let lightning_pub_key = info["lightning_pub_key"]
284 .as_str()
285 .context("lightning_pub_key must be a string")?
286 .to_owned();
287 Ok(lightning_pub_key.parse()?)
288 }
289
290 pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
291 let invite_code = fed.invite_code()?;
292 poll("gateway connect-fed", || async {
293 cmd!(self, "connect-fed", invite_code.clone())
294 .run()
295 .await
296 .map_err(ControlFlow::Continue)?;
297 Ok(())
298 })
299 .await?;
300 Ok(())
301 }
302
303 pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
304 let federation_id = fed.calculate_federation_id();
305 let invite_code = fed.invite_code()?;
306 info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
307 poll("gateway connect-fed --recover=true", || async {
308 cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
309 .run()
310 .await
311 .map_err(ControlFlow::Continue)?;
312 Ok(())
313 })
314 .await?;
315 Ok(())
316 }
317
318 pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
319 let federation_id = fed.calculate_federation_id();
320 cmd!(self, "ecash", "backup", "--federation-id", federation_id)
321 .run()
322 .await?;
323 Ok(())
324 }
325
326 pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
327 Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
328 .out_json()
329 .await?
330 .as_str()
331 .context("address must be a string")?
332 .to_owned())
333 }
334
335 pub async fn get_ln_onchain_address(&self) -> Result<String> {
336 cmd!(self, "onchain", "address").out_string().await
337 }
338
339 pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
340 let value = retry(
341 "Getting gateway mnemonic",
342 backoff_util::aggressive_backoff(),
343 || async { cmd!(self, "seed").out_json().await },
344 )
345 .await
346 .context("Getting gateway mnemonic")?;
347
348 Ok(serde_json::from_value(value)?)
349 }
350
351 pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
352 cmd!(self, "leave-fed", "--federation-id", federation_id)
353 .run()
354 .await?;
355 Ok(())
356 }
357
358 pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
359 Ok(Bolt11Invoice::from_str(
360 &cmd!(self, "lightning", "create-invoice", amount_msats)
361 .out_string()
362 .await?,
363 )?)
364 }
365
366 pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
367 cmd!(self, "lightning", "pay-invoice", invoice.to_string())
368 .run()
369 .await?;
370
371 Ok(())
372 }
373
374 pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
375 let value = cmd!(
376 self,
377 "ecash",
378 "send",
379 "--federation-id",
380 federation_id,
381 amount_msats
382 )
383 .out_json()
384 .await?;
385 let ecash: String = serde_json::from_value(
386 value
387 .get("notes")
388 .expect("notes key does not exist")
389 .clone(),
390 )?;
391 Ok(ecash)
392 }
393
394 pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
395 cmd!(self, "ecash", "receive", "--notes", ecash)
396 .run()
397 .await?;
398 Ok(())
399 }
400
401 pub async fn get_balances(&self) -> Result<GatewayBalances> {
402 let value = cmd!(self, "get-balances").out_json().await?;
403 Ok(serde_json::from_value(value)?)
404 }
405
406 pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
407 let federation_id = FederationId::from_str(&federation_id)?;
408 let balances = self.get_balances().await?;
409 let ecash_balance = balances
410 .ecash_balances
411 .into_iter()
412 .find(|info| info.federation_id == federation_id)
413 .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
414 .ecash_balance_msats
415 .msats;
416 Ok(ecash_balance)
417 }
418
419 pub async fn send_onchain(
420 &self,
421 bitcoind: &Bitcoind,
422 amount: BitcoinAmountOrAll,
423 fee_rate: u64,
424 ) -> Result<bitcoin::Txid> {
425 let withdraw_address = bitcoind.get_new_address().await?;
426 let value = cmd!(
427 self,
428 "onchain",
429 "send",
430 "--address",
431 withdraw_address,
432 "--amount",
433 amount,
434 "--fee-rate-sats-per-vbyte",
435 fee_rate
436 )
437 .out_json()
438 .await?;
439
440 let txid: bitcoin::Txid = serde_json::from_value(value)?;
441 Ok(txid)
442 }
443
444 pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
445 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
446 let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
447 cmd!(
448 self,
449 "lightning",
450 "close-channels-with-peer",
451 "--pubkey",
452 remote_pubkey,
453 "--force",
454 )
455 } else {
456 cmd!(
457 self,
458 "lightning",
459 "close-channels-with-peer",
460 "--pubkey",
461 remote_pubkey,
462 )
463 };
464
465 close_channel.run().await?;
466
467 Ok(())
468 }
469
470 pub async fn close_all_channels(&self, force: bool) -> Result<()> {
471 let channels = self.list_channels().await?;
472
473 for chan in channels {
474 let remote_pubkey = chan.remote_pubkey;
475 self.close_channel(remote_pubkey, force).await?;
476 }
477
478 Ok(())
479 }
480
481 pub async fn open_channel(
484 &self,
485 gw: &Gatewayd,
486 channel_size_sats: u64,
487 push_amount_sats: Option<u64>,
488 ) -> Result<Txid> {
489 let pubkey = gw.lightning_pubkey().await?;
490
491 let mut command = cmd!(
492 self,
493 "lightning",
494 "open-channel",
495 "--pubkey",
496 pubkey,
497 "--host",
498 gw.lightning_node_addr,
499 "--channel-size-sats",
500 channel_size_sats,
501 "--push-amount-sats",
502 push_amount_sats.unwrap_or(0)
503 );
504
505 Ok(Txid::from_str(&command.out_string().await?)?)
506 }
507
508 pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
509 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
510 let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA || self.is_forced_current() {
511 cmd!(self, "lightning", "list-channels").out_json().await?
512 } else {
513 cmd!(self, "lightning", "list-active-channels")
514 .out_json()
515 .await?
516 };
517
518 let channels = channels
519 .as_array()
520 .context("channels must be an array")?
521 .iter()
522 .map(|channel| {
523 let remote_pubkey = channel["remote_pubkey"]
524 .as_str()
525 .context("remote_pubkey must be a string")?
526 .to_owned();
527 let channel_size_sats = channel["channel_size_sats"]
528 .as_u64()
529 .context("channel_size_sats must be a u64")?;
530 let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
531 .as_u64()
532 .context("outbound_liquidity_sats must be a u64")?;
533 let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
534 .as_u64()
535 .context("inbound_liquidity_sats must be a u64")?;
536 let is_active = channel["is_active"].as_bool().unwrap_or(true);
537 Ok(ChannelInfo {
538 remote_pubkey: remote_pubkey
539 .parse()
540 .expect("Lightning node returned invalid remote channel pubkey"),
541 channel_size_sats,
542 outbound_liquidity_sats,
543 inbound_liquidity_sats,
544 is_active,
545 })
546 })
547 .collect::<Result<Vec<ChannelInfo>>>()?;
548 Ok(channels)
549 }
550
551 pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
552 poll("waiting for block height", || async {
553 let info = self.get_info().await.map_err(ControlFlow::Continue)?;
554 let value = info.get("block_height");
555 if let Some(height) = value {
556 let block_height: Option<u32> = serde_json::from_value(height.clone())
557 .context("Could not parse block height")
558 .map_err(ControlFlow::Continue)?;
559 let Some(block_height) = block_height else {
560 return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
561 };
562 let synced = info["synced_to_chain"]
563 .as_bool()
564 .expect("Could not get synced_to_chain");
565 if block_height >= target_block_height as u32 && synced {
566 return Ok(());
567 }
568 }
569 Err(ControlFlow::Continue(anyhow!("Not synced to block")))
570 })
571 .await?;
572 Ok(())
573 }
574
575 pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
576 let info_value = self.get_info().await?;
577 let federations = info_value["federations"]
578 .as_array()
579 .expect("federations is an array");
580
581 let fed = federations
582 .iter()
583 .find(|fed| {
584 serde_json::from_value::<String>(fed["federation_id"].clone())
585 .expect("could not deserialize federation_id")
586 == fed_id
587 })
588 .ok_or_else(|| anyhow!("Federation not found"))?;
589
590 let lightning_fee = fed["config"]["lightning_fee"].clone();
591 let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
592 .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
593 let parts_per_million: u64 =
594 serde_json::from_value(lightning_fee["parts_per_million"].clone())
595 .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
596
597 Ok(PaymentFee {
598 base,
599 parts_per_million,
600 })
601 }
602
603 pub async fn set_federation_routing_fee(
604 &self,
605 fed_id: String,
606 base: u64,
607 ppm: u64,
608 ) -> Result<()> {
609 cmd!(
610 self,
611 "cfg",
612 "set-fees",
613 "--federation-id",
614 fed_id,
615 "--ln-base",
616 base,
617 "--ln-ppm",
618 ppm
619 )
620 .run()
621 .await?;
622
623 Ok(())
624 }
625
626 pub async fn set_federation_transaction_fee(
627 &self,
628 fed_id: String,
629 base: u64,
630 ppm: u64,
631 ) -> Result<()> {
632 cmd!(
633 self,
634 "cfg",
635 "set-fees",
636 "--federation-id",
637 fed_id,
638 "--tx-base",
639 base,
640 "--tx-ppm",
641 ppm
642 )
643 .run()
644 .await?;
645
646 Ok(())
647 }
648
649 pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
650 let out_json = cmd!(self, "payment-summary").out_json().await?;
651 Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
652 }
653
654 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
655 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
656 if gatewayd_version < *VERSION_0_7_0_ALPHA {
657 if let LightningNode::Lnd(lnd) = &self.ln {
658 return lnd.wait_bolt11_invoice(payment_hash).await;
659 }
660
661 debug!("Skipping bolt11 invoice check because it is not supported until v0.7");
662 return Ok(());
663 }
664
665 let payment_hash =
666 sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
667 let invoice_val = cmd!(
668 self,
669 "lightning",
670 "get-invoice",
671 "--payment-hash",
672 payment_hash
673 )
674 .out_json()
675 .await?;
676 let invoice: GetInvoiceResponse =
677 serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
678 anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
679
680 Ok(())
681 }
682
683 pub async fn list_transactions(
684 &self,
685 start: SystemTime,
686 end: SystemTime,
687 ) -> Result<Vec<PaymentDetails>> {
688 let start_datetime: DateTime<Utc> = start.into();
689 let end_datetime: DateTime<Utc> = end.into();
690 let response = cmd!(
691 self,
692 "lightning",
693 "list-transactions",
694 "--start-time",
695 start_datetime.to_rfc3339(),
696 "--end-time",
697 end_datetime.to_rfc3339()
698 )
699 .out_json()
700 .await?;
701 let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
702 Ok(transactions.transactions)
703 }
704
705 pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
706 let offer_value = if let Some(amount) = amount {
707 cmd!(
708 self,
709 "lightning",
710 "create-offer",
711 "--amount-msat",
712 amount.msats
713 )
714 .out_json()
715 .await?
716 } else {
717 cmd!(self, "lightning", "create-offer").out_json().await?
718 };
719 let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
720 .expect("Could not parse offer response");
721 Ok(offer_response.offer)
722 }
723
724 pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
725 if let Some(amount) = amount {
726 cmd!(
727 self,
728 "lightning",
729 "pay-offer",
730 "--offer",
731 offer,
732 "--amount-msat",
733 amount.msats
734 )
735 .run()
736 .await?;
737 } else {
738 cmd!(self, "lightning", "pay-offer", "--offer", offer)
739 .run()
740 .await?;
741 }
742
743 Ok(())
744 }
745}