devimint/
gatewayd.rs

1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash};
15use fedimint_gateway_common::{
16    ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
17    ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
18    PaymentSummaryResponse, V1_API_ENDPOINT,
19};
20use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
21use fedimint_lnv2_common::gateway_api::PaymentFee;
22use fedimint_logging::LOG_DEVIMINT;
23use fedimint_testing_core::node_type::LightningNodeType;
24use semver::Version;
25use tracing::{debug, info};
26
27use crate::cmd;
28use crate::envs::{
29    FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
30};
31use crate::external::{Bitcoind, LightningNode};
32use crate::federation::Federation;
33use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
34use crate::vars::utf8;
35use crate::version_constants::{VERSION_0_7_0_ALPHA, VERSION_0_9_0_ALPHA};
36
/// Handle to a `gatewayd` daemon spawned by devimint, bundled with the
/// addresses, ports and version information used to drive it via
/// `gateway-cli`.
#[derive(Clone)]
pub struct Gatewayd {
    /// Handle of the spawned `gatewayd` daemon process.
    pub(crate) process: ProcessHandle,
    /// Lightning node this gateway was started for.
    pub ln: LightningNode,
    /// Gateway API address, built as `http://127.0.0.1:{gw_port}/{V1_API_ENDPOINT}`.
    pub addr: String,
    /// Address of the lightning node, built as `127.0.0.1:{ldk_port}`.
    pub(crate) lightning_node_addr: String,
    /// Version reported by `crate::util::Gatewayd::version_or_default()` when
    /// the gateway was created.
    pub gatewayd_version: Version,
    /// Name used for the daemon, its data directory and its log file.
    pub gw_name: String,
    /// Path of the form `{FM_LOGS_DIR}/{gw_name}.log`.
    pub log_path: PathBuf,
    /// Port the gateway listens on.
    pub gw_port: u16,
    /// Port of the lightning node. NOTE: for LND gateways this holds
    /// `FM_PORT_LND_LISTEN`, despite the field name.
    pub ldk_port: u16,
}
49
50impl Gatewayd {
51    pub async fn new(process_mgr: &ProcessManager, ln: LightningNode) -> Result<Self> {
52        let ln_type = ln.ln_type();
53        let (gw_name, port, lightning_node_port) = match &ln {
54            LightningNode::Lnd(_) => (
55                "gatewayd-lnd".to_string(),
56                process_mgr.globals.FM_PORT_GW_LND,
57                process_mgr.globals.FM_PORT_LND_LISTEN,
58            ),
59            LightningNode::Ldk {
60                name,
61                gw_port,
62                ldk_port,
63            } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
64        };
65        let test_dir = &process_mgr.globals.FM_TEST_DIR;
66        let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
67        let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
68
69        let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
70            (
71                FM_GATEWAY_DATA_DIR_ENV.to_owned(),
72                format!("{}/{gw_name}", utf8(test_dir)),
73            ),
74            (
75                FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
76                format!("127.0.0.1:{port}"),
77            ),
78            (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
79            (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
80        ]);
81        if !supports_lnv2() {
82            info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
83            gateway_env.insert(
84                "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
85                "LNv1".to_string(),
86            );
87        }
88
89        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
90        if ln_type == LightningNodeType::Ldk {
91            gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
92
93            // Prior to `v0.9.0`, only LDK could connect to bitcoind
94            if gatewayd_version < *VERSION_0_9_0_ALPHA {
95                let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
96                gateway_env.insert(
97                    "FM_LDK_BITCOIND_RPC_URL".to_owned(),
98                    format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
99                );
100            }
101        }
102
103        let process = process_mgr
104            .spawn_daemon(
105                &gw_name,
106                Gatewayd::start_gatewayd(&ln_type, &gatewayd_version).envs(gateway_env),
107            )
108            .await?;
109
110        let log_path = process_mgr
111            .globals
112            .FM_LOGS_DIR
113            .join(format!("{gw_name}.log"));
114        let gatewayd = Self {
115            process,
116            ln,
117            addr,
118            lightning_node_addr,
119            gatewayd_version,
120            gw_name,
121            log_path,
122            gw_port: port,
123            ldk_port: lightning_node_port,
124        };
125        poll(
126            "waiting for gateway to be ready to respond to rpc",
127            || async { gatewayd.gateway_id().await.map_err(ControlFlow::Continue) },
128        )
129        .await?;
130        Ok(gatewayd)
131    }
132
133    fn is_forced_current(&self) -> bool {
134        self.ln.ln_type() == LightningNodeType::Ldk && self.gatewayd_version < *VERSION_0_7_0_ALPHA
135    }
136
137    fn start_gatewayd(ln_type: &LightningNodeType, gatewayd_version: &Version) -> Command {
138        // If an LDK gateway is trying to spawn prior to v0.6, just use most recent
139        // version
140        if *ln_type == LightningNodeType::Ldk && *gatewayd_version < *VERSION_0_7_0_ALPHA {
141            cmd!("gatewayd", ln_type)
142        } else {
143            cmd!(crate::util::Gatewayd, ln_type)
144        }
145    }
146
147    pub async fn terminate(self) -> Result<()> {
148        self.process.terminate().await
149    }
150
151    pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
152        self.ln = ln_node;
153    }
154
155    pub async fn stop_lightning_node(&mut self) -> Result<()> {
156        info!(target: LOG_DEVIMINT, "Stopping lightning node");
157        match self.ln.clone() {
158            LightningNode::Lnd(lnd) => lnd.terminate().await,
159            LightningNode::Ldk {
160                name: _,
161                gw_port: _,
162                ldk_port: _,
163            } => {
164                // This is not implemented because the LDK node lives in
165                // the gateway process and cannot be stopped independently.
166                unimplemented!("LDK node termination not implemented")
167            }
168        }
169    }
170
171    /// Restarts the gateway using the provided `bin_path`, which is useful for
172    /// testing upgrades.
173    pub async fn restart_with_bin(
174        &mut self,
175        process_mgr: &ProcessManager,
176        gatewayd_path: &PathBuf,
177        gateway_cli_path: &PathBuf,
178    ) -> Result<()> {
179        let ln = self.ln.clone();
180
181        self.process.terminate().await?;
182        // TODO: Audit that the environment access only happens in single-threaded code.
183        unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
184        // TODO: Audit that the environment access only happens in single-threaded code.
185        unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
186
187        if supports_lnv2() {
188            info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
189            // TODO: Audit that the environment access only happens in single-threaded code.
190            unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
191        }
192
193        let new_ln = ln;
194        let new_gw = Self::new(process_mgr, new_ln.clone()).await?;
195        self.process = new_gw.process;
196        self.set_lightning_node(new_ln);
197        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
198        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
199        info!(
200            target: LOG_DEVIMINT,
201            ?gatewayd_version,
202            ?gateway_cli_version,
203            "upgraded gatewayd and gateway-cli"
204        );
205        Ok(())
206    }
207
208    pub fn cmd(&self) -> Command {
209        if self.is_forced_current() {
210            cmd!(
211                "gateway-cli",
212                "--rpcpassword=theresnosecondbest",
213                "-a",
214                &self.addr
215            )
216        } else {
217            cmd!(
218                crate::util::get_gateway_cli_path(),
219                "--rpcpassword=theresnosecondbest",
220                "-a",
221                &self.addr
222            )
223        }
224    }
225
226    pub async fn get_info(&self) -> Result<serde_json::Value> {
227        retry(
228            "Getting gateway info via gateway-cli info",
229            backoff_util::aggressive_backoff(),
230            || async { cmd!(self, "info").out_json().await },
231        )
232        .await
233        .context("Getting gateway info via gateway-cli info")
234    }
235
236    pub async fn gateway_id(&self) -> Result<String> {
237        let info = self.get_info().await?;
238        let gateway_id = info["gateway_id"]
239            .as_str()
240            .context("gateway_id must be a string")?
241            .to_owned();
242        Ok(gateway_id)
243    }
244
245    pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
246        let info = self.get_info().await?;
247        let lightning_pub_key = info["lightning_pub_key"]
248            .as_str()
249            .context("lightning_pub_key must be a string")?
250            .to_owned();
251        Ok(lightning_pub_key.parse()?)
252    }
253
254    pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
255        let invite_code = fed.invite_code()?;
256        poll("gateway connect-fed", || async {
257            cmd!(self, "connect-fed", invite_code.clone())
258                .run()
259                .await
260                .map_err(ControlFlow::Continue)?;
261            Ok(())
262        })
263        .await?;
264        Ok(())
265    }
266
267    pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
268        let federation_id = fed.calculate_federation_id();
269        let invite_code = fed.invite_code()?;
270        info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
271        poll("gateway connect-fed --recover=true", || async {
272            cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
273                .run()
274                .await
275                .map_err(ControlFlow::Continue)?;
276            Ok(())
277        })
278        .await?;
279        Ok(())
280    }
281
282    pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
283        let federation_id = fed.calculate_federation_id();
284        cmd!(self, "ecash", "backup", "--federation-id", federation_id)
285            .run()
286            .await?;
287        Ok(())
288    }
289
290    pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
291        Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
292            .out_json()
293            .await?
294            .as_str()
295            .context("address must be a string")?
296            .to_owned())
297    }
298
299    pub async fn get_ln_onchain_address(&self) -> Result<String> {
300        cmd!(self, "onchain", "address").out_string().await
301    }
302
303    pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
304        let value = retry(
305            "Getting gateway mnemonic",
306            backoff_util::aggressive_backoff(),
307            || async { cmd!(self, "seed").out_json().await },
308        )
309        .await
310        .context("Getting gateway mnemonic")?;
311
312        Ok(serde_json::from_value(value)?)
313    }
314
315    pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
316        cmd!(self, "leave-fed", "--federation-id", federation_id)
317            .run()
318            .await?;
319        Ok(())
320    }
321
322    pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
323        Ok(Bolt11Invoice::from_str(
324            &cmd!(self, "lightning", "create-invoice", amount_msats)
325                .out_string()
326                .await?,
327        )?)
328    }
329
330    pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
331        cmd!(self, "lightning", "pay-invoice", invoice.to_string())
332            .run()
333            .await?;
334
335        Ok(())
336    }
337
338    pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
339        let value = cmd!(
340            self,
341            "ecash",
342            "send",
343            "--federation-id",
344            federation_id,
345            amount_msats
346        )
347        .out_json()
348        .await?;
349        let ecash: String = serde_json::from_value(
350            value
351                .get("notes")
352                .expect("notes key does not exist")
353                .clone(),
354        )?;
355        Ok(ecash)
356    }
357
358    pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
359        cmd!(self, "ecash", "receive", "--notes", ecash)
360            .run()
361            .await?;
362        Ok(())
363    }
364
365    pub async fn get_balances(&self) -> Result<GatewayBalances> {
366        let value = cmd!(self, "get-balances").out_json().await?;
367        Ok(serde_json::from_value(value)?)
368    }
369
370    pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
371        let federation_id = FederationId::from_str(&federation_id)?;
372        let balances = self.get_balances().await?;
373        let ecash_balance = balances
374            .ecash_balances
375            .into_iter()
376            .find(|info| info.federation_id == federation_id)
377            .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
378            .ecash_balance_msats
379            .msats;
380        Ok(ecash_balance)
381    }
382
383    pub async fn send_onchain(
384        &self,
385        bitcoind: &Bitcoind,
386        amount: BitcoinAmountOrAll,
387        fee_rate: u64,
388    ) -> Result<bitcoin::Txid> {
389        let withdraw_address = bitcoind.get_new_address().await?;
390        let value = cmd!(
391            self,
392            "onchain",
393            "send",
394            "--address",
395            withdraw_address,
396            "--amount",
397            amount,
398            "--fee-rate-sats-per-vbyte",
399            fee_rate
400        )
401        .out_json()
402        .await?;
403
404        let txid: bitcoin::Txid = serde_json::from_value(value)?;
405        Ok(txid)
406    }
407
408    pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
409        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
410        let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
411            cmd!(
412                self,
413                "lightning",
414                "close-channels-with-peer",
415                "--pubkey",
416                remote_pubkey,
417                "--force",
418            )
419        } else {
420            cmd!(
421                self,
422                "lightning",
423                "close-channels-with-peer",
424                "--pubkey",
425                remote_pubkey,
426            )
427        };
428
429        close_channel.run().await?;
430
431        Ok(())
432    }
433
434    pub async fn close_all_channels(&self, force: bool) -> Result<()> {
435        let channels = self.list_channels().await?;
436
437        for chan in channels {
438            let remote_pubkey = chan.remote_pubkey;
439            self.close_channel(remote_pubkey, force).await?;
440        }
441
442        Ok(())
443    }
444
445    /// Open a channel with the gateway's lightning node, returning the funding
446    /// transaction txid.
447    pub async fn open_channel(
448        &self,
449        gw: &Gatewayd,
450        channel_size_sats: u64,
451        push_amount_sats: Option<u64>,
452    ) -> Result<Txid> {
453        let pubkey = gw.lightning_pubkey().await?;
454
455        let mut command = cmd!(
456            self,
457            "lightning",
458            "open-channel",
459            "--pubkey",
460            pubkey,
461            "--host",
462            gw.lightning_node_addr,
463            "--channel-size-sats",
464            channel_size_sats,
465            "--push-amount-sats",
466            push_amount_sats.unwrap_or(0)
467        );
468
469        Ok(Txid::from_str(&command.out_string().await?)?)
470    }
471
472    pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
473        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
474        let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA || self.is_forced_current() {
475            cmd!(self, "lightning", "list-channels").out_json().await?
476        } else {
477            cmd!(self, "lightning", "list-active-channels")
478                .out_json()
479                .await?
480        };
481
482        let channels = channels
483            .as_array()
484            .context("channels must be an array")?
485            .iter()
486            .map(|channel| {
487                let remote_pubkey = channel["remote_pubkey"]
488                    .as_str()
489                    .context("remote_pubkey must be a string")?
490                    .to_owned();
491                let channel_size_sats = channel["channel_size_sats"]
492                    .as_u64()
493                    .context("channel_size_sats must be a u64")?;
494                let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
495                    .as_u64()
496                    .context("outbound_liquidity_sats must be a u64")?;
497                let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
498                    .as_u64()
499                    .context("inbound_liquidity_sats must be a u64")?;
500                let is_active = channel["is_active"].as_bool().unwrap_or(true);
501                Ok(ChannelInfo {
502                    remote_pubkey: remote_pubkey
503                        .parse()
504                        .expect("Lightning node returned invalid remote channel pubkey"),
505                    channel_size_sats,
506                    outbound_liquidity_sats,
507                    inbound_liquidity_sats,
508                    is_active,
509                })
510            })
511            .collect::<Result<Vec<ChannelInfo>>>()?;
512        Ok(channels)
513    }
514
515    pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
516        poll("waiting for block height", || async {
517            let info = self.get_info().await.map_err(ControlFlow::Continue)?;
518            let value = info.get("block_height");
519            if let Some(height) = value {
520                let block_height: Option<u32> = serde_json::from_value(height.clone())
521                    .context("Could not parse block height")
522                    .map_err(ControlFlow::Continue)?;
523                let Some(block_height) = block_height else {
524                    return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
525                };
526                let synced = info["synced_to_chain"]
527                    .as_bool()
528                    .expect("Could not get synced_to_chain");
529                if block_height >= target_block_height as u32 && synced {
530                    return Ok(());
531                }
532            }
533            Err(ControlFlow::Continue(anyhow!("Not synced to block")))
534        })
535        .await?;
536        Ok(())
537    }
538
539    pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
540        let info_value = self.get_info().await?;
541        let federations = info_value["federations"]
542            .as_array()
543            .expect("federations is an array");
544
545        let fed = federations
546            .iter()
547            .find(|fed| {
548                serde_json::from_value::<String>(fed["federation_id"].clone())
549                    .expect("could not deserialize federation_id")
550                    == fed_id
551            })
552            .ok_or_else(|| anyhow!("Federation not found"))?;
553
554        let lightning_fee = fed["config"]["lightning_fee"].clone();
555        let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
556            .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
557        let parts_per_million: u64 =
558            serde_json::from_value(lightning_fee["parts_per_million"].clone())
559                .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
560
561        Ok(PaymentFee {
562            base,
563            parts_per_million,
564        })
565    }
566
567    pub async fn set_federation_routing_fee(
568        &self,
569        fed_id: String,
570        base: u64,
571        ppm: u64,
572    ) -> Result<()> {
573        cmd!(
574            self,
575            "cfg",
576            "set-fees",
577            "--federation-id",
578            fed_id,
579            "--ln-base",
580            base,
581            "--ln-ppm",
582            ppm
583        )
584        .run()
585        .await?;
586
587        Ok(())
588    }
589
590    pub async fn set_federation_transaction_fee(
591        &self,
592        fed_id: String,
593        base: u64,
594        ppm: u64,
595    ) -> Result<()> {
596        cmd!(
597            self,
598            "cfg",
599            "set-fees",
600            "--federation-id",
601            fed_id,
602            "--tx-base",
603            base,
604            "--tx-ppm",
605            ppm
606        )
607        .run()
608        .await?;
609
610        Ok(())
611    }
612
613    pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
614        let out_json = cmd!(self, "payment-summary").out_json().await?;
615        Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
616    }
617
618    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
619        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
620        if gatewayd_version < *VERSION_0_7_0_ALPHA {
621            if let LightningNode::Lnd(lnd) = &self.ln {
622                return lnd.wait_bolt11_invoice(payment_hash).await;
623            }
624
625            debug!("Skipping bolt11 invoice check because it is not supported until v0.7");
626            return Ok(());
627        }
628
629        let payment_hash =
630            sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
631        let invoice_val = cmd!(
632            self,
633            "lightning",
634            "get-invoice",
635            "--payment-hash",
636            payment_hash
637        )
638        .out_json()
639        .await?;
640        let invoice: GetInvoiceResponse =
641            serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
642        anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
643
644        Ok(())
645    }
646
647    pub async fn list_transactions(
648        &self,
649        start: SystemTime,
650        end: SystemTime,
651    ) -> Result<Vec<PaymentDetails>> {
652        let start_datetime: DateTime<Utc> = start.into();
653        let end_datetime: DateTime<Utc> = end.into();
654        let response = cmd!(
655            self,
656            "lightning",
657            "list-transactions",
658            "--start-time",
659            start_datetime.to_rfc3339(),
660            "--end-time",
661            end_datetime.to_rfc3339()
662        )
663        .out_json()
664        .await?;
665        let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
666        Ok(transactions.transactions)
667    }
668
669    pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
670        let offer_value = if let Some(amount) = amount {
671            cmd!(
672                self,
673                "lightning",
674                "create-offer",
675                "--amount-msat",
676                amount.msats
677            )
678            .out_json()
679            .await?
680        } else {
681            cmd!(self, "lightning", "create-offer").out_json().await?
682        };
683        let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
684            .expect("Could not parse offer response");
685        Ok(offer_response.offer)
686    }
687
688    pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
689        if let Some(amount) = amount {
690            cmd!(
691                self,
692                "lightning",
693                "pay-offer",
694                "--offer",
695                offer,
696                "--amount-msat",
697                amount.msats
698            )
699            .run()
700            .await?;
701        } else {
702            cmd!(self, "lightning", "pay-offer", "--offer", offer)
703                .run()
704                .await?;
705        }
706
707        Ok(())
708    }
709}