1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash};
15use fedimint_gateway_common::{
16 ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
17 ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
18 PaymentSummaryResponse, V1_API_ENDPOINT,
19};
20use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
21use fedimint_lnv2_common::gateway_api::PaymentFee;
22use fedimint_logging::LOG_DEVIMINT;
23use fedimint_testing_core::node_type::LightningNodeType;
24use semver::Version;
25use tracing::{debug, info};
26
27use crate::cmd;
28use crate::envs::{
29 FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
30};
31use crate::external::{Bitcoind, LightningNode};
32use crate::federation::Federation;
33use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
34use crate::vars::utf8;
35use crate::version_constants::{VERSION_0_7_0_ALPHA, VERSION_0_9_0_ALPHA};
36
37#[derive(Clone)]
38pub struct Gatewayd {
39 pub(crate) process: ProcessHandle,
40 pub ln: LightningNode,
41 pub addr: String,
42 pub(crate) lightning_node_addr: String,
43 pub gatewayd_version: Version,
44 pub gw_name: String,
45 pub log_path: PathBuf,
46 pub gw_port: u16,
47 pub ldk_port: u16,
48}
49
50impl Gatewayd {
51 pub async fn new(process_mgr: &ProcessManager, ln: LightningNode) -> Result<Self> {
52 let ln_type = ln.ln_type();
53 let (gw_name, port, lightning_node_port) = match &ln {
54 LightningNode::Lnd(_) => (
55 "gatewayd-lnd".to_string(),
56 process_mgr.globals.FM_PORT_GW_LND,
57 process_mgr.globals.FM_PORT_LND_LISTEN,
58 ),
59 LightningNode::Ldk {
60 name,
61 gw_port,
62 ldk_port,
63 } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
64 };
65 let test_dir = &process_mgr.globals.FM_TEST_DIR;
66 let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
67 let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
68
69 let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
70 (
71 FM_GATEWAY_DATA_DIR_ENV.to_owned(),
72 format!("{}/{gw_name}", utf8(test_dir)),
73 ),
74 (
75 FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
76 format!("127.0.0.1:{port}"),
77 ),
78 (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
79 (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
80 ]);
81 if !supports_lnv2() {
82 info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
83 gateway_env.insert(
84 "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
85 "LNv1".to_string(),
86 );
87 }
88
89 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
90 if ln_type == LightningNodeType::Ldk {
91 gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
92
93 if gatewayd_version < *VERSION_0_9_0_ALPHA {
95 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
96 gateway_env.insert(
97 "FM_LDK_BITCOIND_RPC_URL".to_owned(),
98 format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
99 );
100 }
101 }
102
103 let process = process_mgr
104 .spawn_daemon(
105 &gw_name,
106 Gatewayd::start_gatewayd(&ln_type, &gatewayd_version).envs(gateway_env),
107 )
108 .await?;
109
110 let log_path = process_mgr
111 .globals
112 .FM_LOGS_DIR
113 .join(format!("{gw_name}.log"));
114 let gatewayd = Self {
115 process,
116 ln,
117 addr,
118 lightning_node_addr,
119 gatewayd_version,
120 gw_name,
121 log_path,
122 gw_port: port,
123 ldk_port: lightning_node_port,
124 };
125 poll(
126 "waiting for gateway to be ready to respond to rpc",
127 || async { gatewayd.gateway_id().await.map_err(ControlFlow::Continue) },
128 )
129 .await?;
130 Ok(gatewayd)
131 }
132
133 fn is_forced_current(&self) -> bool {
134 self.ln.ln_type() == LightningNodeType::Ldk && self.gatewayd_version < *VERSION_0_7_0_ALPHA
135 }
136
137 fn start_gatewayd(ln_type: &LightningNodeType, gatewayd_version: &Version) -> Command {
138 if *ln_type == LightningNodeType::Ldk && *gatewayd_version < *VERSION_0_7_0_ALPHA {
141 cmd!("gatewayd", ln_type)
142 } else {
143 cmd!(crate::util::Gatewayd, ln_type)
144 }
145 }
146
147 pub async fn terminate(self) -> Result<()> {
148 self.process.terminate().await
149 }
150
151 pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
152 self.ln = ln_node;
153 }
154
155 pub async fn stop_lightning_node(&mut self) -> Result<()> {
156 info!(target: LOG_DEVIMINT, "Stopping lightning node");
157 match self.ln.clone() {
158 LightningNode::Lnd(lnd) => lnd.terminate().await,
159 LightningNode::Ldk {
160 name: _,
161 gw_port: _,
162 ldk_port: _,
163 } => {
164 unimplemented!("LDK node termination not implemented")
167 }
168 }
169 }
170
171 pub async fn restart_with_bin(
174 &mut self,
175 process_mgr: &ProcessManager,
176 gatewayd_path: &PathBuf,
177 gateway_cli_path: &PathBuf,
178 ) -> Result<()> {
179 let ln = self.ln.clone();
180
181 self.process.terminate().await?;
182 unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
184 unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
186
187 if supports_lnv2() {
188 info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
189 unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
191 }
192
193 let new_ln = ln;
194 let new_gw = Self::new(process_mgr, new_ln.clone()).await?;
195 self.process = new_gw.process;
196 self.set_lightning_node(new_ln);
197 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
198 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
199 info!(
200 target: LOG_DEVIMINT,
201 ?gatewayd_version,
202 ?gateway_cli_version,
203 "upgraded gatewayd and gateway-cli"
204 );
205 Ok(())
206 }
207
208 pub fn cmd(&self) -> Command {
209 if self.is_forced_current() {
210 cmd!(
211 "gateway-cli",
212 "--rpcpassword=theresnosecondbest",
213 "-a",
214 &self.addr
215 )
216 } else {
217 cmd!(
218 crate::util::get_gateway_cli_path(),
219 "--rpcpassword=theresnosecondbest",
220 "-a",
221 &self.addr
222 )
223 }
224 }
225
226 pub async fn get_info(&self) -> Result<serde_json::Value> {
227 retry(
228 "Getting gateway info via gateway-cli info",
229 backoff_util::aggressive_backoff(),
230 || async { cmd!(self, "info").out_json().await },
231 )
232 .await
233 .context("Getting gateway info via gateway-cli info")
234 }
235
236 pub async fn gateway_id(&self) -> Result<String> {
237 let info = self.get_info().await?;
238 let gateway_id = info["gateway_id"]
239 .as_str()
240 .context("gateway_id must be a string")?
241 .to_owned();
242 Ok(gateway_id)
243 }
244
245 pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
246 let info = self.get_info().await?;
247 let lightning_pub_key = info["lightning_pub_key"]
248 .as_str()
249 .context("lightning_pub_key must be a string")?
250 .to_owned();
251 Ok(lightning_pub_key.parse()?)
252 }
253
254 pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
255 let invite_code = fed.invite_code()?;
256 poll("gateway connect-fed", || async {
257 cmd!(self, "connect-fed", invite_code.clone())
258 .run()
259 .await
260 .map_err(ControlFlow::Continue)?;
261 Ok(())
262 })
263 .await?;
264 Ok(())
265 }
266
267 pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
268 let federation_id = fed.calculate_federation_id();
269 let invite_code = fed.invite_code()?;
270 info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
271 poll("gateway connect-fed --recover=true", || async {
272 cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
273 .run()
274 .await
275 .map_err(ControlFlow::Continue)?;
276 Ok(())
277 })
278 .await?;
279 Ok(())
280 }
281
282 pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
283 let federation_id = fed.calculate_federation_id();
284 cmd!(self, "ecash", "backup", "--federation-id", federation_id)
285 .run()
286 .await?;
287 Ok(())
288 }
289
290 pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
291 Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
292 .out_json()
293 .await?
294 .as_str()
295 .context("address must be a string")?
296 .to_owned())
297 }
298
299 pub async fn get_ln_onchain_address(&self) -> Result<String> {
300 cmd!(self, "onchain", "address").out_string().await
301 }
302
303 pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
304 let value = retry(
305 "Getting gateway mnemonic",
306 backoff_util::aggressive_backoff(),
307 || async { cmd!(self, "seed").out_json().await },
308 )
309 .await
310 .context("Getting gateway mnemonic")?;
311
312 Ok(serde_json::from_value(value)?)
313 }
314
315 pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
316 cmd!(self, "leave-fed", "--federation-id", federation_id)
317 .run()
318 .await?;
319 Ok(())
320 }
321
322 pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
323 Ok(Bolt11Invoice::from_str(
324 &cmd!(self, "lightning", "create-invoice", amount_msats)
325 .out_string()
326 .await?,
327 )?)
328 }
329
330 pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
331 cmd!(self, "lightning", "pay-invoice", invoice.to_string())
332 .run()
333 .await?;
334
335 Ok(())
336 }
337
338 pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
339 let value = cmd!(
340 self,
341 "ecash",
342 "send",
343 "--federation-id",
344 federation_id,
345 amount_msats
346 )
347 .out_json()
348 .await?;
349 let ecash: String = serde_json::from_value(
350 value
351 .get("notes")
352 .expect("notes key does not exist")
353 .clone(),
354 )?;
355 Ok(ecash)
356 }
357
358 pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
359 cmd!(self, "ecash", "receive", "--notes", ecash)
360 .run()
361 .await?;
362 Ok(())
363 }
364
365 pub async fn get_balances(&self) -> Result<GatewayBalances> {
366 let value = cmd!(self, "get-balances").out_json().await?;
367 Ok(serde_json::from_value(value)?)
368 }
369
370 pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
371 let federation_id = FederationId::from_str(&federation_id)?;
372 let balances = self.get_balances().await?;
373 let ecash_balance = balances
374 .ecash_balances
375 .into_iter()
376 .find(|info| info.federation_id == federation_id)
377 .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
378 .ecash_balance_msats
379 .msats;
380 Ok(ecash_balance)
381 }
382
383 pub async fn send_onchain(
384 &self,
385 bitcoind: &Bitcoind,
386 amount: BitcoinAmountOrAll,
387 fee_rate: u64,
388 ) -> Result<bitcoin::Txid> {
389 let withdraw_address = bitcoind.get_new_address().await?;
390 let value = cmd!(
391 self,
392 "onchain",
393 "send",
394 "--address",
395 withdraw_address,
396 "--amount",
397 amount,
398 "--fee-rate-sats-per-vbyte",
399 fee_rate
400 )
401 .out_json()
402 .await?;
403
404 let txid: bitcoin::Txid = serde_json::from_value(value)?;
405 Ok(txid)
406 }
407
408 pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
409 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
410 let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
411 cmd!(
412 self,
413 "lightning",
414 "close-channels-with-peer",
415 "--pubkey",
416 remote_pubkey,
417 "--force",
418 )
419 } else {
420 cmd!(
421 self,
422 "lightning",
423 "close-channels-with-peer",
424 "--pubkey",
425 remote_pubkey,
426 )
427 };
428
429 close_channel.run().await?;
430
431 Ok(())
432 }
433
434 pub async fn close_all_channels(&self, force: bool) -> Result<()> {
435 let channels = self.list_channels().await?;
436
437 for chan in channels {
438 let remote_pubkey = chan.remote_pubkey;
439 self.close_channel(remote_pubkey, force).await?;
440 }
441
442 Ok(())
443 }
444
445 pub async fn open_channel(
448 &self,
449 gw: &Gatewayd,
450 channel_size_sats: u64,
451 push_amount_sats: Option<u64>,
452 ) -> Result<Txid> {
453 let pubkey = gw.lightning_pubkey().await?;
454
455 let mut command = cmd!(
456 self,
457 "lightning",
458 "open-channel",
459 "--pubkey",
460 pubkey,
461 "--host",
462 gw.lightning_node_addr,
463 "--channel-size-sats",
464 channel_size_sats,
465 "--push-amount-sats",
466 push_amount_sats.unwrap_or(0)
467 );
468
469 Ok(Txid::from_str(&command.out_string().await?)?)
470 }
471
472 pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
473 let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
474 let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA || self.is_forced_current() {
475 cmd!(self, "lightning", "list-channels").out_json().await?
476 } else {
477 cmd!(self, "lightning", "list-active-channels")
478 .out_json()
479 .await?
480 };
481
482 let channels = channels
483 .as_array()
484 .context("channels must be an array")?
485 .iter()
486 .map(|channel| {
487 let remote_pubkey = channel["remote_pubkey"]
488 .as_str()
489 .context("remote_pubkey must be a string")?
490 .to_owned();
491 let channel_size_sats = channel["channel_size_sats"]
492 .as_u64()
493 .context("channel_size_sats must be a u64")?;
494 let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
495 .as_u64()
496 .context("outbound_liquidity_sats must be a u64")?;
497 let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
498 .as_u64()
499 .context("inbound_liquidity_sats must be a u64")?;
500 let is_active = channel["is_active"].as_bool().unwrap_or(true);
501 Ok(ChannelInfo {
502 remote_pubkey: remote_pubkey
503 .parse()
504 .expect("Lightning node returned invalid remote channel pubkey"),
505 channel_size_sats,
506 outbound_liquidity_sats,
507 inbound_liquidity_sats,
508 is_active,
509 })
510 })
511 .collect::<Result<Vec<ChannelInfo>>>()?;
512 Ok(channels)
513 }
514
515 pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
516 poll("waiting for block height", || async {
517 let info = self.get_info().await.map_err(ControlFlow::Continue)?;
518 let value = info.get("block_height");
519 if let Some(height) = value {
520 let block_height: Option<u32> = serde_json::from_value(height.clone())
521 .context("Could not parse block height")
522 .map_err(ControlFlow::Continue)?;
523 let Some(block_height) = block_height else {
524 return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
525 };
526 let synced = info["synced_to_chain"]
527 .as_bool()
528 .expect("Could not get synced_to_chain");
529 if block_height >= target_block_height as u32 && synced {
530 return Ok(());
531 }
532 }
533 Err(ControlFlow::Continue(anyhow!("Not synced to block")))
534 })
535 .await?;
536 Ok(())
537 }
538
539 pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
540 let info_value = self.get_info().await?;
541 let federations = info_value["federations"]
542 .as_array()
543 .expect("federations is an array");
544
545 let fed = federations
546 .iter()
547 .find(|fed| {
548 serde_json::from_value::<String>(fed["federation_id"].clone())
549 .expect("could not deserialize federation_id")
550 == fed_id
551 })
552 .ok_or_else(|| anyhow!("Federation not found"))?;
553
554 let lightning_fee = fed["config"]["lightning_fee"].clone();
555 let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
556 .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
557 let parts_per_million: u64 =
558 serde_json::from_value(lightning_fee["parts_per_million"].clone())
559 .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
560
561 Ok(PaymentFee {
562 base,
563 parts_per_million,
564 })
565 }
566
567 pub async fn set_federation_routing_fee(
568 &self,
569 fed_id: String,
570 base: u64,
571 ppm: u64,
572 ) -> Result<()> {
573 cmd!(
574 self,
575 "cfg",
576 "set-fees",
577 "--federation-id",
578 fed_id,
579 "--ln-base",
580 base,
581 "--ln-ppm",
582 ppm
583 )
584 .run()
585 .await?;
586
587 Ok(())
588 }
589
590 pub async fn set_federation_transaction_fee(
591 &self,
592 fed_id: String,
593 base: u64,
594 ppm: u64,
595 ) -> Result<()> {
596 cmd!(
597 self,
598 "cfg",
599 "set-fees",
600 "--federation-id",
601 fed_id,
602 "--tx-base",
603 base,
604 "--tx-ppm",
605 ppm
606 )
607 .run()
608 .await?;
609
610 Ok(())
611 }
612
613 pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
614 let out_json = cmd!(self, "payment-summary").out_json().await?;
615 Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
616 }
617
618 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
619 let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
620 if gatewayd_version < *VERSION_0_7_0_ALPHA {
621 if let LightningNode::Lnd(lnd) = &self.ln {
622 return lnd.wait_bolt11_invoice(payment_hash).await;
623 }
624
625 debug!("Skipping bolt11 invoice check because it is not supported until v0.7");
626 return Ok(());
627 }
628
629 let payment_hash =
630 sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
631 let invoice_val = cmd!(
632 self,
633 "lightning",
634 "get-invoice",
635 "--payment-hash",
636 payment_hash
637 )
638 .out_json()
639 .await?;
640 let invoice: GetInvoiceResponse =
641 serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
642 anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
643
644 Ok(())
645 }
646
647 pub async fn list_transactions(
648 &self,
649 start: SystemTime,
650 end: SystemTime,
651 ) -> Result<Vec<PaymentDetails>> {
652 let start_datetime: DateTime<Utc> = start.into();
653 let end_datetime: DateTime<Utc> = end.into();
654 let response = cmd!(
655 self,
656 "lightning",
657 "list-transactions",
658 "--start-time",
659 start_datetime.to_rfc3339(),
660 "--end-time",
661 end_datetime.to_rfc3339()
662 )
663 .out_json()
664 .await?;
665 let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
666 Ok(transactions.transactions)
667 }
668
669 pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
670 let offer_value = if let Some(amount) = amount {
671 cmd!(
672 self,
673 "lightning",
674 "create-offer",
675 "--amount-msat",
676 amount.msats
677 )
678 .out_json()
679 .await?
680 } else {
681 cmd!(self, "lightning", "create-offer").out_json().await?
682 };
683 let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
684 .expect("Could not parse offer response");
685 Ok(offer_response.offer)
686 }
687
688 pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
689 if let Some(amount) = amount {
690 cmd!(
691 self,
692 "lightning",
693 "pay-offer",
694 "--offer",
695 offer,
696 "--amount-msat",
697 amount.msats
698 )
699 .run()
700 .await?;
701 } else {
702 cmd!(self, "lightning", "pay-offer", "--offer", offer)
703 .run()
704 .await?;
705 }
706
707 Ok(())
708 }
709}