devimint/
gatewayd.rs

1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash};
15use fedimint_gateway_common::envs::FM_GATEWAY_IROH_SECRET_KEY_OVERRIDE_ENV;
16use fedimint_gateway_common::{
17    ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
18    ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
19    PaymentSummaryResponse, V1_API_ENDPOINT,
20};
21use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
22use fedimint_lnv2_common::gateway_api::PaymentFee;
23use fedimint_logging::LOG_DEVIMINT;
24use fedimint_testing_core::node_type::LightningNodeType;
25use semver::Version;
26use tracing::info;
27
28use crate::cmd;
29use crate::envs::{
30    FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_IROH_LISTEN_ADDR_ENV,
31    FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
32};
33use crate::external::{Bitcoind, LightningNode};
34use crate::federation::Federation;
35use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
36use crate::vars::utf8;
37use crate::version_constants::{VERSION_0_9_0_ALPHA, VERSION_0_10_0_ALPHA};
38
39#[derive(Clone)]
40pub struct Gatewayd {
41    pub(crate) process: ProcessHandle,
42    pub ln: LightningNode,
43    pub addr: String,
44    pub(crate) lightning_node_addr: String,
45    pub gatewayd_version: Version,
46    pub gw_name: String,
47    pub log_path: PathBuf,
48    pub gw_port: u16,
49    pub ldk_port: u16,
50    pub gateway_id: String,
51    pub iroh_port: u16,
52    pub node_id: iroh_base::NodeId,
53    pub gateway_index: usize,
54}
55
56impl Gatewayd {
57    pub async fn new(
58        process_mgr: &ProcessManager,
59        ln: LightningNode,
60        gateway_index: usize,
61    ) -> Result<Self> {
62        let ln_type = ln.ln_type();
63        let (gw_name, port, lightning_node_port) = match &ln {
64            LightningNode::Lnd(_) => (
65                "gatewayd-lnd".to_string(),
66                process_mgr.globals.FM_PORT_GW_LND,
67                process_mgr.globals.FM_PORT_LND_LISTEN,
68            ),
69            LightningNode::Ldk {
70                name,
71                gw_port,
72                ldk_port,
73            } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
74        };
75        let test_dir = &process_mgr.globals.FM_TEST_DIR;
76        let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
77        let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
78        let iroh_endpoint = process_mgr
79            .globals
80            .gatewayd_overrides
81            .gateway_iroh_endpoints
82            .get(gateway_index)
83            .expect("No gateway for index");
84
85        let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
86            (
87                FM_GATEWAY_DATA_DIR_ENV.to_owned(),
88                format!("{}/{gw_name}", utf8(test_dir)),
89            ),
90            (
91                FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
92                format!("127.0.0.1:{port}"),
93            ),
94            (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
95            (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
96            (
97                FM_GATEWAY_IROH_LISTEN_ADDR_ENV.to_owned(),
98                format!("127.0.0.1:{}", iroh_endpoint.port()),
99            ),
100            (
101                FM_GATEWAY_IROH_SECRET_KEY_OVERRIDE_ENV.to_owned(),
102                iroh_endpoint.secret_key(),
103            ),
104        ]);
105
106        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
107        if gatewayd_version < *VERSION_0_9_0_ALPHA {
108            let mode = if supports_lnv2() {
109                "All"
110            } else {
111                info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
112                "LNv1"
113            };
114            gateway_env.insert(
115                "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
116                mode.to_string(),
117            );
118        }
119
120        if ln_type == LightningNodeType::Ldk {
121            gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
122
123            // Prior to `v0.9.0`, only LDK could connect to bitcoind
124            if gatewayd_version < *VERSION_0_9_0_ALPHA {
125                let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
126                gateway_env.insert(
127                    "FM_LDK_BITCOIND_RPC_URL".to_owned(),
128                    format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
129                );
130            }
131        }
132
133        let process = process_mgr
134            .spawn_daemon(
135                &gw_name,
136                cmd!(crate::util::Gatewayd, ln_type).envs(gateway_env),
137            )
138            .await?;
139
140        let gateway_id = poll(
141            "waiting for gateway to be ready to respond to rpc",
142            || async {
143                // Once the gateway id is available via RPC, the gateway is ready
144                let info = cmd!(
145                    crate::util::get_gateway_cli_path(),
146                    "--rpcpassword",
147                    "theresnosecondbest",
148                    "-a",
149                    addr,
150                    "info"
151                )
152                .out_json()
153                .await
154                .map_err(ControlFlow::Continue)?;
155                let gateway_id = info["gateway_id"]
156                    .as_str()
157                    .context("gateway_id must be a string")
158                    .map_err(ControlFlow::Break)?
159                    .to_owned();
160                Ok(gateway_id)
161            },
162        )
163        .await?;
164
165        let log_path = process_mgr
166            .globals
167            .FM_LOGS_DIR
168            .join(format!("{gw_name}.log"));
169        let gatewayd = Self {
170            process,
171            ln,
172            addr,
173            lightning_node_addr,
174            gatewayd_version,
175            gw_name,
176            log_path,
177            gw_port: port,
178            ldk_port: lightning_node_port,
179            gateway_id,
180            iroh_port: iroh_endpoint.port(),
181            node_id: iroh_endpoint.node_id(),
182            gateway_index,
183        };
184
185        Ok(gatewayd)
186    }
187
188    pub async fn terminate(self) -> Result<()> {
189        self.process.terminate().await
190    }
191
192    pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
193        self.ln = ln_node;
194    }
195
196    pub async fn stop_lightning_node(&mut self) -> Result<()> {
197        info!(target: LOG_DEVIMINT, "Stopping lightning node");
198        match self.ln.clone() {
199            LightningNode::Lnd(lnd) => lnd.terminate().await,
200            LightningNode::Ldk {
201                name: _,
202                gw_port: _,
203                ldk_port: _,
204            } => {
205                // This is not implemented because the LDK node lives in
206                // the gateway process and cannot be stopped independently.
207                unimplemented!("LDK node termination not implemented")
208            }
209        }
210    }
211
212    /// Restarts the gateway using the provided `bin_path`, which is useful for
213    /// testing upgrades.
214    pub async fn restart_with_bin(
215        &mut self,
216        process_mgr: &ProcessManager,
217        gatewayd_path: &PathBuf,
218        gateway_cli_path: &PathBuf,
219    ) -> Result<()> {
220        let ln = self.ln.clone();
221
222        self.process.terminate().await?;
223        // TODO: Audit that the environment access only happens in single-threaded code.
224        unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
225        // TODO: Audit that the environment access only happens in single-threaded code.
226        unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
227
228        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
229        if gatewayd_version < *VERSION_0_9_0_ALPHA && supports_lnv2() {
230            info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
231            // TODO: Audit that the environment access only happens in single-threaded code.
232            unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
233        }
234
235        let new_ln = ln;
236        let new_gw = Self::new(process_mgr, new_ln.clone(), self.gateway_index).await?;
237        self.process = new_gw.process;
238        self.set_lightning_node(new_ln);
239        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
240        info!(
241            target: LOG_DEVIMINT,
242            ?gatewayd_version,
243            ?gateway_cli_version,
244            "upgraded gatewayd and gateway-cli"
245        );
246        Ok(())
247    }
248
249    pub fn cmd(&self) -> Command {
250        cmd!(
251            crate::util::get_gateway_cli_path(),
252            "--rpcpassword=theresnosecondbest",
253            "-a",
254            &self.addr
255        )
256    }
257
258    pub async fn gateway_id(&self) -> Result<String> {
259        let info = self.get_info().await?;
260        let gateway_id = info["gateway_id"]
261            .as_str()
262            .context("gateway_id must be a string")?
263            .to_owned();
264        Ok(gateway_id)
265    }
266
267    pub async fn get_info(&self) -> Result<serde_json::Value> {
268        retry(
269            "Getting gateway info via gateway-cli info",
270            backoff_util::aggressive_backoff(),
271            || async { cmd!(self, "info").out_json().await },
272        )
273        .await
274        .context("Getting gateway info via gateway-cli info")
275    }
276
277    pub async fn get_info_iroh(&self) -> Result<serde_json::Value> {
278        cmd!(
279            crate::util::get_gateway_cli_path(),
280            "--rpcpassword=theresnosecondbest",
281            "--address",
282            format!("iroh://{}", self.node_id),
283            "info",
284        )
285        .out_json()
286        .await
287    }
288
289    pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
290        let info = self.get_info().await?;
291        let lightning_pub_key = if self.gatewayd_version < *VERSION_0_10_0_ALPHA {
292            info["lightning_pub_key"]
293                .as_str()
294                .context("lightning_pub_key must be a string")?
295                .to_owned()
296        } else {
297            info["lightning_info"]["connected"]["public_key"]
298                .as_str()
299                .context("lightning_pub_key must be a string")?
300                .to_owned()
301        };
302
303        Ok(lightning_pub_key.parse()?)
304    }
305
306    pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
307        let invite_code = fed.invite_code()?;
308        poll("gateway connect-fed", || async {
309            cmd!(self, "connect-fed", invite_code.clone())
310                .run()
311                .await
312                .map_err(ControlFlow::Continue)?;
313            Ok(())
314        })
315        .await?;
316        Ok(())
317    }
318
319    pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
320        let federation_id = fed.calculate_federation_id();
321        let invite_code = fed.invite_code()?;
322        info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
323        poll("gateway connect-fed --recover=true", || async {
324            cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
325                .run()
326                .await
327                .map_err(ControlFlow::Continue)?;
328            Ok(())
329        })
330        .await?;
331        Ok(())
332    }
333
334    pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
335        let federation_id = fed.calculate_federation_id();
336        cmd!(self, "ecash", "backup", "--federation-id", federation_id)
337            .run()
338            .await?;
339        Ok(())
340    }
341
342    pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
343        Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
344            .out_json()
345            .await?
346            .as_str()
347            .context("address must be a string")?
348            .to_owned())
349    }
350
351    pub async fn get_ln_onchain_address(&self) -> Result<String> {
352        cmd!(self, "onchain", "address").out_string().await
353    }
354
355    pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
356        let value = retry(
357            "Getting gateway mnemonic",
358            backoff_util::aggressive_backoff(),
359            || async { cmd!(self, "seed").out_json().await },
360        )
361        .await
362        .context("Getting gateway mnemonic")?;
363
364        Ok(serde_json::from_value(value)?)
365    }
366
367    pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
368        cmd!(self, "leave-fed", "--federation-id", federation_id)
369            .run()
370            .await?;
371        Ok(())
372    }
373
374    pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
375        Ok(Bolt11Invoice::from_str(
376            &cmd!(self, "lightning", "create-invoice", amount_msats)
377                .out_string()
378                .await?,
379        )?)
380    }
381
382    pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
383        cmd!(self, "lightning", "pay-invoice", invoice.to_string())
384            .run()
385            .await?;
386
387        Ok(())
388    }
389
390    pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
391        let value = cmd!(
392            self,
393            "ecash",
394            "send",
395            "--federation-id",
396            federation_id,
397            amount_msats
398        )
399        .out_json()
400        .await?;
401        let ecash: String = serde_json::from_value(
402            value
403                .get("notes")
404                .expect("notes key does not exist")
405                .clone(),
406        )?;
407        Ok(ecash)
408    }
409
410    pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
411        cmd!(self, "ecash", "receive", "--notes", ecash)
412            .run()
413            .await?;
414        Ok(())
415    }
416
417    pub async fn get_balances(&self) -> Result<GatewayBalances> {
418        let value = cmd!(self, "get-balances").out_json().await?;
419        Ok(serde_json::from_value(value)?)
420    }
421
422    pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
423        let federation_id = FederationId::from_str(&federation_id)?;
424        let balances = self.get_balances().await?;
425        let ecash_balance = balances
426            .ecash_balances
427            .into_iter()
428            .find(|info| info.federation_id == federation_id)
429            .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
430            .ecash_balance_msats
431            .msats;
432        Ok(ecash_balance)
433    }
434
435    pub async fn send_onchain(
436        &self,
437        bitcoind: &Bitcoind,
438        amount: BitcoinAmountOrAll,
439        fee_rate: u64,
440    ) -> Result<bitcoin::Txid> {
441        let withdraw_address = bitcoind.get_new_address().await?;
442        let value = cmd!(
443            self,
444            "onchain",
445            "send",
446            "--address",
447            withdraw_address,
448            "--amount",
449            amount,
450            "--fee-rate-sats-per-vbyte",
451            fee_rate
452        )
453        .out_json()
454        .await?;
455
456        let txid: bitcoin::Txid = serde_json::from_value(value)?;
457        Ok(txid)
458    }
459
460    pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
461        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
462        let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
463            cmd!(
464                self,
465                "lightning",
466                "close-channels-with-peer",
467                "--pubkey",
468                remote_pubkey,
469                "--force",
470            )
471        } else if gateway_cli_version < *VERSION_0_10_0_ALPHA {
472            cmd!(
473                self,
474                "lightning",
475                "close-channels-with-peer",
476                "--pubkey",
477                remote_pubkey,
478            )
479        } else {
480            cmd!(
481                self,
482                "lightning",
483                "close-channels-with-peer",
484                "--pubkey",
485                remote_pubkey,
486                "--sats-per-vbyte",
487                "10",
488            )
489        };
490
491        close_channel.run().await?;
492
493        Ok(())
494    }
495
496    pub async fn close_all_channels(&self, force: bool) -> Result<()> {
497        let channels = self.list_channels().await?;
498
499        for chan in channels {
500            let remote_pubkey = chan.remote_pubkey;
501            self.close_channel(remote_pubkey, force).await?;
502        }
503
504        Ok(())
505    }
506
507    /// Open a channel with the gateway's lightning node, returning the funding
508    /// transaction txid.
509    pub async fn open_channel(
510        &self,
511        gw: &Gatewayd,
512        channel_size_sats: u64,
513        push_amount_sats: Option<u64>,
514    ) -> Result<Txid> {
515        let pubkey = gw.lightning_pubkey().await?;
516
517        let mut command = cmd!(
518            self,
519            "lightning",
520            "open-channel",
521            "--pubkey",
522            pubkey,
523            "--host",
524            gw.lightning_node_addr,
525            "--channel-size-sats",
526            channel_size_sats,
527            "--push-amount-sats",
528            push_amount_sats.unwrap_or(0)
529        );
530
531        Ok(Txid::from_str(&command.out_string().await?)?)
532    }
533
534    pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
535        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
536        let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA {
537            cmd!(self, "lightning", "list-channels").out_json().await?
538        } else {
539            cmd!(self, "lightning", "list-active-channels")
540                .out_json()
541                .await?
542        };
543
544        let channels = channels
545            .as_array()
546            .context("channels must be an array")?
547            .iter()
548            .map(|channel| {
549                let remote_pubkey = channel["remote_pubkey"]
550                    .as_str()
551                    .context("remote_pubkey must be a string")?
552                    .to_owned();
553                let channel_size_sats = channel["channel_size_sats"]
554                    .as_u64()
555                    .context("channel_size_sats must be a u64")?;
556                let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
557                    .as_u64()
558                    .context("outbound_liquidity_sats must be a u64")?;
559                let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
560                    .as_u64()
561                    .context("inbound_liquidity_sats must be a u64")?;
562                let is_active = channel["is_active"].as_bool().unwrap_or(true);
563                let funding_outpoint = channel.get("funding_outpoint").map(|v| {
564                    serde_json::from_value::<bitcoin::OutPoint>(v.clone())
565                        .expect("Could not deserialize outpoint")
566                });
567                Ok(ChannelInfo {
568                    remote_pubkey: remote_pubkey
569                        .parse()
570                        .expect("Lightning node returned invalid remote channel pubkey"),
571                    channel_size_sats,
572                    outbound_liquidity_sats,
573                    inbound_liquidity_sats,
574                    is_active,
575                    funding_outpoint,
576                })
577            })
578            .collect::<Result<Vec<ChannelInfo>>>()?;
579        Ok(channels)
580    }
581
582    pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
583        poll("waiting for block height", || async {
584            let info = self.get_info().await.map_err(ControlFlow::Continue)?;
585
586            let height_value = if self.gatewayd_version < *VERSION_0_10_0_ALPHA {
587                info["block_height"].clone()
588            } else {
589                info["lightning_info"]["connected"]["block_height"].clone()
590            };
591
592            let block_height: Option<u32> = serde_json::from_value(height_value)
593                .context("Could not parse block height")
594                .map_err(ControlFlow::Continue)?;
595            let Some(block_height) = block_height else {
596                return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
597            };
598
599            let synced_value = if self.gatewayd_version < *VERSION_0_10_0_ALPHA {
600                info["synced_to_chain"].clone()
601            } else {
602                info["lightning_info"]["connected"]["synced_to_chain"].clone()
603            };
604            let synced = synced_value
605                .as_bool()
606                .expect("Could not get synced_to_chain");
607            if block_height >= target_block_height as u32 && synced {
608                return Ok(());
609            }
610
611            Err(ControlFlow::Continue(anyhow!("Not synced to block")))
612        })
613        .await?;
614        Ok(())
615    }
616
617    pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
618        let info_value = self.get_info().await?;
619        let federations = info_value["federations"]
620            .as_array()
621            .expect("federations is an array");
622
623        let fed = federations
624            .iter()
625            .find(|fed| {
626                serde_json::from_value::<String>(fed["federation_id"].clone())
627                    .expect("could not deserialize federation_id")
628                    == fed_id
629            })
630            .ok_or_else(|| anyhow!("Federation not found"))?;
631
632        let lightning_fee = fed["config"]["lightning_fee"].clone();
633        let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
634            .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
635        let parts_per_million: u64 =
636            serde_json::from_value(lightning_fee["parts_per_million"].clone())
637                .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
638
639        Ok(PaymentFee {
640            base,
641            parts_per_million,
642        })
643    }
644
645    pub async fn set_federation_routing_fee(
646        &self,
647        fed_id: String,
648        base: u64,
649        ppm: u64,
650    ) -> Result<()> {
651        cmd!(
652            self,
653            "cfg",
654            "set-fees",
655            "--federation-id",
656            fed_id,
657            "--ln-base",
658            base,
659            "--ln-ppm",
660            ppm
661        )
662        .run()
663        .await?;
664
665        Ok(())
666    }
667
668    pub async fn set_federation_transaction_fee(
669        &self,
670        fed_id: String,
671        base: u64,
672        ppm: u64,
673    ) -> Result<()> {
674        cmd!(
675            self,
676            "cfg",
677            "set-fees",
678            "--federation-id",
679            fed_id,
680            "--tx-base",
681            base,
682            "--tx-ppm",
683            ppm
684        )
685        .run()
686        .await?;
687
688        Ok(())
689    }
690
691    pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
692        let out_json = cmd!(self, "payment-summary").out_json().await?;
693        Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
694    }
695
696    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
697        let payment_hash =
698            sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
699        let invoice_val = cmd!(
700            self,
701            "lightning",
702            "get-invoice",
703            "--payment-hash",
704            payment_hash
705        )
706        .out_json()
707        .await?;
708        let invoice: GetInvoiceResponse =
709            serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
710        anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
711
712        Ok(())
713    }
714
715    pub async fn list_transactions(
716        &self,
717        start: SystemTime,
718        end: SystemTime,
719    ) -> Result<Vec<PaymentDetails>> {
720        let start_datetime: DateTime<Utc> = start.into();
721        let end_datetime: DateTime<Utc> = end.into();
722        let response = cmd!(
723            self,
724            "lightning",
725            "list-transactions",
726            "--start-time",
727            start_datetime.to_rfc3339(),
728            "--end-time",
729            end_datetime.to_rfc3339()
730        )
731        .out_json()
732        .await?;
733        let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
734        Ok(transactions.transactions)
735    }
736
737    pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
738        let offer_value = if let Some(amount) = amount {
739            cmd!(
740                self,
741                "lightning",
742                "create-offer",
743                "--amount-msat",
744                amount.msats
745            )
746            .out_json()
747            .await?
748        } else {
749            cmd!(self, "lightning", "create-offer").out_json().await?
750        };
751        let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
752            .expect("Could not parse offer response");
753        Ok(offer_response.offer)
754    }
755
756    pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
757        if let Some(amount) = amount {
758            cmd!(
759                self,
760                "lightning",
761                "pay-offer",
762                "--offer",
763                offer,
764                "--amount-msat",
765                amount.msats
766            )
767            .run()
768            .await?;
769        } else {
770            cmd!(self, "lightning", "pay-offer", "--offer", offer)
771                .run()
772                .await?;
773        }
774
775        Ok(())
776    }
777}