devimint/
external.rs

1use std::ops::ControlFlow;
2use std::path::Path;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow, bail};
8use bitcoin::hashes::Hash;
9use bitcoincore_rpc::RpcApi;
10use bitcoincore_rpc::bitcoin::{Address, BlockHash};
11use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
12use bitcoincore_rpc::jsonrpc::error::RpcError;
13use cln_rpc::ClnRpc;
14use cln_rpc::primitives::{Amount as ClnRpcAmount, AmountOrAny};
15use fedimint_core::encoding::Encodable;
16use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
17use fedimint_core::task::{block_in_place, block_on, sleep, timeout};
18use fedimint_core::util::{FmtCompact as _, write_overwrite_async};
19use fedimint_logging::LOG_DEVIMINT;
20use fedimint_testing::ln::LightningNodeType;
21use futures::StreamExt;
22use hex::ToHex;
23use itertools::Itertools;
24use tokio::fs;
25use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
26use tokio::task::spawn_blocking;
27use tokio::time::Instant;
28use tonic_lnd::Client as LndClient;
29use tonic_lnd::lnrpc::{ChanInfoRequest, GetInfoRequest, ListChannelsRequest};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll, poll_with_timeout};
33use crate::vars::utf8;
34use crate::version_constants::VERSION_0_5_0_ALPHA;
35use crate::{Gatewayd, cmd, poll_eq};
36
37#[derive(Clone)]
38pub struct Bitcoind {
39    pub client: Arc<bitcoincore_rpc::Client>,
40    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
41    pub(crate) _process: ProcessHandle,
42}
43
44impl Bitcoind {
45    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
46        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
47
48        let conf = format!(
49            include_str!("cfg/bitcoin.conf"),
50            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
51            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
52            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
53            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
54            tx_index = "0",
55        );
56        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
57        let process = processmgr
58            .spawn_daemon(
59                "bitcoind",
60                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
61            )
62            .await?;
63
64        let url = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
65        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
66        let (host, auth) = fedimint_bitcoind::bitcoincore::from_url_to_url_auth(&url)?;
67        debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
68        let client =
69            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
70        let wallet_client = JitTry::new_try(move || async move {
71            let client =
72                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
73            Self::init(&client, skip_setup).await?;
74            Ok(Arc::new(client))
75        });
76
77        let bitcoind = Self {
78            _process: process,
79            client: Arc::new(client),
80            wallet_client: Arc::new(wallet_client),
81        };
82
83        bitcoind.poll_ready().await?;
84
85        Ok(bitcoind)
86    }
87
88    fn new_bitcoin_rpc(
89        url: &str,
90        auth: bitcoincore_rpc::Auth,
91    ) -> anyhow::Result<bitcoincore_rpc::Client> {
92        // The default (15s) is too low for some test environments
93        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
94        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
95            .url(url)?
96            .timeout(RPC_TIMEOUT);
97        let (user, pass) = auth.get_user_pass()?;
98        if let Some(user) = user {
99            builder = builder.auth(user, pass);
100        }
101        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
102        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
103    }
104
105    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
106        debug!("Setting up bitcoind");
107        // create RPC wallet
108        for attempt in 0.. {
109            match block_in_place(|| client.create_wallet("", None, None, None, None)) {
110                Ok(_) => {
111                    break;
112                }
113                Err(err) => {
114                    if err.to_string().contains("Database already exists") {
115                        break;
116                    }
117                    if attempt % 20 == 19 {
118                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
119                    }
120                    sleep(Duration::from_millis(100)).await;
121                }
122            }
123        }
124
125        if !skip_setup {
126            // mine blocks
127            let blocks = 101;
128            let address = block_in_place(|| client.get_new_address(None, None))?
129                .require_network(bitcoin::Network::Regtest)
130                .expect("Devimint always runs in regtest");
131            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
132            block_in_place(|| {
133                client
134                    .generate_to_address(blocks, &address)
135                    .context("Failed to generate blocks")
136            })?;
137            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
138        }
139
140        // wait bitcoind is ready
141        poll("bitcoind", || async {
142            let info = block_in_place(|| client.get_blockchain_info())
143                .context("bitcoind getblockchaininfo")
144                .map_err(ControlFlow::Continue)?;
145            if info.blocks > 100 {
146                Ok(())
147            } else {
148                Err(ControlFlow::Continue(anyhow!(
149                    "not enough blocks: {}",
150                    info.blocks
151                )))
152            }
153        })
154        .await?;
155        debug!("Bitcoind ready");
156        Ok(())
157    }
158
159    /// Poll until bitcoind rpc responds for basic commands
160    async fn poll_ready(&self) -> anyhow::Result<()> {
161        poll("bitcoind rpc ready", || async {
162            self.get_block_count()
163                .await
164                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
165            Ok(())
166        })
167        .await
168    }
169
170    /// Client that can has wallet initialized, can generate internal addresses
171    /// and send funds
172    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
173        self.wallet_client.get_try().await?;
174        Ok(self)
175    }
176
177    /// Returns the total number of blocks in the chain.
178    ///
179    /// Fedimint's IBitcoindRpc considers block count the total number of
180    /// blocks, where bitcoind's rpc returns the height. Since the genesis
181    /// block has height 0, we need to add 1 to get the total block count.
182    pub async fn get_block_count(&self) -> Result<u64> {
183        let client = self.client.clone();
184        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
185    }
186
187    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
188        let start_time = Instant::now();
189        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
190        let addr = self.get_new_address().await?;
191        let initial_block_count = self.get_block_count().await?;
192        self.generate_to_address(block_num, addr).await?;
193        debug!(target: LOG_DEVIMINT,
194            elapsed_ms = %start_time.elapsed().as_millis(),
195            ?block_num, "Mined blocks (no wait)");
196
197        Ok(initial_block_count)
198    }
199
200    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
201        let start_time = Instant::now();
202        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
203        let addr = self.get_new_address().await?;
204        let initial_block_count = self.get_block_count().await?;
205        self.generate_to_address(block_num, addr).await?;
206        while self.get_block_count().await? < initial_block_count + block_num {
207            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
208            sleep(Duration::from_millis(100)).await;
209        }
210
211        debug!(target: LOG_DEVIMINT,
212            elapsed_ms = %start_time.elapsed().as_millis(),
213            ?block_num, "Mined blocks");
214
215        Ok(())
216    }
217
218    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
219        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
220        let amount = bitcoin::Amount::from_sat(amount);
221        let tx = self
222            .wallet_client()
223            .await?
224            .send_to_address(
225                bitcoin::Address::from_str(&addr)?
226                    .require_network(bitcoin::Network::Regtest)
227                    .expect("Devimint always runs in regtest"),
228                amount,
229            )
230            .await?;
231        Ok(tx)
232    }
233
234    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
235        let client = self.wallet_client().await?.clone();
236        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
237        Ok(proof.encode_hex())
238    }
239
240    /// Poll a transaction by its txid until it is found in the mempool or in a
241    /// block.
242    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
243        poll("Waiting for transaction in mempool", || async {
244            match self
245                .get_transaction(txid)
246                .await
247                .context("getrawtransaction")
248            {
249                Ok(Some(tx)) => Ok(tx),
250                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
251                    "Transaction not found yet"
252                ))),
253                Err(err) => Err(ControlFlow::Break(err)),
254            }
255        })
256        .await
257    }
258
259    /// Get a transaction by its txid. Checks the mempool and all blocks.
260    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
261        // Check the mempool.
262        match self.get_raw_transaction(txid, None).await {
263            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
264            // check blocks.
265            Ok(None) => {}
266            // The RPC failed, or the transaction was found in the mempool. Return the result.
267            other => return other,
268        };
269
270        let block_height = self.get_block_count().await? - 1;
271
272        // Check each block for the tx, starting at the chain tip.
273        // Buffer the requests to avoid spamming bitcoind.
274        // We're doing this after checking the mempool since the tx should
275        // usually be in the mempool, and we don't want to needlessly hit
276        // the bitcoind with block requests.
277        let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
278            .map(|height| async move {
279                let block_hash = self.get_block_hash(height).await?;
280                self.get_raw_transaction(txid, Some(block_hash)).await
281            })
282            .buffered(32);
283
284        while let Some(tx_or) = buffered_tx_stream.next().await {
285            match tx_or {
286                // The RPC succeeded, and the transaction was not found in the block. Continue to
287                // the next block.
288                Ok(None) => continue,
289                // The RPC failed, or the transaction was found in the block. Return the result.
290                other => return other,
291            };
292        }
293
294        // The transaction was not found in the mempool or any block.
295        Ok(None)
296    }
297
298    async fn get_raw_transaction(
299        &self,
300        txid: bitcoin::Txid,
301        block_hash: Option<BlockHash>,
302    ) -> Result<Option<String>> {
303        let client = self.client.clone();
304        let tx_or =
305            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
306
307        let tx = match tx_or {
308            Ok(tx) => tx,
309            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
310            // reaches bitcoind but is not found. See here:
311            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
312            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
313            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
314                RpcError { code: -5, .. },
315            ))) => return Ok(None),
316            Err(err) => return Err(err.into()),
317        };
318        let bytes = tx.consensus_encode_to_vec();
319        Ok(Some(bytes.encode_hex()))
320    }
321
322    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
323        let client = self.client.clone();
324        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
325    }
326
327    pub async fn get_new_address(&self) -> Result<Address> {
328        let client = self.wallet_client().await?.clone();
329        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
330            .await??
331            .require_network(bitcoin::Network::Regtest)
332            .expect("Devimint always runs in regtest");
333        Ok(addr)
334    }
335
336    pub async fn generate_to_address(
337        &self,
338        block_num: u64,
339        address: Address,
340    ) -> Result<Vec<BlockHash>> {
341        let client = self.wallet_client().await?.clone();
342        Ok(
343            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
344                .await??,
345        )
346    }
347
348    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
349        let client = self.client.clone();
350        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
351    }
352
353    pub async fn send_to_address(
354        &self,
355        addr: Address,
356        amount: bitcoin::Amount,
357    ) -> anyhow::Result<bitcoin::Txid> {
358        let client = self.wallet_client().await?.clone();
359        Ok(spawn_blocking(move || {
360            client
361                .client
362                .send_to_address(&addr, amount, None, None, None, None, None, None)
363        })
364        .await??)
365    }
366
367    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
368        let client = self.wallet_client().await?.clone();
369        Ok(spawn_blocking(move || client.client.get_balances()).await??)
370    }
371
372    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
373        self.client.get_jsonrpc_client()
374    }
375}
376
377pub struct LightningdProcessHandle(ProcessHandle);
378
379impl LightningdProcessHandle {
380    async fn terminate(&self) -> Result<()> {
381        if self.0.is_running().await {
382            self.0.terminate().await
383        } else {
384            Ok(())
385        }
386    }
387}
388
389impl Drop for LightningdProcessHandle {
390    fn drop(&mut self) {
391        // Terminate cln in a controlled way, otherwise it may leave running processes.
392        block_in_place(|| {
393            if let Err(e) = block_on(self.terminate()) {
394                warn!(target: LOG_DEVIMINT, "failed to terminate lightningd: {e:?}");
395            }
396        });
397    }
398}
399
400#[derive(Clone)]
401pub struct Lightningd {
402    pub(crate) rpc: Arc<Mutex<ClnRpc>>,
403    pub(crate) process: Arc<LightningdProcessHandle>,
404    pub(crate) bitcoind: Bitcoind,
405}
406
407impl Lightningd {
408    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
409        let cln_dir = &process_mgr.globals.FM_CLN_DIR;
410        let conf = format!(
411            include_str!("cfg/lightningd.conf"),
412            port = process_mgr.globals.FM_PORT_CLN,
413            bitcoin_rpcport = process_mgr.globals.FM_PORT_BTC_RPC,
414            log_path = process_mgr.globals.FM_CLN_DIR.join("cln.log").display(),
415        );
416        write_overwrite_async(process_mgr.globals.FM_CLN_DIR.join("config"), conf).await?;
417        let process = Lightningd::start(process_mgr, cln_dir).await?;
418
419        let socket_cln = cln_dir.join("regtest/lightning-rpc");
420        poll("lightningd", || async {
421            ClnRpc::new(socket_cln.clone())
422                .await
423                .context("connect to lightningd")
424                .map_err(ControlFlow::Continue)
425        })
426        .await?;
427        let rpc = ClnRpc::new(socket_cln).await?;
428        Ok(Self {
429            bitcoind,
430            rpc: Arc::new(Mutex::new(rpc)),
431            process: Arc::new(LightningdProcessHandle(process)),
432        })
433    }
434
435    pub async fn start(process_mgr: &ProcessManager, cln_dir: &Path) -> Result<ProcessHandle> {
436        let btc_dir = utf8(&process_mgr.globals.FM_BTC_DIR);
437        let cmd = cmd!(
438            crate::util::Lightningd,
439            "--dev-fast-gossip",
440            "--dev-bitcoind-poll=1",
441            format!("--lightning-dir={}", utf8(cln_dir)),
442            format!("--bitcoin-datadir={btc_dir}"),
443        );
444
445        process_mgr.spawn_daemon("lightningd", cmd).await
446    }
447
448    pub async fn request<R>(&self, request: R) -> Result<R::Response>
449    where
450        R: cln_rpc::model::TypedRequest + serde::Serialize + std::fmt::Debug,
451        R::Response: serde::de::DeserializeOwned + std::fmt::Debug,
452    {
453        let mut rpc = self.rpc.lock().await;
454        Ok(rpc.call_typed(&request).await?)
455    }
456
457    // TODO: Remove this method once we drop backwards compatibility for versions
458    // earlier than v0.4.0-alpha.
459    async fn await_block_processing(&self) -> Result<()> {
460        poll("lightningd block processing", || async {
461            let btc_height = self
462                .bitcoind
463                .get_blockchain_info()
464                .await
465                .context("bitcoind getblockchaininfo")
466                .map_err(ControlFlow::Continue)?
467                .blocks;
468            let lnd_height = self
469                .request(cln_rpc::model::requests::GetinfoRequest {})
470                .await
471                .map_err(ControlFlow::Continue)?
472                .blockheight;
473            poll_eq!(u64::from(lnd_height), btc_height)
474        })
475        .await?;
476        Ok(())
477    }
478
479    pub async fn pub_key(&self) -> Result<String> {
480        Ok(self
481            .request(cln_rpc::model::requests::GetinfoRequest {})
482            .await?
483            .id
484            .to_string())
485    }
486
487    pub async fn terminate(self) -> Result<()> {
488        self.process.terminate().await
489    }
490
491    pub async fn invoice(
492        &self,
493        amount: u64,
494        description: String,
495        label: String,
496    ) -> anyhow::Result<String> {
497        let invoice = self
498            .request(cln_rpc::model::requests::InvoiceRequest {
499                amount_msat: AmountOrAny::Amount(ClnRpcAmount::from_msat(amount)),
500                description,
501                label,
502                expiry: Some(60),
503                fallbacks: None,
504                preimage: None,
505                cltv: None,
506                deschashonly: None,
507                exposeprivatechannels: None,
508            })
509            .await?
510            .bolt11;
511        Ok(invoice)
512    }
513
514    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
515        let invoice_status = self
516            .request(cln_rpc::model::requests::PayRequest {
517                bolt11: invoice,
518                amount_msat: None,
519                label: None,
520                riskfactor: None,
521                maxfeepercent: None,
522                retry_for: None,
523                maxdelay: None,
524                exemptfee: None,
525                localinvreqid: None,
526                exclude: None,
527                maxfee: None,
528                description: None,
529                partial_msat: None,
530            })
531            .await?
532            .status;
533
534        anyhow::ensure!(matches!(
535            invoice_status,
536            cln_rpc::model::responses::PayStatus::COMPLETE
537        ));
538
539        Ok(())
540    }
541
542    pub async fn wait_any_bolt11_invoice(&self) -> anyhow::Result<()> {
543        let invoice_status = self
544            .request(cln_rpc::model::requests::WaitanyinvoiceRequest {
545                lastpay_index: None,
546                timeout: None,
547            })
548            .await?
549            .status;
550        anyhow::ensure!(matches!(
551            invoice_status,
552            cln_rpc::model::responses::WaitanyinvoiceStatus::PAID
553        ));
554
555        Ok(())
556    }
557}
558
559#[derive(Clone)]
560pub struct Lnd {
561    pub(crate) client: Arc<Mutex<LndClient>>,
562    pub(crate) process: ProcessHandle,
563    pub(crate) _bitcoind: Bitcoind,
564}
565
566impl Lnd {
567    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
568        let (process, client) = Lnd::start(process_mgr).await?;
569        let this = Self {
570            _bitcoind: bitcoind,
571            client: Arc::new(Mutex::new(client)),
572            process,
573        };
574        // wait for lnd rpc to be active
575        poll("lnd_startup", || async {
576            this.pub_key().await.map_err(ControlFlow::Continue)
577        })
578        .await?;
579        Ok(this)
580    }
581
582    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
583        let conf = format!(
584            include_str!("cfg/lnd.conf"),
585            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
586            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
587            rest_port = process_mgr.globals.FM_PORT_LND_REST,
588            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
589            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
590            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
591        );
592        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
593        let cmd = cmd!(
594            crate::util::Lnd,
595            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
596        );
597
598        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
599        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
600        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
601        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
602        poll("wait for lnd files", || async {
603            if fs::try_exists(lnd_tls_cert)
604                .await
605                .context("lnd tls cert")
606                .map_err(ControlFlow::Continue)?
607                && fs::try_exists(lnd_macaroon)
608                    .await
609                    .context("lnd macaroon")
610                    .map_err(ControlFlow::Continue)?
611            {
612                Ok(())
613            } else {
614                Err(ControlFlow::Continue(anyhow!(
615                    "lnd tls cert or lnd macaroon not found"
616                )))
617            }
618        })
619        .await?;
620
621        let client = poll("lnd_connect", || async {
622            tonic_lnd::connect(
623                lnd_rpc_addr.clone(),
624                lnd_tls_cert.clone(),
625                lnd_macaroon.clone(),
626            )
627            .await
628            .context("lnd connect")
629            .map_err(ControlFlow::Continue)
630        })
631        .await?;
632
633        Ok((process, client))
634    }
635
636    pub async fn lightning_client_lock(
637        &self,
638    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
639        let guard = self.client.lock().await;
640        Ok(MutexGuard::map(guard, |client| client.lightning()))
641    }
642
643    pub async fn invoices_client_lock(
644        &self,
645    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
646        let guard = self.client.lock().await;
647        Ok(MutexGuard::map(guard, |client| client.invoices()))
648    }
649
650    pub async fn pub_key(&self) -> Result<String> {
651        Ok(self
652            .lightning_client_lock()
653            .await?
654            .get_info(GetInfoRequest {})
655            .await?
656            .into_inner()
657            .identity_pubkey)
658    }
659
660    // TODO: Remove this method once we drop backwards compatibility for versions
661    // earlier than v0.4.0-alpha.
662    async fn await_block_processing(&self) -> Result<()> {
663        poll("lnd block processing", || async {
664            let synced = self
665                .lightning_client_lock()
666                .await
667                .map_err(ControlFlow::Break)?
668                .get_info(GetInfoRequest {})
669                .await
670                .context("lnd get_info")
671                .map_err(ControlFlow::Continue)?
672                .into_inner()
673                .synced_to_chain;
674            if synced {
675                Ok(())
676            } else {
677                Err(ControlFlow::Continue(anyhow!("lnd not synced_to_chain")))
678            }
679        })
680        .await?;
681        Ok(())
682    }
683
684    pub async fn terminate(self) -> Result<()> {
685        self.process.terminate().await
686    }
687
688    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
689        let add_invoice = self
690            .lightning_client_lock()
691            .await?
692            .add_invoice(tonic_lnd::lnrpc::Invoice {
693                value_msat: amount as i64,
694                ..Default::default()
695            })
696            .await?
697            .into_inner();
698        let invoice = add_invoice.payment_request;
699        let payment_hash = add_invoice.r_hash;
700        Ok((invoice, payment_hash))
701    }
702
703    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
704        let payment = self
705            .lightning_client_lock()
706            .await?
707            .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
708                payment_request: invoice.clone(),
709                ..Default::default()
710            })
711            .await?
712            .into_inner();
713        let payment_status = self
714            .lightning_client_lock()
715            .await?
716            .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
717                include_incomplete: true,
718                ..Default::default()
719            })
720            .await?
721            .into_inner()
722            .payments
723            .into_iter()
724            .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
725            .context("payment not in list")?
726            .status();
727        anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
728
729        Ok(())
730    }
731
732    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
733        let invoice_status = self
734            .lightning_client_lock()
735            .await?
736            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
737                r_hash: payment_hash,
738                ..Default::default()
739            })
740            .await?
741            .into_inner()
742            .state();
743        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
744
745        Ok(())
746    }
747
748    pub async fn create_hold_invoice(
749        &self,
750        amount: u64,
751    ) -> anyhow::Result<([u8; 32], String, cln_rpc::primitives::Sha256)> {
752        let preimage = rand::random::<[u8; 32]>();
753        let hash = {
754            let mut engine = bitcoin::hashes::sha256::Hash::engine();
755            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
756            bitcoin::hashes::sha256::Hash::from_engine(engine)
757        };
758        // TODO(support:v0.5): LNv1 cannot pay HOLD invoices with a CLTV expiry greater
759        // than 500 before v0.5
760        let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
761        let cltv_expiry = if fedimint_cli_version >= *VERSION_0_5_0_ALPHA {
762            650
763        } else {
764            100
765        };
766        let hold_request = self
767            .invoices_client_lock()
768            .await?
769            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
770                value_msat: amount as i64,
771                hash: hash.to_byte_array().to_vec(),
772                cltv_expiry,
773                ..Default::default()
774            })
775            .await?
776            .into_inner();
777        let payment_request = hold_request.payment_request;
778        Ok((preimage, payment_request, hash))
779    }
780
781    pub async fn settle_hold_invoice(
782        &self,
783        preimage: [u8; 32],
784        payment_hash: cln_rpc::primitives::Sha256,
785    ) -> anyhow::Result<()> {
786        let mut hold_invoice_subscription = self
787            .invoices_client_lock()
788            .await?
789            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
790                r_hash: payment_hash.to_byte_array().to_vec(),
791            })
792            .await?
793            .into_inner();
794        loop {
795            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
796            match timeout(
797                WAIT_FOR_INVOICE_TIMEOUT,
798                futures::StreamExt::next(&mut hold_invoice_subscription),
799            )
800            .await
801            {
802                Ok(Some(Ok(invoice))) => {
803                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
804                        break;
805                    }
806                    debug!("hold invoice payment state: {:?}", invoice.state());
807                }
808                Ok(Some(Err(e))) => {
809                    bail!("error in invoice subscription: {e:?}");
810                }
811                Ok(None) => {
812                    bail!("invoice subscription ended before invoice was accepted");
813                }
814                Err(_) => {
815                    bail!("timed out waiting for invoice to be accepted")
816                }
817            }
818        }
819
820        self.invoices_client_lock()
821            .await?
822            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
823                preimage: preimage.to_vec(),
824            })
825            .await?;
826
827        Ok(())
828    }
829}
830
831// TODO: Remove this method once we drop backwards compatibility for versions
832// earlier than v0.4.0-alpha.
833pub async fn open_channel(
834    process_mgr: &ProcessManager,
835    bitcoind: &Bitcoind,
836    cln: &Lightningd,
837    lnd: &Lnd,
838) -> Result<()> {
839    debug!(target: LOG_DEVIMINT, "Await block ln nodes block processing");
840    tokio::try_join!(cln.await_block_processing(), lnd.await_block_processing())?;
841
842    let cln_addr = cln
843        .request(cln_rpc::model::requests::NewaddrRequest { addresstype: None })
844        .await?
845        .bech32
846        .context("bech32 should be present")?;
847
848    bitcoind.send_to(cln_addr, 100_000_000).await?;
849    bitcoind.mine_blocks(10).await?;
850
851    let lnd_pubkey = lnd.pub_key().await?;
852    let cln_pubkey = cln.pub_key().await?;
853
854    cln.request(cln_rpc::model::requests::ConnectRequest {
855        id: format!(
856            "{}@127.0.0.1:{}",
857            lnd_pubkey, process_mgr.globals.FM_PORT_LND_LISTEN
858        ),
859        host: None,
860        port: None,
861    })
862    .await
863    .context("connect request")?;
864
865    poll("fund channel", || async {
866        cln.request(cln_rpc::model::requests::FundchannelRequest {
867            id: lnd_pubkey
868                .parse()
869                .context("failed to parse lnd pubkey")
870                .map_err(ControlFlow::Break)?,
871            amount: cln_rpc::primitives::AmountOrAll::Amount(
872                cln_rpc::primitives::Amount::from_sat(10_000_000),
873            ),
874            push_msat: Some(cln_rpc::primitives::Amount::from_sat(5_000_000)),
875            feerate: None,
876            announce: None,
877            minconf: None,
878            close_to: None,
879            request_amt: None,
880            compact_lease: None,
881            utxos: None,
882            mindepth: None,
883            reserve: None,
884            channel_type: None,
885        })
886        .await
887        .map_err(ControlFlow::Continue)
888    })
889    .await?;
890
891    bitcoind.mine_blocks(10).await?;
892
893    poll("Wait LND for channel update", || async {
894        let mut lnd_client = lnd.client.lock().await;
895        let channels = lnd_client
896            .lightning()
897            .list_channels(ListChannelsRequest {
898                active_only: true,
899                ..Default::default()
900            })
901            .await
902            .context("lnd list channels")
903            .map_err(ControlFlow::Break)?
904            .into_inner();
905
906        if let Some(channel) = channels
907            .channels
908            .iter()
909            .find(|channel| channel.remote_pubkey == cln_pubkey)
910        {
911            let chan_info = lnd_client
912                .lightning()
913                .get_chan_info(ChanInfoRequest {
914                    chan_id: channel.chan_id,
915                })
916                .await;
917
918            match chan_info {
919                Ok(_) => {
920                    return Ok(());
921                }
922                Err(err) => {
923                    debug!(target: LOG_DEVIMINT, err = %err.fmt_compact(), "Getting chan info failed");
924                }
925            }
926        }
927
928        Err(ControlFlow::Continue(anyhow!("channel not found")))
929    })
930    .await?;
931
932    Ok(())
933}
934
935pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
936
937#[allow(clippy::similar_names)]
938pub async fn open_channels_between_gateways(
939    bitcoind: &Bitcoind,
940    gateways: &[NamedGateway<'_>],
941) -> Result<()> {
942    let block_height = bitcoind.get_block_count().await? - 1;
943    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
944    futures::future::try_join_all(
945        gateways
946            .iter()
947            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
948    )
949    .await?;
950
951    debug!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
952    for (gw, _gw_name) in gateways {
953        let funding_addr = gw.get_ln_onchain_address().await?;
954        bitcoind.send_to(funding_addr, 100_000_000).await?;
955    }
956
957    bitcoind.mine_blocks(10).await?;
958
959    let block_height = bitcoind.get_block_count().await? - 1;
960    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
961    futures::future::try_join_all(
962        gateways
963            .iter()
964            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
965    )
966    .await?;
967
968    // All unique pairs of gateways.
969    // For a list of gateways [A, B, C], this will produce [(A, B), (B, C), (C, A)].
970    // Since the first gateway within each pair initiates the channel open,
971    // order within each pair needs to be enforced so that each Lightning node opens
972    // 1 channel.
973    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> = if gateways.len() == 2 {
974        gateways.iter().tuple_windows::<(_, _)>().collect()
975    } else {
976        gateways.iter().circular_tuple_windows::<(_, _)>().collect()
977    };
978
979    let open_channel_tasks = gateway_pairs.iter()
980        .map(|((gw_a, gw_a_name), (gw_b, gw_b_name))| {
981            let gw_a = (*gw_a).clone();
982            let gw_a_name = (*gw_a_name).to_string();
983            let gw_b = (*gw_b).clone();
984            let gw_b_name = (*gw_b_name).to_string();
985
986            let sats_per_side = 5_000_000;
987            debug!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
988            tokio::task::spawn(async move {
989                // Sometimes channel openings just after funding the lightning nodes don't work right away.
990                let res = poll_with_timeout(&format!("Open channel from {gw_a_name} to {gw_b_name}"), Duration::from_secs(30), || async {
991                    gw_a.open_channel(&gw_b, sats_per_side * 2, Some(sats_per_side)).await.map_err(ControlFlow::Continue)
992                })
993                .await;
994
995                if res.is_err() {
996                    error!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, ?res, "Failed to open channel");
997                    gw_a.dump_logs()?;
998                    gw_b.dump_logs()?;
999                }
1000
1001                res
1002            })
1003        })
1004        .collect::<Vec<_>>();
1005    let open_channel_task_results: Vec<Result<Result<_, _>, _>> =
1006        futures::future::join_all(open_channel_tasks).await;
1007
1008    let mut channel_funding_txids = Vec::new();
1009    for open_channel_task_result in open_channel_task_results {
1010        match open_channel_task_result {
1011            Ok(Ok(txid)) => {
1012                channel_funding_txids.push(txid);
1013            }
1014            Ok(Err(e)) => {
1015                return Err(anyhow::anyhow!(e));
1016            }
1017            Err(e) => {
1018                return Err(anyhow::anyhow!(e));
1019            }
1020        }
1021    }
1022
1023    // Wait for all channel funding transaction to be known by bitcoind.
1024    let mut is_missing_any_txids = false;
1025    for txid_or in &channel_funding_txids {
1026        if let Some(txid) = txid_or {
1027            bitcoind.poll_get_transaction(*txid).await?;
1028        } else {
1029            is_missing_any_txids = true;
1030        }
1031    }
1032
1033    // `open_channel` may not have sent out the channel funding transaction
1034    // immediately. Since it didn't return a funding txid, we need to wait for
1035    // it to get to the mempool.
1036    if is_missing_any_txids {
1037        fedimint_core::runtime::sleep(Duration::from_secs(2)).await;
1038    }
1039
1040    bitcoind.mine_blocks(10).await?;
1041
1042    let block_height = bitcoind.get_block_count().await? - 1;
1043    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
1044    futures::future::try_join_all(
1045        gateways
1046            .iter()
1047            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
1048    )
1049    .await?;
1050
1051    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
1052        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
1053        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
1054
1055        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
1056        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
1057    }
1058
1059    Ok(())
1060}
1061
1062async fn wait_for_ready_channel_on_gateway_with_counterparty(
1063    gw: &Gatewayd,
1064    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
1065) -> anyhow::Result<()> {
1066    poll(
1067        &format!("Wait for {} channel update", gw.gw_name),
1068        || async {
1069            let channels = gw
1070                .list_active_channels()
1071                .await
1072                .context("list channels")
1073                .map_err(ControlFlow::Break)?;
1074
1075            if channels
1076                .iter()
1077                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
1078            {
1079                return Ok(());
1080            }
1081
1082            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
1083
1084            Err(ControlFlow::Continue(anyhow!("channel not found")))
1085        },
1086    )
1087    .await
1088}
1089
1090#[derive(Clone)]
1091pub enum LightningNode {
1092    Lnd(Lnd),
1093    Ldk { name: String },
1094}
1095
1096impl LightningNode {
1097    pub fn ln_type(&self) -> LightningNodeType {
1098        match self {
1099            LightningNode::Lnd(_) => LightningNodeType::Lnd,
1100            LightningNode::Ldk { name: _ } => LightningNodeType::Ldk,
1101        }
1102    }
1103}
1104
1105#[derive(Clone)]
1106pub struct Electrs {
1107    _process: ProcessHandle,
1108    _bitcoind: Bitcoind,
1109}
1110
1111impl Electrs {
1112    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
1113        debug!(target: LOG_DEVIMINT, "Starting electrs");
1114        let electrs_dir = process_mgr
1115            .globals
1116            .FM_ELECTRS_DIR
1117            .to_str()
1118            .context("non utf8 path")?;
1119
1120        let daemon_dir = &process_mgr.globals.FM_BTC_DIR.display();
1121
1122        let conf = format!(
1123            include_str!("cfg/electrs.toml"),
1124            rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
1125            p2p_port = process_mgr.globals.FM_PORT_BTC_P2P,
1126            electrs_port = process_mgr.globals.FM_PORT_ELECTRS,
1127            monitoring_port = process_mgr.globals.FM_PORT_ELECTRS_MONITORING,
1128        );
1129        debug!("electrs conf: {:?}", conf);
1130        write_overwrite_async(
1131            process_mgr.globals.FM_ELECTRS_DIR.join("electrs.toml"),
1132            conf,
1133        )
1134        .await?;
1135        let cmd = cmd!(
1136            crate::util::Electrs,
1137            "--conf-dir={electrs_dir}",
1138            "--db-dir={electrs_dir}",
1139            "--daemon-dir={daemon_dir}"
1140        );
1141        let process = process_mgr.spawn_daemon("electrs", cmd).await?;
1142        debug!(target: LOG_DEVIMINT, "Electrs ready");
1143
1144        Ok(Self {
1145            _bitcoind: bitcoind,
1146            _process: process,
1147        })
1148    }
1149}
1150
1151#[derive(Clone)]
1152pub struct Esplora {
1153    _process: ProcessHandle,
1154    _bitcoind: Bitcoind,
1155}
1156
1157impl Esplora {
1158    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
1159        debug!("Starting esplora");
1160        let daemon_dir = process_mgr
1161            .globals
1162            .FM_BTC_DIR
1163            .to_str()
1164            .context("non utf8 path")?;
1165        let esplora_dir = process_mgr
1166            .globals
1167            .FM_ESPLORA_DIR
1168            .to_str()
1169            .context("non utf8 path")?;
1170
1171        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
1172        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
1173        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
1174        // spawn esplora
1175        let cmd = cmd!(
1176            crate::util::Esplora,
1177            "--daemon-dir={daemon_dir}",
1178            "--db-dir={esplora_dir}",
1179            "--cookie=bitcoin:bitcoin",
1180            "--network=regtest",
1181            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
1182            "--http-addr=127.0.0.1:{esplora_port}",
1183            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
1184            "--jsonrpc-import", // Workaround for incompatible on-disk format
1185        );
1186        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
1187
1188        Self::wait_for_ready(process_mgr).await?;
1189        debug!(target: LOG_DEVIMINT, "Esplora ready");
1190
1191        Ok(Self {
1192            _bitcoind: bitcoind,
1193            _process: process,
1194        })
1195    }
1196
1197    /// Wait until the server is able to respond to requests.
1198    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
1199        let client = esplora_client::Builder::new(&format!(
1200            "http://localhost:{}",
1201            process_mgr.globals.FM_PORT_ESPLORA
1202        ))
1203        // Disable retrying in the client since we're already retrying in the poll below.
1204        .max_retries(0)
1205        .build_async()
1206        .expect("esplora client build failed");
1207
1208        poll("esplora server ready", || async {
1209            client
1210                .get_fee_estimates()
1211                .await
1212                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
1213
1214            Ok(())
1215        })
1216        .await?;
1217
1218        Ok(())
1219    }
1220}
1221
1222#[allow(unused)]
1223pub struct ExternalDaemons {
1224    pub bitcoind: Bitcoind,
1225    pub cln: Lightningd,
1226    pub lnd: Lnd,
1227    pub electrs: Electrs,
1228    pub esplora: Esplora,
1229}
1230
1231pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
1232    let start_time = fedimint_core::time::now();
1233    let bitcoind = Bitcoind::new(process_mgr, false).await?;
1234    let (cln, lnd, electrs, esplora) = tokio::try_join!(
1235        Lightningd::new(process_mgr, bitcoind.clone()),
1236        Lnd::new(process_mgr, bitcoind.clone()),
1237        Electrs::new(process_mgr, bitcoind.clone()),
1238        Esplora::new(process_mgr, bitcoind.clone()),
1239    )?;
1240    open_channel(process_mgr, &bitcoind, &cln, &lnd).await?;
1241    // make sure the bitcoind wallet is ready
1242    let _ = bitcoind.wallet_client().await?;
1243    info!(
1244        target: LOG_DEVIMINT,
1245        "starting base daemons took {:?}",
1246        start_time.elapsed()?
1247    );
1248    Ok(ExternalDaemons {
1249        bitcoind,
1250        cln,
1251        lnd,
1252        electrs,
1253        esplora,
1254    })
1255}