devimint/
external.rs

1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::RpcApi;
9use bitcoincore_rpc::bitcoin::{Address, BlockHash};
10use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
11use bitcoincore_rpc::jsonrpc::error::RpcError;
12use fedimint_core::encoding::Encodable;
13use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
14use fedimint_core::task::{block_in_place, sleep, timeout};
15use fedimint_core::util::{FmtCompact as _, write_overwrite_async};
16use fedimint_logging::LOG_DEVIMINT;
17use fedimint_testing_core::node_type::LightningNodeType;
18use futures::StreamExt;
19use hex::ToHex;
20use itertools::Itertools;
21use tokio::fs;
22use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
23use tokio::task::spawn_blocking;
24use tokio::time::Instant;
25use tonic_lnd::Client as LndClient;
26use tonic_lnd::lnrpc::GetInfoRequest;
27use tracing::{debug, info, trace};
28
29use crate::gatewayd::LdkChainSource;
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::version_constants::VERSION_0_5_0_ALPHA;
33use crate::{Gatewayd, cmd};
34
35#[derive(Clone)]
36pub struct Bitcoind {
37    pub client: Arc<bitcoincore_rpc::Client>,
38    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
39    pub(crate) _process: ProcessHandle,
40}
41
42impl Bitcoind {
43    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
44        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
45
46        let conf = format!(
47            include_str!("cfg/bitcoin.conf"),
48            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
49            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
50            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
51            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
52            tx_index = "0",
53        );
54        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
55        let process = processmgr
56            .spawn_daemon(
57                "bitcoind",
58                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
59            )
60            .await?;
61
62        let url = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
63        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64        let (host, auth) = fedimint_bitcoind::bitcoincore::from_url_to_url_auth(&url)?;
65        debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
66        let client =
67            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
68        let wallet_client = JitTry::new_try(move || async move {
69            let client =
70                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
71            Self::init(&client, skip_setup).await?;
72            Ok(Arc::new(client))
73        });
74
75        let bitcoind = Self {
76            _process: process,
77            client: Arc::new(client),
78            wallet_client: Arc::new(wallet_client),
79        };
80
81        bitcoind.poll_ready().await?;
82
83        Ok(bitcoind)
84    }
85
86    fn new_bitcoin_rpc(
87        url: &str,
88        auth: bitcoincore_rpc::Auth,
89    ) -> anyhow::Result<bitcoincore_rpc::Client> {
90        // The default (15s) is too low for some test environments
91        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
92        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
93            .url(url)?
94            .timeout(RPC_TIMEOUT);
95        let (user, pass) = auth.get_user_pass()?;
96        if let Some(user) = user {
97            builder = builder.auth(user, pass);
98        }
99        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
100        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
101    }
102
103    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
104        debug!("Setting up bitcoind");
105        // create RPC wallet
106        for attempt in 0.. {
107            match block_in_place(|| client.create_wallet("", None, None, None, None)) {
108                Ok(_) => {
109                    break;
110                }
111                Err(err) => {
112                    if err.to_string().contains("Database already exists") {
113                        break;
114                    }
115                    if attempt % 20 == 19 {
116                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
117                    }
118                    sleep(Duration::from_millis(100)).await;
119                }
120            }
121        }
122
123        if !skip_setup {
124            // mine blocks
125            let blocks = 101;
126            let address = block_in_place(|| client.get_new_address(None, None))?
127                .require_network(bitcoin::Network::Regtest)
128                .expect("Devimint always runs in regtest");
129            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
130            block_in_place(|| {
131                client
132                    .generate_to_address(blocks, &address)
133                    .context("Failed to generate blocks")
134            })?;
135            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
136        }
137
138        // wait bitcoind is ready
139        poll("bitcoind", || async {
140            let info = block_in_place(|| client.get_blockchain_info())
141                .context("bitcoind getblockchaininfo")
142                .map_err(ControlFlow::Continue)?;
143            if info.blocks > 100 {
144                Ok(())
145            } else {
146                Err(ControlFlow::Continue(anyhow!(
147                    "not enough blocks: {}",
148                    info.blocks
149                )))
150            }
151        })
152        .await?;
153        debug!("Bitcoind ready");
154        Ok(())
155    }
156
157    /// Poll until bitcoind rpc responds for basic commands
158    async fn poll_ready(&self) -> anyhow::Result<()> {
159        poll("bitcoind rpc ready", || async {
160            self.get_block_count()
161                .await
162                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
163            Ok(())
164        })
165        .await
166    }
167
168    /// Client that can has wallet initialized, can generate internal addresses
169    /// and send funds
170    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
171        self.wallet_client.get_try().await?;
172        Ok(self)
173    }
174
175    /// Returns the total number of blocks in the chain.
176    ///
177    /// Fedimint's IBitcoindRpc considers block count the total number of
178    /// blocks, where bitcoind's rpc returns the height. Since the genesis
179    /// block has height 0, we need to add 1 to get the total block count.
180    pub async fn get_block_count(&self) -> Result<u64> {
181        let client = self.client.clone();
182        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
183    }
184
185    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
186        let start_time = Instant::now();
187        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
188        let addr = self.get_new_address().await?;
189        let initial_block_count = self.get_block_count().await?;
190        self.generate_to_address(block_num, addr).await?;
191        debug!(target: LOG_DEVIMINT,
192            elapsed_ms = %start_time.elapsed().as_millis(),
193            ?block_num, "Mined blocks (no wait)");
194
195        Ok(initial_block_count)
196    }
197
198    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
199        let start_time = Instant::now();
200        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
201        let addr = self.get_new_address().await?;
202        let initial_block_count = self.get_block_count().await?;
203        self.generate_to_address(block_num, addr).await?;
204        while self.get_block_count().await? < initial_block_count + block_num {
205            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
206            sleep(Duration::from_millis(100)).await;
207        }
208
209        debug!(target: LOG_DEVIMINT,
210            elapsed_ms = %start_time.elapsed().as_millis(),
211            ?block_num, "Mined blocks");
212
213        Ok(())
214    }
215
216    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
217        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
218        let amount = bitcoin::Amount::from_sat(amount);
219        let tx = self
220            .wallet_client()
221            .await?
222            .send_to_address(
223                bitcoin::Address::from_str(&addr)?
224                    .require_network(bitcoin::Network::Regtest)
225                    .expect("Devimint always runs in regtest"),
226                amount,
227            )
228            .await?;
229        Ok(tx)
230    }
231
232    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
233        let client = self.wallet_client().await?.clone();
234        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
235        Ok(proof.encode_hex())
236    }
237
238    /// Poll a transaction by its txid until it is found in the mempool or in a
239    /// block.
240    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
241        poll("Waiting for transaction in mempool", || async {
242            match self
243                .get_transaction(txid)
244                .await
245                .context("getrawtransaction")
246            {
247                Ok(Some(tx)) => Ok(tx),
248                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
249                    "Transaction not found yet"
250                ))),
251                Err(err) => Err(ControlFlow::Break(err)),
252            }
253        })
254        .await
255    }
256
257    /// Get a transaction by its txid. Checks the mempool and all blocks.
258    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
259        // Check the mempool.
260        match self.get_raw_transaction(txid, None).await {
261            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
262            // check blocks.
263            Ok(None) => {}
264            // The RPC failed, or the transaction was found in the mempool. Return the result.
265            other => return other,
266        };
267
268        let block_height = self.get_block_count().await? - 1;
269
270        // Check each block for the tx, starting at the chain tip.
271        // Buffer the requests to avoid spamming bitcoind.
272        // We're doing this after checking the mempool since the tx should
273        // usually be in the mempool, and we don't want to needlessly hit
274        // the bitcoind with block requests.
275        let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
276            .map(|height| async move {
277                let block_hash = self.get_block_hash(height).await?;
278                self.get_raw_transaction(txid, Some(block_hash)).await
279            })
280            .buffered(32);
281
282        while let Some(tx_or) = buffered_tx_stream.next().await {
283            match tx_or {
284                // The RPC succeeded, and the transaction was not found in the block. Continue to
285                // the next block.
286                Ok(None) => continue,
287                // The RPC failed, or the transaction was found in the block. Return the result.
288                other => return other,
289            };
290        }
291
292        // The transaction was not found in the mempool or any block.
293        Ok(None)
294    }
295
296    async fn get_raw_transaction(
297        &self,
298        txid: bitcoin::Txid,
299        block_hash: Option<BlockHash>,
300    ) -> Result<Option<String>> {
301        let client = self.client.clone();
302        let tx_or =
303            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
304
305        let tx = match tx_or {
306            Ok(tx) => tx,
307            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
308            // reaches bitcoind but is not found. See here:
309            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
310            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
311            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
312                RpcError { code: -5, .. },
313            ))) => return Ok(None),
314            Err(err) => return Err(err.into()),
315        };
316        let bytes = tx.consensus_encode_to_vec();
317        Ok(Some(bytes.encode_hex()))
318    }
319
320    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
321        let client = self.client.clone();
322        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
323    }
324
325    pub async fn get_new_address(&self) -> Result<Address> {
326        let client = self.wallet_client().await?.clone();
327        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
328            .await??
329            .require_network(bitcoin::Network::Regtest)
330            .expect("Devimint always runs in regtest");
331        Ok(addr)
332    }
333
334    pub async fn generate_to_address(
335        &self,
336        block_num: u64,
337        address: Address,
338    ) -> Result<Vec<BlockHash>> {
339        let client = self.wallet_client().await?.clone();
340        Ok(
341            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
342                .await??,
343        )
344    }
345
346    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
347        let client = self.client.clone();
348        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
349    }
350
351    pub async fn send_to_address(
352        &self,
353        addr: Address,
354        amount: bitcoin::Amount,
355    ) -> anyhow::Result<bitcoin::Txid> {
356        let client = self.wallet_client().await?.clone();
357
358        let raw_client = client.client.clone();
359        let txid = spawn_blocking(move || {
360            raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
361        })
362        .await??;
363
364        // If this ever fails, it means we need to poll here, before we return to the
365        // user, as downstream code expects that mining blocks will include this
366        // tx.
367        assert!(client.get_transaction(txid).await?.is_some());
368
369        Ok(txid)
370    }
371
372    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
373        let client = self.wallet_client().await?.clone();
374        Ok(spawn_blocking(move || client.client.get_balances()).await??)
375    }
376
377    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
378        self.client.get_jsonrpc_client()
379    }
380}
381
382#[derive(Clone)]
383pub struct Lnd {
384    pub(crate) client: Arc<Mutex<LndClient>>,
385    pub(crate) process: ProcessHandle,
386    pub(crate) _bitcoind: Bitcoind,
387}
388
389impl Lnd {
390    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
391        let (process, client) = Lnd::start(process_mgr).await?;
392        let this = Self {
393            _bitcoind: bitcoind,
394            client: Arc::new(Mutex::new(client)),
395            process,
396        };
397        // wait for lnd rpc to be active
398        poll("lnd_startup", || async {
399            this.pub_key().await.map_err(ControlFlow::Continue)
400        })
401        .await?;
402        Ok(this)
403    }
404
405    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
406        let conf = format!(
407            include_str!("cfg/lnd.conf"),
408            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
409            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
410            rest_port = process_mgr.globals.FM_PORT_LND_REST,
411            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
412            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
413            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
414        );
415        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
416        let cmd = cmd!(
417            crate::util::Lnd,
418            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
419        );
420
421        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
422        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
423        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
424        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
425        poll("wait for lnd files", || async {
426            if fs::try_exists(lnd_tls_cert)
427                .await
428                .context("lnd tls cert")
429                .map_err(ControlFlow::Continue)?
430                && fs::try_exists(lnd_macaroon)
431                    .await
432                    .context("lnd macaroon")
433                    .map_err(ControlFlow::Continue)?
434            {
435                Ok(())
436            } else {
437                Err(ControlFlow::Continue(anyhow!(
438                    "lnd tls cert or lnd macaroon not found"
439                )))
440            }
441        })
442        .await?;
443
444        let client = poll("lnd_connect", || async {
445            tonic_lnd::connect(
446                lnd_rpc_addr.clone(),
447                lnd_tls_cert.clone(),
448                lnd_macaroon.clone(),
449            )
450            .await
451            .context("lnd connect")
452            .map_err(ControlFlow::Continue)
453        })
454        .await?;
455
456        Ok((process, client))
457    }
458
459    pub async fn lightning_client_lock(
460        &self,
461    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
462        let guard = self.client.lock().await;
463        Ok(MutexGuard::map(guard, |client| client.lightning()))
464    }
465
466    pub async fn invoices_client_lock(
467        &self,
468    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
469        let guard = self.client.lock().await;
470        Ok(MutexGuard::map(guard, |client| client.invoices()))
471    }
472
473    pub async fn pub_key(&self) -> Result<String> {
474        Ok(self
475            .lightning_client_lock()
476            .await?
477            .get_info(GetInfoRequest {})
478            .await?
479            .into_inner()
480            .identity_pubkey)
481    }
482
483    pub async fn terminate(self) -> Result<()> {
484        self.process.terminate().await
485    }
486
487    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
488        let add_invoice = self
489            .lightning_client_lock()
490            .await?
491            .add_invoice(tonic_lnd::lnrpc::Invoice {
492                value_msat: amount as i64,
493                ..Default::default()
494            })
495            .await?
496            .into_inner();
497        let invoice = add_invoice.payment_request;
498        let payment_hash = add_invoice.r_hash;
499        Ok((invoice, payment_hash))
500    }
501
502    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
503        let payment = self
504            .lightning_client_lock()
505            .await?
506            .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
507                payment_request: invoice.clone(),
508                ..Default::default()
509            })
510            .await?
511            .into_inner();
512        let payment_status = self
513            .lightning_client_lock()
514            .await?
515            .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
516                include_incomplete: true,
517                ..Default::default()
518            })
519            .await?
520            .into_inner()
521            .payments
522            .into_iter()
523            .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
524            .context("payment not in list")?
525            .status();
526        anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
527
528        Ok(())
529    }
530
531    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
532        let invoice_status = self
533            .lightning_client_lock()
534            .await?
535            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
536                r_hash: payment_hash,
537                ..Default::default()
538            })
539            .await?
540            .into_inner()
541            .state();
542        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
543
544        Ok(())
545    }
546
547    pub async fn create_hold_invoice(
548        &self,
549        amount: u64,
550    ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
551        let preimage = rand::random::<[u8; 32]>();
552        let hash = {
553            let mut engine = bitcoin::hashes::sha256::Hash::engine();
554            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
555            bitcoin::hashes::sha256::Hash::from_engine(engine)
556        };
557        // TODO(support:v0.5): LNv1 cannot pay HOLD invoices with a CLTV expiry greater
558        // than 500 before v0.5
559        let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
560        let cltv_expiry = if fedimint_cli_version >= *VERSION_0_5_0_ALPHA {
561            650
562        } else {
563            100
564        };
565        let hold_request = self
566            .invoices_client_lock()
567            .await?
568            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
569                value_msat: amount as i64,
570                hash: hash.to_byte_array().to_vec(),
571                cltv_expiry,
572                ..Default::default()
573            })
574            .await?
575            .into_inner();
576        let payment_request = hold_request.payment_request;
577        Ok((preimage, payment_request, hash))
578    }
579
580    pub async fn settle_hold_invoice(
581        &self,
582        preimage: [u8; 32],
583        payment_hash: sha256::Hash,
584    ) -> anyhow::Result<()> {
585        let mut hold_invoice_subscription = self
586            .invoices_client_lock()
587            .await?
588            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
589                r_hash: payment_hash.to_byte_array().to_vec(),
590            })
591            .await?
592            .into_inner();
593        loop {
594            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
595            match timeout(
596                WAIT_FOR_INVOICE_TIMEOUT,
597                futures::StreamExt::next(&mut hold_invoice_subscription),
598            )
599            .await
600            {
601                Ok(Some(Ok(invoice))) => {
602                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
603                        break;
604                    }
605                    debug!("hold invoice payment state: {:?}", invoice.state());
606                }
607                Ok(Some(Err(e))) => {
608                    bail!("error in invoice subscription: {e:?}");
609                }
610                Ok(None) => {
611                    bail!("invoice subscription ended before invoice was accepted");
612                }
613                Err(_) => {
614                    bail!("timed out waiting for invoice to be accepted")
615                }
616            }
617        }
618
619        self.invoices_client_lock()
620            .await?
621            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
622                preimage: preimage.to_vec(),
623            })
624            .await?;
625
626        Ok(())
627    }
628}
629
630pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
631
632#[allow(clippy::similar_names)]
633pub async fn open_channels_between_gateways(
634    bitcoind: &Bitcoind,
635    gateways: &[NamedGateway<'_>],
636) -> Result<()> {
637    let block_height = bitcoind.get_block_count().await? - 1;
638    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
639    futures::future::try_join_all(
640        gateways
641            .iter()
642            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
643    )
644    .await?;
645
646    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
647    for (gw, _gw_name) in gateways {
648        let funding_addr = gw.get_ln_onchain_address().await?;
649        bitcoind.send_to(funding_addr, 100_000_000).await?;
650    }
651
652    bitcoind.mine_blocks(10).await?;
653
654    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
655
656    let block_height = bitcoind.get_block_count().await? - 1;
657    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
658    futures::future::try_join_all(
659        gateways
660            .iter()
661            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
662    )
663    .await?;
664
665    // All unique pairs of gateways.
666    // For a list of gateways [A, B, C], this will produce [(A, B), (B, C)].
667    // Since the first gateway within each pair initiates the channel open,
668    // order within each pair needs to be enforced so that each Lightning node opens
669    // 1 channel.
670    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
671        gateways.iter().tuple_windows::<(_, _)>().collect();
672
673    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
674    let sats_per_side = 5_000_000;
675    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
676        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
677        let txid = gw_a
678            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
679            .await?;
680
681        if let Some(txid) = txid {
682            bitcoind.poll_get_transaction(txid).await?;
683        }
684    }
685
686    // `open_channel` may not have sent out the channel funding transaction
687    // immediately. Since it didn't return a funding txid, we need to wait for
688    // it to get to the mempool.
689    if crate::util::Gatewayd::version_or_default().await < *VERSION_0_5_0_ALPHA {
690        fedimint_core::runtime::sleep(Duration::from_secs(5)).await;
691    }
692
693    bitcoind.mine_blocks(10).await?;
694
695    let block_height = bitcoind.get_block_count().await? - 1;
696    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
697    futures::future::try_join_all(
698        gateways
699            .iter()
700            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
701    )
702    .await?;
703
704    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
705        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
706        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
707
708        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
709        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
710    }
711
712    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
713
714    Ok(())
715}
716
717async fn wait_for_ready_channel_on_gateway_with_counterparty(
718    gw: &Gatewayd,
719    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
720) -> anyhow::Result<()> {
721    poll(
722        &format!("Wait for {} channel update", gw.gw_name),
723        || async {
724            let channels = gw
725                .list_active_channels()
726                .await
727                .context("list channels")
728                .map_err(ControlFlow::Break)?;
729
730            if channels
731                .iter()
732                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
733            {
734                return Ok(());
735            }
736
737            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
738            Err(ControlFlow::Continue(anyhow!("channel not found")))
739        },
740    )
741    .await
742}
743
744#[derive(Clone)]
745pub enum LightningNode {
746    Lnd(Lnd),
747    Ldk {
748        name: String,
749        gw_port: u16,
750        ldk_port: u16,
751        chain_source: LdkChainSource,
752    },
753}
754
755impl LightningNode {
756    pub fn ln_type(&self) -> LightningNodeType {
757        match self {
758            LightningNode::Lnd(_) => LightningNodeType::Lnd,
759            LightningNode::Ldk {
760                name: _,
761                gw_port: _,
762                ldk_port: _,
763                chain_source: _,
764            } => LightningNodeType::Ldk,
765        }
766    }
767}
768
769#[derive(Clone)]
770pub struct Electrs {
771    _process: ProcessHandle,
772    _bitcoind: Bitcoind,
773}
774
775impl Electrs {
776    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
777        debug!(target: LOG_DEVIMINT, "Starting electrs");
778        let electrs_dir = process_mgr
779            .globals
780            .FM_ELECTRS_DIR
781            .to_str()
782            .context("non utf8 path")?;
783
784        let daemon_dir = &process_mgr.globals.FM_BTC_DIR.display();
785
786        let conf = format!(
787            include_str!("cfg/electrs.toml"),
788            rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
789            p2p_port = process_mgr.globals.FM_PORT_BTC_P2P,
790            electrs_port = process_mgr.globals.FM_PORT_ELECTRS,
791            monitoring_port = process_mgr.globals.FM_PORT_ELECTRS_MONITORING,
792        );
793        debug!("electrs conf: {:?}", conf);
794        write_overwrite_async(
795            process_mgr.globals.FM_ELECTRS_DIR.join("electrs.toml"),
796            conf,
797        )
798        .await?;
799        let cmd = cmd!(
800            crate::util::Electrs,
801            "--conf-dir={electrs_dir}",
802            "--db-dir={electrs_dir}",
803            "--daemon-dir={daemon_dir}"
804        );
805        let process = process_mgr.spawn_daemon("electrs", cmd).await?;
806        debug!(target: LOG_DEVIMINT, "Electrs ready");
807
808        Ok(Self {
809            _bitcoind: bitcoind,
810            _process: process,
811        })
812    }
813}
814
815#[derive(Clone)]
816pub struct Esplora {
817    _process: ProcessHandle,
818    _bitcoind: Bitcoind,
819}
820
821impl Esplora {
822    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
823        debug!("Starting esplora");
824        let daemon_dir = process_mgr
825            .globals
826            .FM_BTC_DIR
827            .to_str()
828            .context("non utf8 path")?;
829        let esplora_dir = process_mgr
830            .globals
831            .FM_ESPLORA_DIR
832            .to_str()
833            .context("non utf8 path")?;
834
835        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
836        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
837        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
838        // spawn esplora
839        let cmd = cmd!(
840            crate::util::Esplora,
841            "--daemon-dir={daemon_dir}",
842            "--db-dir={esplora_dir}",
843            "--cookie=bitcoin:bitcoin",
844            "--network=regtest",
845            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
846            "--http-addr=127.0.0.1:{esplora_port}",
847            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
848            "--jsonrpc-import", // Workaround for incompatible on-disk format
849        );
850        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
851
852        Self::wait_for_ready(process_mgr).await?;
853        debug!(target: LOG_DEVIMINT, "Esplora ready");
854
855        Ok(Self {
856            _bitcoind: bitcoind,
857            _process: process,
858        })
859    }
860
861    /// Wait until the server is able to respond to requests.
862    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
863        let client = esplora_client::Builder::new(&format!(
864            "http://localhost:{}",
865            process_mgr.globals.FM_PORT_ESPLORA
866        ))
867        // Disable retrying in the client since we're already retrying in the poll below.
868        .max_retries(0)
869        .build_async()
870        .expect("esplora client build failed");
871
872        poll("esplora server ready", || async {
873            client
874                .get_fee_estimates()
875                .await
876                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
877
878            Ok(())
879        })
880        .await?;
881
882        Ok(())
883    }
884}
885
886#[allow(unused)]
887pub struct ExternalDaemons {
888    pub bitcoind: Bitcoind,
889    pub electrs: Electrs,
890    pub esplora: Esplora,
891}
892
893pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
894    let bitcoind = Bitcoind::new(process_mgr, false).await?;
895    let (electrs, esplora) = tokio::try_join!(
896        Electrs::new(process_mgr, bitcoind.clone()),
897        Esplora::new(process_mgr, bitcoind.clone()),
898    )?;
899    let start_time = fedimint_core::time::now();
900    // make sure the bitcoind wallet is ready
901    let _ = bitcoind.wallet_client().await?;
902    info!(
903        target: LOG_DEVIMINT,
904        "starting base daemons took {:?}",
905        start_time.elapsed()?
906    );
907    Ok(ExternalDaemons {
908        bitcoind,
909        electrs,
910        esplora,
911    })
912}