1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash};
15use fedimint_gateway_common::{
16 ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
17 ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
18 PaymentSummaryResponse, V1_API_ENDPOINT,
19};
20use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
21use fedimint_lnv2_common::gateway_api::PaymentFee;
22use fedimint_logging::LOG_DEVIMINT;
23use fedimint_testing_core::node_type::LightningNodeType;
24use semver::Version;
25use tracing::{debug, info};
26
27use crate::cmd;
28use crate::envs::{
29 FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
30};
31use crate::external::{Bitcoind, LightningNode};
32use crate::federation::Federation;
33use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
34use crate::vars::utf8;
35use crate::version_constants::{VERSION_0_7_0_ALPHA, VERSION_0_9_0_ALPHA};
36
37#[derive(Clone)]
38pub struct Gatewayd {
39 pub(crate) process: ProcessHandle,
40 pub ln: LightningNode,
41 pub addr: String,
42 pub(crate) lightning_node_addr: String,
43 pub gatewayd_version: Version,
44 pub gw_name: String,
45 pub log_path: PathBuf,
46 pub gw_port: u16,
47 pub ldk_port: u16,
48 pub gateway_id: String,
49}
50
51impl Gatewayd {
52 pub async fn new(process_mgr: &ProcessManager, ln: LightningNode) -> Result<Self> {
53 let ln_type = ln.ln_type();
54 let (gw_name, port, lightning_node_port) = match &ln {
55 LightningNode::Lnd(_) => (
56 "gatewayd-lnd".to_string(),
57 process_mgr.globals.FM_PORT_GW_LND,
58 process_mgr.globals.FM_PORT_LND_LISTEN,
59 ),
60 LightningNode::Ldk {
61 name,
62 gw_port,
63 ldk_port,
64 } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
65 };
66 let test_dir = &process_mgr.globals.FM_TEST_DIR;
67 let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
68 let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
69
70 let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
71 (
72 FM_GATEWAY_DATA_DIR_ENV.to_owned(),
73 format!("{}/{gw_name}", utf8(test_dir)),
74 ),
75 (
76 FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
77 format!("127.0.0.1:{port}"),
78 ),
79 (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
80 (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
81 ]);
82
83 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
84 if gatewayd_version < *VERSION_0_9_0_ALPHA {
85 let mode = if supports_lnv2() {
86 "All"
87 } else {
88 info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
89 "LNv1"
90 };
91 gateway_env.insert(
92 "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
93 mode.to_string(),
94 );
95 }
96
97 if ln_type == LightningNodeType::Ldk {
98 gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
99
100 if gatewayd_version < *VERSION_0_9_0_ALPHA {
102 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
103 gateway_env.insert(
104 "FM_LDK_BITCOIND_RPC_URL".to_owned(),
105 format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
106 );
107 }
108 }
109
110 let process = process_mgr
111 .spawn_daemon(
112 &gw_name,
113 Gatewayd::start_gatewayd(&ln_type, &gatewayd_version).envs(gateway_env),
114 )
115 .await?;
116
117 let gateway_id = poll(
118 "waiting for gateway to be ready to respond to rpc",
119 || async {
120 let command = if ln.ln_type() == LightningNodeType::Ldk
122 && gatewayd_version < *VERSION_0_7_0_ALPHA
123 {
124 vec!["gateway-cli".to_string()]
125 } else {
126 crate::util::get_gateway_cli_path()
127 };
128 let info = cmd!(
129 command,
130 "--rpcpassword",
131 "theresnosecondbest",
132 "-a",
133 addr,
134 "info"
135 )
136 .out_json()
137 .await
138 .map_err(ControlFlow::Continue)?;
139 let gateway_id = info["gateway_id"]
140 .as_str()
141 .context("gateway_id must be a string")
142 .map_err(ControlFlow::Break)?
143 .to_owned();
144 Ok(gateway_id)
145 },
146 )
147 .await?;
148
149 let log_path = process_mgr
150 .globals
151 .FM_LOGS_DIR
152 .join(format!("{gw_name}.log"));
153 let gatewayd = Self {
154 process,
155 ln,
156 addr,
157 lightning_node_addr,
158 gatewayd_version,
159 gw_name,
160 log_path,
161 gw_port: port,
162 ldk_port: lightning_node_port,
163 gateway_id,
164 };
165
166 Ok(gatewayd)
167 }
168
169 fn is_forced_current(&self) -> bool {
170 self.ln.ln_type() == LightningNodeType::Ldk && self.gatewayd_version < *VERSION_0_7_0_ALPHA
171 }
172
173 fn start_gatewayd(ln_type: &LightningNodeType, gatewayd_version: &Version) -> Command {
174 if *ln_type == LightningNodeType::Ldk && *gatewayd_version < *VERSION_0_7_0_ALPHA {
177 cmd!("gatewayd", ln_type)
178 } else {
179 cmd!(crate::util::Gatewayd, ln_type)
180 }
181 }
182
183 pub async fn terminate(self) -> Result<()> {
184 self.process.terminate().await
185 }
186
187 pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
188 self.ln = ln_node;
189 }
190
191 pub async fn stop_lightning_node(&mut self) -> Result<()> {
192 info!(target: LOG_DEVIMINT, "Stopping lightning node");
193 match self.ln.clone() {
194 LightningNode::Lnd(lnd) => lnd.terminate().await,
195 LightningNode::Ldk {
196 name: _,
197 gw_port: _,
198 ldk_port: _,
199 } => {
200 unimplemented!("LDK node termination not implemented")
203 }
204 }
205 }
206
207 pub async fn restart_with_bin(
210 &mut self,
211 process_mgr: &ProcessManager,
212 gatewayd_path: &PathBuf,
213 gateway_cli_path: &PathBuf,
214 ) -> Result<()> {
215 let ln = self.ln.clone();
216
217 self.process.terminate().await?;
218 unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
220 unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
222
223 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
224 if gatewayd_version < *VERSION_0_9_0_ALPHA && supports_lnv2() {
225 info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
226 unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
228 }
229
230 let new_ln = ln;
231 let new_gw = Self::new(process_mgr, new_ln.clone()).await?;
232 self.process = new_gw.process;
233 self.set_lightning_node(new_ln);
234 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
235 info!(
236 target: LOG_DEVIMINT,
237 ?gatewayd_version,
238 ?gateway_cli_version,
239 "upgraded gatewayd and gateway-cli"
240 );
241 Ok(())
242 }
243
244 pub fn cmd(&self) -> Command {
245 if self.is_forced_current() {
246 cmd!(
247 "gateway-cli",
248 "--rpcpassword=theresnosecondbest",
249 "-a",
250 &self.addr
251 )
252 } else {
253 cmd!(
254 crate::util::get_gateway_cli_path(),
255 "--rpcpassword=theresnosecondbest",
256 "-a",
257 &self.addr
258 )
259 }
260 }
261
262 pub async fn get_info(&self) -> Result<serde_json::Value> {
263 retry(
264 "Getting gateway info via gateway-cli info",
265 backoff_util::aggressive_backoff(),
266 || async { cmd!(self, "info").out_json().await },
267 )
268 .await
269 .context("Getting gateway info via gateway-cli info")
270 }
271
272 pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
273 let info = self.get_info().await?;
274 let lightning_pub_key = info["lightning_pub_key"]
275 .as_str()
276 .context("lightning_pub_key must be a string")?
277 .to_owned();
278 Ok(lightning_pub_key.parse()?)
279 }
280
281 pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
282 let invite_code = fed.invite_code()?;
283 poll("gateway connect-fed", || async {
284 cmd!(self, "connect-fed", invite_code.clone())
285 .run()
286 .await
287 .map_err(ControlFlow::Continue)?;
288 Ok(())
289 })
290 .await?;
291 Ok(())
292 }
293
294 pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
295 let federation_id = fed.calculate_federation_id();
296 let invite_code = fed.invite_code()?;
297 info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
298 poll("gateway connect-fed --recover=true", || async {
299 cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
300 .run()
301 .await
302 .map_err(ControlFlow::Continue)?;
303 Ok(())
304 })
305 .await?;
306 Ok(())
307 }
308
309 pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
310 let federation_id = fed.calculate_federation_id();
311 cmd!(self, "ecash", "backup", "--federation-id", federation_id)
312 .run()
313 .await?;
314 Ok(())
315 }
316
317 pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
318 Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
319 .out_json()
320 .await?
321 .as_str()
322 .context("address must be a string")?
323 .to_owned())
324 }
325
326 pub async fn get_ln_onchain_address(&self) -> Result<String> {
327 cmd!(self, "onchain", "address").out_string().await
328 }
329
330 pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
331 let value = retry(
332 "Getting gateway mnemonic",
333 backoff_util::aggressive_backoff(),
334 || async { cmd!(self, "seed").out_json().await },
335 )
336 .await
337 .context("Getting gateway mnemonic")?;
338
339 Ok(serde_json::from_value(value)?)
340 }
341
342 pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
343 cmd!(self, "leave-fed", "--federation-id", federation_id)
344 .run()
345 .await?;
346 Ok(())
347 }
348
349 pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
350 Ok(Bolt11Invoice::from_str(
351 &cmd!(self, "lightning", "create-invoice", amount_msats)
352 .out_string()
353 .await?,
354 )?)
355 }
356
357 pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
358 cmd!(self, "lightning", "pay-invoice", invoice.to_string())
359 .run()
360 .await?;
361
362 Ok(())
363 }
364
365 pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
366 let value = cmd!(
367 self,
368 "ecash",
369 "send",
370 "--federation-id",
371 federation_id,
372 amount_msats
373 )
374 .out_json()
375 .await?;
376 let ecash: String = serde_json::from_value(
377 value
378 .get("notes")
379 .expect("notes key does not exist")
380 .clone(),
381 )?;
382 Ok(ecash)
383 }
384
385 pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
386 cmd!(self, "ecash", "receive", "--notes", ecash)
387 .run()
388 .await?;
389 Ok(())
390 }
391
392 pub async fn get_balances(&self) -> Result<GatewayBalances> {
393 let value = cmd!(self, "get-balances").out_json().await?;
394 Ok(serde_json::from_value(value)?)
395 }
396
397 pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
398 let federation_id = FederationId::from_str(&federation_id)?;
399 let balances = self.get_balances().await?;
400 let ecash_balance = balances
401 .ecash_balances
402 .into_iter()
403 .find(|info| info.federation_id == federation_id)
404 .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
405 .ecash_balance_msats
406 .msats;
407 Ok(ecash_balance)
408 }
409
410 pub async fn send_onchain(
411 &self,
412 bitcoind: &Bitcoind,
413 amount: BitcoinAmountOrAll,
414 fee_rate: u64,
415 ) -> Result<bitcoin::Txid> {
416 let withdraw_address = bitcoind.get_new_address().await?;
417 let value = cmd!(
418 self,
419 "onchain",
420 "send",
421 "--address",
422 withdraw_address,
423 "--amount",
424 amount,
425 "--fee-rate-sats-per-vbyte",
426 fee_rate
427 )
428 .out_json()
429 .await?;
430
431 let txid: bitcoin::Txid = serde_json::from_value(value)?;
432 Ok(txid)
433 }
434
435 pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
436 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
437 let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
438 cmd!(
439 self,
440 "lightning",
441 "close-channels-with-peer",
442 "--pubkey",
443 remote_pubkey,
444 "--force",
445 )
446 } else {
447 cmd!(
448 self,
449 "lightning",
450 "close-channels-with-peer",
451 "--pubkey",
452 remote_pubkey,
453 )
454 };
455
456 close_channel.run().await?;
457
458 Ok(())
459 }
460
461 pub async fn close_all_channels(&self, force: bool) -> Result<()> {
462 let channels = self.list_channels().await?;
463
464 for chan in channels {
465 let remote_pubkey = chan.remote_pubkey;
466 self.close_channel(remote_pubkey, force).await?;
467 }
468
469 Ok(())
470 }
471
472 pub async fn open_channel(
475 &self,
476 gw: &Gatewayd,
477 channel_size_sats: u64,
478 push_amount_sats: Option<u64>,
479 ) -> Result<Txid> {
480 let pubkey = gw.lightning_pubkey().await?;
481
482 let mut command = cmd!(
483 self,
484 "lightning",
485 "open-channel",
486 "--pubkey",
487 pubkey,
488 "--host",
489 gw.lightning_node_addr,
490 "--channel-size-sats",
491 channel_size_sats,
492 "--push-amount-sats",
493 push_amount_sats.unwrap_or(0)
494 );
495
496 Ok(Txid::from_str(&command.out_string().await?)?)
497 }
498
499 pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
500 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
501 let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA || self.is_forced_current() {
502 cmd!(self, "lightning", "list-channels").out_json().await?
503 } else {
504 cmd!(self, "lightning", "list-active-channels")
505 .out_json()
506 .await?
507 };
508
509 let channels = channels
510 .as_array()
511 .context("channels must be an array")?
512 .iter()
513 .map(|channel| {
514 let remote_pubkey = channel["remote_pubkey"]
515 .as_str()
516 .context("remote_pubkey must be a string")?
517 .to_owned();
518 let channel_size_sats = channel["channel_size_sats"]
519 .as_u64()
520 .context("channel_size_sats must be a u64")?;
521 let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
522 .as_u64()
523 .context("outbound_liquidity_sats must be a u64")?;
524 let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
525 .as_u64()
526 .context("inbound_liquidity_sats must be a u64")?;
527 let is_active = channel["is_active"].as_bool().unwrap_or(true);
528 Ok(ChannelInfo {
529 remote_pubkey: remote_pubkey
530 .parse()
531 .expect("Lightning node returned invalid remote channel pubkey"),
532 channel_size_sats,
533 outbound_liquidity_sats,
534 inbound_liquidity_sats,
535 is_active,
536 })
537 })
538 .collect::<Result<Vec<ChannelInfo>>>()?;
539 Ok(channels)
540 }
541
542 pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
543 poll("waiting for block height", || async {
544 let info = self.get_info().await.map_err(ControlFlow::Continue)?;
545 let value = info.get("block_height");
546 if let Some(height) = value {
547 let block_height: Option<u32> = serde_json::from_value(height.clone())
548 .context("Could not parse block height")
549 .map_err(ControlFlow::Continue)?;
550 let Some(block_height) = block_height else {
551 return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
552 };
553 let synced = info["synced_to_chain"]
554 .as_bool()
555 .expect("Could not get synced_to_chain");
556 if block_height >= target_block_height as u32 && synced {
557 return Ok(());
558 }
559 }
560 Err(ControlFlow::Continue(anyhow!("Not synced to block")))
561 })
562 .await?;
563 Ok(())
564 }
565
566 pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
567 let info_value = self.get_info().await?;
568 let federations = info_value["federations"]
569 .as_array()
570 .expect("federations is an array");
571
572 let fed = federations
573 .iter()
574 .find(|fed| {
575 serde_json::from_value::<String>(fed["federation_id"].clone())
576 .expect("could not deserialize federation_id")
577 == fed_id
578 })
579 .ok_or_else(|| anyhow!("Federation not found"))?;
580
581 let lightning_fee = fed["config"]["lightning_fee"].clone();
582 let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
583 .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
584 let parts_per_million: u64 =
585 serde_json::from_value(lightning_fee["parts_per_million"].clone())
586 .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
587
588 Ok(PaymentFee {
589 base,
590 parts_per_million,
591 })
592 }
593
594 pub async fn set_federation_routing_fee(
595 &self,
596 fed_id: String,
597 base: u64,
598 ppm: u64,
599 ) -> Result<()> {
600 cmd!(
601 self,
602 "cfg",
603 "set-fees",
604 "--federation-id",
605 fed_id,
606 "--ln-base",
607 base,
608 "--ln-ppm",
609 ppm
610 )
611 .run()
612 .await?;
613
614 Ok(())
615 }
616
617 pub async fn set_federation_transaction_fee(
618 &self,
619 fed_id: String,
620 base: u64,
621 ppm: u64,
622 ) -> Result<()> {
623 cmd!(
624 self,
625 "cfg",
626 "set-fees",
627 "--federation-id",
628 fed_id,
629 "--tx-base",
630 base,
631 "--tx-ppm",
632 ppm
633 )
634 .run()
635 .await?;
636
637 Ok(())
638 }
639
640 pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
641 let out_json = cmd!(self, "payment-summary").out_json().await?;
642 Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
643 }
644
645 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
646 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
647 if gatewayd_version < *VERSION_0_7_0_ALPHA {
648 if let LightningNode::Lnd(lnd) = &self.ln {
649 return lnd.wait_bolt11_invoice(payment_hash).await;
650 }
651
652 debug!("Skipping bolt11 invoice check because it is not supported until v0.7");
653 return Ok(());
654 }
655
656 let payment_hash =
657 sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
658 let invoice_val = cmd!(
659 self,
660 "lightning",
661 "get-invoice",
662 "--payment-hash",
663 payment_hash
664 )
665 .out_json()
666 .await?;
667 let invoice: GetInvoiceResponse =
668 serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
669 anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
670
671 Ok(())
672 }
673
674 pub async fn list_transactions(
675 &self,
676 start: SystemTime,
677 end: SystemTime,
678 ) -> Result<Vec<PaymentDetails>> {
679 let start_datetime: DateTime<Utc> = start.into();
680 let end_datetime: DateTime<Utc> = end.into();
681 let response = cmd!(
682 self,
683 "lightning",
684 "list-transactions",
685 "--start-time",
686 start_datetime.to_rfc3339(),
687 "--end-time",
688 end_datetime.to_rfc3339()
689 )
690 .out_json()
691 .await?;
692 let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
693 Ok(transactions.transactions)
694 }
695
696 pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
697 let offer_value = if let Some(amount) = amount {
698 cmd!(
699 self,
700 "lightning",
701 "create-offer",
702 "--amount-msat",
703 amount.msats
704 )
705 .out_json()
706 .await?
707 } else {
708 cmd!(self, "lightning", "create-offer").out_json().await?
709 };
710 let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
711 .expect("Could not parse offer response");
712 Ok(offer_response.offer)
713 }
714
715 pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
716 if let Some(amount) = amount {
717 cmd!(
718 self,
719 "lightning",
720 "pay-offer",
721 "--offer",
722 offer,
723 "--amount-msat",
724 amount.msats
725 )
726 .run()
727 .await?;
728 } else {
729 cmd!(self, "lightning", "pay-offer", "--offer", offer)
730 .run()
731 .await?;
732 }
733
734 Ok(())
735 }
736}