devimint/
gatewayd.rs

1use std::collections::HashMap;
2use std::ops::ControlFlow;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::SystemTime;
6
7use anyhow::{Context, Result, anyhow};
8use bitcoin::hashes::sha256;
9use chrono::{DateTime, Utc};
10use esplora_client::Txid;
11use fedimint_core::config::FederationId;
12use fedimint_core::secp256k1::PublicKey;
13use fedimint_core::util::{backoff_util, retry};
14use fedimint_core::{Amount, BitcoinAmountOrAll, BitcoinHash};
15use fedimint_gateway_common::{
16    ChannelInfo, CreateOfferResponse, GatewayBalances, GetInvoiceResponse,
17    ListTransactionsResponse, MnemonicResponse, PaymentDetails, PaymentStatus,
18    PaymentSummaryResponse, V1_API_ENDPOINT,
19};
20use fedimint_ln_server::common::lightning_invoice::Bolt11Invoice;
21use fedimint_lnv2_common::gateway_api::PaymentFee;
22use fedimint_logging::LOG_DEVIMINT;
23use fedimint_testing_core::node_type::LightningNodeType;
24use semver::Version;
25use tracing::{debug, info};
26
27use crate::cmd;
28use crate::envs::{
29    FM_GATEWAY_API_ADDR_ENV, FM_GATEWAY_DATA_DIR_ENV, FM_GATEWAY_LISTEN_ADDR_ENV, FM_PORT_LDK_ENV,
30};
31use crate::external::{Bitcoind, LightningNode};
32use crate::federation::Federation;
33use crate::util::{Command, ProcessHandle, ProcessManager, poll, supports_lnv2};
34use crate::vars::utf8;
35use crate::version_constants::{VERSION_0_7_0_ALPHA, VERSION_0_9_0_ALPHA};
36
37#[derive(Clone)]
38pub struct Gatewayd {
39    pub(crate) process: ProcessHandle,
40    pub ln: LightningNode,
41    pub addr: String,
42    pub(crate) lightning_node_addr: String,
43    pub gatewayd_version: Version,
44    pub gw_name: String,
45    pub log_path: PathBuf,
46    pub gw_port: u16,
47    pub ldk_port: u16,
48    pub gateway_id: String,
49}
50
51impl Gatewayd {
52    pub async fn new(process_mgr: &ProcessManager, ln: LightningNode) -> Result<Self> {
53        let ln_type = ln.ln_type();
54        let (gw_name, port, lightning_node_port) = match &ln {
55            LightningNode::Lnd(_) => (
56                "gatewayd-lnd".to_string(),
57                process_mgr.globals.FM_PORT_GW_LND,
58                process_mgr.globals.FM_PORT_LND_LISTEN,
59            ),
60            LightningNode::Ldk {
61                name,
62                gw_port,
63                ldk_port,
64            } => (name.to_owned(), gw_port.to_owned(), ldk_port.to_owned()),
65        };
66        let test_dir = &process_mgr.globals.FM_TEST_DIR;
67        let addr = format!("http://127.0.0.1:{port}/{V1_API_ENDPOINT}");
68        let lightning_node_addr = format!("127.0.0.1:{lightning_node_port}");
69
70        let mut gateway_env: HashMap<String, String> = HashMap::from_iter([
71            (
72                FM_GATEWAY_DATA_DIR_ENV.to_owned(),
73                format!("{}/{gw_name}", utf8(test_dir)),
74            ),
75            (
76                FM_GATEWAY_LISTEN_ADDR_ENV.to_owned(),
77                format!("127.0.0.1:{port}"),
78            ),
79            (FM_GATEWAY_API_ADDR_ENV.to_owned(), addr.clone()),
80            (FM_PORT_LDK_ENV.to_owned(), lightning_node_port.to_string()),
81        ]);
82
83        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
84        if gatewayd_version < *VERSION_0_9_0_ALPHA {
85            let mode = if supports_lnv2() {
86                "All"
87            } else {
88                info!(target: LOG_DEVIMINT, "LNv2 is not supported, running gatewayd in LNv1 mode");
89                "LNv1"
90            };
91            gateway_env.insert(
92                "FM_GATEWAY_LIGHTNING_MODULE_MODE".to_owned(),
93                mode.to_string(),
94            );
95        }
96
97        if ln_type == LightningNodeType::Ldk {
98            gateway_env.insert("FM_LDK_ALIAS".to_owned(), gw_name.clone());
99
100            // Prior to `v0.9.0`, only LDK could connect to bitcoind
101            if gatewayd_version < *VERSION_0_9_0_ALPHA {
102                let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
103                gateway_env.insert(
104                    "FM_LDK_BITCOIND_RPC_URL".to_owned(),
105                    format!("http://bitcoin:bitcoin@127.0.0.1:{btc_rpc_port}"),
106                );
107            }
108        }
109
110        let process = process_mgr
111            .spawn_daemon(
112                &gw_name,
113                Gatewayd::start_gatewayd(&ln_type, &gatewayd_version).envs(gateway_env),
114            )
115            .await?;
116
117        let gateway_id = poll(
118            "waiting for gateway to be ready to respond to rpc",
119            || async {
120                // Once the gateway id is available via RPC, the gateway is ready
121                let command = if ln.ln_type() == LightningNodeType::Ldk
122                    && gatewayd_version < *VERSION_0_7_0_ALPHA
123                {
124                    vec!["gateway-cli".to_string()]
125                } else {
126                    crate::util::get_gateway_cli_path()
127                };
128                let info = cmd!(
129                    command,
130                    "--rpcpassword",
131                    "theresnosecondbest",
132                    "-a",
133                    addr,
134                    "info"
135                )
136                .out_json()
137                .await
138                .map_err(ControlFlow::Continue)?;
139                let gateway_id = info["gateway_id"]
140                    .as_str()
141                    .context("gateway_id must be a string")
142                    .map_err(ControlFlow::Break)?
143                    .to_owned();
144                Ok(gateway_id)
145            },
146        )
147        .await?;
148
149        let log_path = process_mgr
150            .globals
151            .FM_LOGS_DIR
152            .join(format!("{gw_name}.log"));
153        let gatewayd = Self {
154            process,
155            ln,
156            addr,
157            lightning_node_addr,
158            gatewayd_version,
159            gw_name,
160            log_path,
161            gw_port: port,
162            ldk_port: lightning_node_port,
163            gateway_id,
164        };
165
166        Ok(gatewayd)
167    }
168
169    fn is_forced_current(&self) -> bool {
170        self.ln.ln_type() == LightningNodeType::Ldk && self.gatewayd_version < *VERSION_0_7_0_ALPHA
171    }
172
173    fn start_gatewayd(ln_type: &LightningNodeType, gatewayd_version: &Version) -> Command {
174        // If an LDK gateway is trying to spawn prior to v0.6, just use most recent
175        // version
176        if *ln_type == LightningNodeType::Ldk && *gatewayd_version < *VERSION_0_7_0_ALPHA {
177            cmd!("gatewayd", ln_type)
178        } else {
179            cmd!(crate::util::Gatewayd, ln_type)
180        }
181    }
182
183    pub async fn terminate(self) -> Result<()> {
184        self.process.terminate().await
185    }
186
187    pub fn set_lightning_node(&mut self, ln_node: LightningNode) {
188        self.ln = ln_node;
189    }
190
191    pub async fn stop_lightning_node(&mut self) -> Result<()> {
192        info!(target: LOG_DEVIMINT, "Stopping lightning node");
193        match self.ln.clone() {
194            LightningNode::Lnd(lnd) => lnd.terminate().await,
195            LightningNode::Ldk {
196                name: _,
197                gw_port: _,
198                ldk_port: _,
199            } => {
200                // This is not implemented because the LDK node lives in
201                // the gateway process and cannot be stopped independently.
202                unimplemented!("LDK node termination not implemented")
203            }
204        }
205    }
206
207    /// Restarts the gateway using the provided `bin_path`, which is useful for
208    /// testing upgrades.
209    pub async fn restart_with_bin(
210        &mut self,
211        process_mgr: &ProcessManager,
212        gatewayd_path: &PathBuf,
213        gateway_cli_path: &PathBuf,
214    ) -> Result<()> {
215        let ln = self.ln.clone();
216
217        self.process.terminate().await?;
218        // TODO: Audit that the environment access only happens in single-threaded code.
219        unsafe { std::env::set_var("FM_GATEWAYD_BASE_EXECUTABLE", gatewayd_path) };
220        // TODO: Audit that the environment access only happens in single-threaded code.
221        unsafe { std::env::set_var("FM_GATEWAY_CLI_BASE_EXECUTABLE", gateway_cli_path) };
222
223        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
224        if gatewayd_version < *VERSION_0_9_0_ALPHA && supports_lnv2() {
225            info!(target: LOG_DEVIMINT, "LNv2 is now supported, running in All mode");
226            // TODO: Audit that the environment access only happens in single-threaded code.
227            unsafe { std::env::set_var("FM_GATEWAY_LIGHTNING_MODULE_MODE", "All") };
228        }
229
230        let new_ln = ln;
231        let new_gw = Self::new(process_mgr, new_ln.clone()).await?;
232        self.process = new_gw.process;
233        self.set_lightning_node(new_ln);
234        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
235        info!(
236            target: LOG_DEVIMINT,
237            ?gatewayd_version,
238            ?gateway_cli_version,
239            "upgraded gatewayd and gateway-cli"
240        );
241        Ok(())
242    }
243
244    pub fn cmd(&self) -> Command {
245        if self.is_forced_current() {
246            cmd!(
247                "gateway-cli",
248                "--rpcpassword=theresnosecondbest",
249                "-a",
250                &self.addr
251            )
252        } else {
253            cmd!(
254                crate::util::get_gateway_cli_path(),
255                "--rpcpassword=theresnosecondbest",
256                "-a",
257                &self.addr
258            )
259        }
260    }
261
262    pub async fn get_info(&self) -> Result<serde_json::Value> {
263        retry(
264            "Getting gateway info via gateway-cli info",
265            backoff_util::aggressive_backoff(),
266            || async { cmd!(self, "info").out_json().await },
267        )
268        .await
269        .context("Getting gateway info via gateway-cli info")
270    }
271
272    pub async fn lightning_pubkey(&self) -> Result<PublicKey> {
273        let info = self.get_info().await?;
274        let lightning_pub_key = info["lightning_pub_key"]
275            .as_str()
276            .context("lightning_pub_key must be a string")?
277            .to_owned();
278        Ok(lightning_pub_key.parse()?)
279    }
280
281    pub async fn connect_fed(&self, fed: &Federation) -> Result<()> {
282        let invite_code = fed.invite_code()?;
283        poll("gateway connect-fed", || async {
284            cmd!(self, "connect-fed", invite_code.clone())
285                .run()
286                .await
287                .map_err(ControlFlow::Continue)?;
288            Ok(())
289        })
290        .await?;
291        Ok(())
292    }
293
294    pub async fn recover_fed(&self, fed: &Federation) -> Result<()> {
295        let federation_id = fed.calculate_federation_id();
296        let invite_code = fed.invite_code()?;
297        info!(target: LOG_DEVIMINT, federation_id = %federation_id, "Recovering...");
298        poll("gateway connect-fed --recover=true", || async {
299            cmd!(self, "connect-fed", invite_code.clone(), "--recover=true")
300                .run()
301                .await
302                .map_err(ControlFlow::Continue)?;
303            Ok(())
304        })
305        .await?;
306        Ok(())
307    }
308
309    pub async fn backup_to_fed(&self, fed: &Federation) -> Result<()> {
310        let federation_id = fed.calculate_federation_id();
311        cmd!(self, "ecash", "backup", "--federation-id", federation_id)
312            .run()
313            .await?;
314        Ok(())
315    }
316
317    pub async fn get_pegin_addr(&self, fed_id: &str) -> Result<String> {
318        Ok(cmd!(self, "ecash", "pegin", "--federation-id={fed_id}")
319            .out_json()
320            .await?
321            .as_str()
322            .context("address must be a string")?
323            .to_owned())
324    }
325
326    pub async fn get_ln_onchain_address(&self) -> Result<String> {
327        cmd!(self, "onchain", "address").out_string().await
328    }
329
330    pub async fn get_mnemonic(&self) -> Result<MnemonicResponse> {
331        let value = retry(
332            "Getting gateway mnemonic",
333            backoff_util::aggressive_backoff(),
334            || async { cmd!(self, "seed").out_json().await },
335        )
336        .await
337        .context("Getting gateway mnemonic")?;
338
339        Ok(serde_json::from_value(value)?)
340    }
341
342    pub async fn leave_federation(&self, federation_id: FederationId) -> Result<()> {
343        cmd!(self, "leave-fed", "--federation-id", federation_id)
344            .run()
345            .await?;
346        Ok(())
347    }
348
349    pub async fn create_invoice(&self, amount_msats: u64) -> Result<Bolt11Invoice> {
350        Ok(Bolt11Invoice::from_str(
351            &cmd!(self, "lightning", "create-invoice", amount_msats)
352                .out_string()
353                .await?,
354        )?)
355    }
356
357    pub async fn pay_invoice(&self, invoice: Bolt11Invoice) -> Result<()> {
358        cmd!(self, "lightning", "pay-invoice", invoice.to_string())
359            .run()
360            .await?;
361
362        Ok(())
363    }
364
365    pub async fn send_ecash(&self, federation_id: String, amount_msats: u64) -> Result<String> {
366        let value = cmd!(
367            self,
368            "ecash",
369            "send",
370            "--federation-id",
371            federation_id,
372            amount_msats
373        )
374        .out_json()
375        .await?;
376        let ecash: String = serde_json::from_value(
377            value
378                .get("notes")
379                .expect("notes key does not exist")
380                .clone(),
381        )?;
382        Ok(ecash)
383    }
384
385    pub async fn receive_ecash(&self, ecash: String) -> Result<()> {
386        cmd!(self, "ecash", "receive", "--notes", ecash)
387            .run()
388            .await?;
389        Ok(())
390    }
391
392    pub async fn get_balances(&self) -> Result<GatewayBalances> {
393        let value = cmd!(self, "get-balances").out_json().await?;
394        Ok(serde_json::from_value(value)?)
395    }
396
397    pub async fn ecash_balance(&self, federation_id: String) -> anyhow::Result<u64> {
398        let federation_id = FederationId::from_str(&federation_id)?;
399        let balances = self.get_balances().await?;
400        let ecash_balance = balances
401            .ecash_balances
402            .into_iter()
403            .find(|info| info.federation_id == federation_id)
404            .ok_or(anyhow::anyhow!("Gateway is not joined to federation"))?
405            .ecash_balance_msats
406            .msats;
407        Ok(ecash_balance)
408    }
409
410    pub async fn send_onchain(
411        &self,
412        bitcoind: &Bitcoind,
413        amount: BitcoinAmountOrAll,
414        fee_rate: u64,
415    ) -> Result<bitcoin::Txid> {
416        let withdraw_address = bitcoind.get_new_address().await?;
417        let value = cmd!(
418            self,
419            "onchain",
420            "send",
421            "--address",
422            withdraw_address,
423            "--amount",
424            amount,
425            "--fee-rate-sats-per-vbyte",
426            fee_rate
427        )
428        .out_json()
429        .await?;
430
431        let txid: bitcoin::Txid = serde_json::from_value(value)?;
432        Ok(txid)
433    }
434
435    pub async fn close_channel(&self, remote_pubkey: PublicKey, force: bool) -> Result<()> {
436        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
437        let mut close_channel = if force && gateway_cli_version >= *VERSION_0_9_0_ALPHA {
438            cmd!(
439                self,
440                "lightning",
441                "close-channels-with-peer",
442                "--pubkey",
443                remote_pubkey,
444                "--force",
445            )
446        } else {
447            cmd!(
448                self,
449                "lightning",
450                "close-channels-with-peer",
451                "--pubkey",
452                remote_pubkey,
453            )
454        };
455
456        close_channel.run().await?;
457
458        Ok(())
459    }
460
461    pub async fn close_all_channels(&self, force: bool) -> Result<()> {
462        let channels = self.list_channels().await?;
463
464        for chan in channels {
465            let remote_pubkey = chan.remote_pubkey;
466            self.close_channel(remote_pubkey, force).await?;
467        }
468
469        Ok(())
470    }
471
472    /// Open a channel with the gateway's lightning node, returning the funding
473    /// transaction txid.
474    pub async fn open_channel(
475        &self,
476        gw: &Gatewayd,
477        channel_size_sats: u64,
478        push_amount_sats: Option<u64>,
479    ) -> Result<Txid> {
480        let pubkey = gw.lightning_pubkey().await?;
481
482        let mut command = cmd!(
483            self,
484            "lightning",
485            "open-channel",
486            "--pubkey",
487            pubkey,
488            "--host",
489            gw.lightning_node_addr,
490            "--channel-size-sats",
491            channel_size_sats,
492            "--push-amount-sats",
493            push_amount_sats.unwrap_or(0)
494        );
495
496        Ok(Txid::from_str(&command.out_string().await?)?)
497    }
498
499    pub async fn list_channels(&self) -> Result<Vec<ChannelInfo>> {
500        let gateway_cli_version = crate::util::GatewayCli::version_or_default().await;
501        let channels = if gateway_cli_version >= *VERSION_0_9_0_ALPHA || self.is_forced_current() {
502            cmd!(self, "lightning", "list-channels").out_json().await?
503        } else {
504            cmd!(self, "lightning", "list-active-channels")
505                .out_json()
506                .await?
507        };
508
509        let channels = channels
510            .as_array()
511            .context("channels must be an array")?
512            .iter()
513            .map(|channel| {
514                let remote_pubkey = channel["remote_pubkey"]
515                    .as_str()
516                    .context("remote_pubkey must be a string")?
517                    .to_owned();
518                let channel_size_sats = channel["channel_size_sats"]
519                    .as_u64()
520                    .context("channel_size_sats must be a u64")?;
521                let outbound_liquidity_sats = channel["outbound_liquidity_sats"]
522                    .as_u64()
523                    .context("outbound_liquidity_sats must be a u64")?;
524                let inbound_liquidity_sats = channel["inbound_liquidity_sats"]
525                    .as_u64()
526                    .context("inbound_liquidity_sats must be a u64")?;
527                let is_active = channel["is_active"].as_bool().unwrap_or(true);
528                Ok(ChannelInfo {
529                    remote_pubkey: remote_pubkey
530                        .parse()
531                        .expect("Lightning node returned invalid remote channel pubkey"),
532                    channel_size_sats,
533                    outbound_liquidity_sats,
534                    inbound_liquidity_sats,
535                    is_active,
536                })
537            })
538            .collect::<Result<Vec<ChannelInfo>>>()?;
539        Ok(channels)
540    }
541
542    pub async fn wait_for_block_height(&self, target_block_height: u64) -> Result<()> {
543        poll("waiting for block height", || async {
544            let info = self.get_info().await.map_err(ControlFlow::Continue)?;
545            let value = info.get("block_height");
546            if let Some(height) = value {
547                let block_height: Option<u32> = serde_json::from_value(height.clone())
548                    .context("Could not parse block height")
549                    .map_err(ControlFlow::Continue)?;
550                let Some(block_height) = block_height else {
551                    return Err(ControlFlow::Continue(anyhow!("Not synced any blocks yet")));
552                };
553                let synced = info["synced_to_chain"]
554                    .as_bool()
555                    .expect("Could not get synced_to_chain");
556                if block_height >= target_block_height as u32 && synced {
557                    return Ok(());
558                }
559            }
560            Err(ControlFlow::Continue(anyhow!("Not synced to block")))
561        })
562        .await?;
563        Ok(())
564    }
565
566    pub async fn get_lightning_fee(&self, fed_id: String) -> Result<PaymentFee> {
567        let info_value = self.get_info().await?;
568        let federations = info_value["federations"]
569            .as_array()
570            .expect("federations is an array");
571
572        let fed = federations
573            .iter()
574            .find(|fed| {
575                serde_json::from_value::<String>(fed["federation_id"].clone())
576                    .expect("could not deserialize federation_id")
577                    == fed_id
578            })
579            .ok_or_else(|| anyhow!("Federation not found"))?;
580
581        let lightning_fee = fed["config"]["lightning_fee"].clone();
582        let base: Amount = serde_json::from_value(lightning_fee["base"].clone())
583            .map_err(|e| anyhow!("Couldnt parse base: {}", e))?;
584        let parts_per_million: u64 =
585            serde_json::from_value(lightning_fee["parts_per_million"].clone())
586                .map_err(|e| anyhow!("Couldnt parse parts_per_million: {}", e))?;
587
588        Ok(PaymentFee {
589            base,
590            parts_per_million,
591        })
592    }
593
594    pub async fn set_federation_routing_fee(
595        &self,
596        fed_id: String,
597        base: u64,
598        ppm: u64,
599    ) -> Result<()> {
600        cmd!(
601            self,
602            "cfg",
603            "set-fees",
604            "--federation-id",
605            fed_id,
606            "--ln-base",
607            base,
608            "--ln-ppm",
609            ppm
610        )
611        .run()
612        .await?;
613
614        Ok(())
615    }
616
617    pub async fn set_federation_transaction_fee(
618        &self,
619        fed_id: String,
620        base: u64,
621        ppm: u64,
622    ) -> Result<()> {
623        cmd!(
624            self,
625            "cfg",
626            "set-fees",
627            "--federation-id",
628            fed_id,
629            "--tx-base",
630            base,
631            "--tx-ppm",
632            ppm
633        )
634        .run()
635        .await?;
636
637        Ok(())
638    }
639
640    pub async fn payment_summary(&self) -> Result<PaymentSummaryResponse> {
641        let out_json = cmd!(self, "payment-summary").out_json().await?;
642        Ok(serde_json::from_value(out_json).expect("Could not deserialize PaymentSummaryResponse"))
643    }
644
645    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> Result<()> {
646        let gatewayd_version = crate::util::Gatewayd::version_or_default().await;
647        if gatewayd_version < *VERSION_0_7_0_ALPHA {
648            if let LightningNode::Lnd(lnd) = &self.ln {
649                return lnd.wait_bolt11_invoice(payment_hash).await;
650            }
651
652            debug!("Skipping bolt11 invoice check because it is not supported until v0.7");
653            return Ok(());
654        }
655
656        let payment_hash =
657            sha256::Hash::from_slice(&payment_hash).expect("Could not parse payment hash");
658        let invoice_val = cmd!(
659            self,
660            "lightning",
661            "get-invoice",
662            "--payment-hash",
663            payment_hash
664        )
665        .out_json()
666        .await?;
667        let invoice: GetInvoiceResponse =
668            serde_json::from_value(invoice_val).expect("Could not parse GetInvoiceResponse");
669        anyhow::ensure!(invoice.status == PaymentStatus::Succeeded);
670
671        Ok(())
672    }
673
674    pub async fn list_transactions(
675        &self,
676        start: SystemTime,
677        end: SystemTime,
678    ) -> Result<Vec<PaymentDetails>> {
679        let start_datetime: DateTime<Utc> = start.into();
680        let end_datetime: DateTime<Utc> = end.into();
681        let response = cmd!(
682            self,
683            "lightning",
684            "list-transactions",
685            "--start-time",
686            start_datetime.to_rfc3339(),
687            "--end-time",
688            end_datetime.to_rfc3339()
689        )
690        .out_json()
691        .await?;
692        let transactions = serde_json::from_value::<ListTransactionsResponse>(response)?;
693        Ok(transactions.transactions)
694    }
695
696    pub async fn create_offer(&self, amount: Option<Amount>) -> Result<String> {
697        let offer_value = if let Some(amount) = amount {
698            cmd!(
699                self,
700                "lightning",
701                "create-offer",
702                "--amount-msat",
703                amount.msats
704            )
705            .out_json()
706            .await?
707        } else {
708            cmd!(self, "lightning", "create-offer").out_json().await?
709        };
710        let offer_response = serde_json::from_value::<CreateOfferResponse>(offer_value)
711            .expect("Could not parse offer response");
712        Ok(offer_response.offer)
713    }
714
715    pub async fn pay_offer(&self, offer: String, amount: Option<Amount>) -> Result<()> {
716        if let Some(amount) = amount {
717            cmd!(
718                self,
719                "lightning",
720                "pay-offer",
721                "--offer",
722                offer,
723                "--amount-msat",
724                amount.msats
725            )
726            .run()
727            .await?;
728        } else {
729            cmd!(self, "lightning", "pay-offer", "--offer", offer)
730                .run()
731                .await?;
732        }
733
734        Ok(())
735    }
736}