Skip to main content

devimint/
external.rs

1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::backoff_util::api_networking_backoff;
17use fedimint_core::util::{FmtCompact as _, SafeUrl, retry, write_overwrite_async};
18use fedimint_logging::LOG_DEVIMINT;
19use fedimint_testing_core::node_type::LightningNodeType;
20use futures::StreamExt;
21use hex::ToHex;
22use itertools::Itertools;
23use tokio::fs;
24use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
25use tokio::task::spawn_blocking;
26use tokio::time::Instant;
27use tonic_lnd::Client as LndClient;
28use tonic_lnd::lnrpc::GetInfoRequest;
29use tonic_lnd::routerrpc::SendPaymentRequest;
30use tracing::{debug, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll};
33use crate::vars::utf8;
34use crate::{Gatewayd, cmd};
35
36#[derive(Clone)]
37pub struct Bitcoind {
38    pub client: Arc<bitcoincore_rpc::Client>,
39    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
40    pub(crate) _process: ProcessHandle,
41}
42
43impl Bitcoind {
44    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
45        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
46
47        let esplora_pid_file_path = processmgr.globals.FM_TEST_DIR.join("esplora.pid");
48        let esplora_pid_file = utf8(&esplora_pid_file_path);
49        let conf = format!(
50            include_str!("cfg/bitcoin.conf"),
51            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
52            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
53            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
54            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
55            esplora_pid_file = esplora_pid_file,
56            tx_index = "0",
57        );
58        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
59        let process = processmgr
60            .spawn_daemon(
61                "bitcoind",
62                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
63            )
64            .await?;
65
66        let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
67
68        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
69
70        let auth = Auth::UserPass(
71            url.username().to_owned(),
72            url.password()
73                .context("Bitcoin RPC URL is missing password")?
74                .to_owned(),
75        );
76
77        let host = url
78            .without_auth()
79            .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
80            .to_string();
81        let wallet_name = "default";
82        let host = format!("{host}wallet/{wallet_name}");
83
84        debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
85        let client =
86            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
87        let wallet_client = JitTry::new_try(move || async move {
88            let client =
89                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
90            Self::init(&client, skip_setup).await?;
91            Ok(Arc::new(client))
92        });
93
94        let bitcoind = Self {
95            _process: process,
96            client: Arc::new(client),
97            wallet_client: Arc::new(wallet_client),
98        };
99
100        bitcoind.poll_ready().await?;
101
102        // To have a ChainId we always need at least one block.
103        let addr = bitcoind.get_new_address().await?;
104        bitcoind.generate_to_address(1, addr).await?;
105
106        Ok(bitcoind)
107    }
108
109    fn new_bitcoin_rpc(
110        url: &str,
111        auth: bitcoincore_rpc::Auth,
112    ) -> anyhow::Result<bitcoincore_rpc::Client> {
113        // The default (15s) is too low for some test environments
114        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
115        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
116            .url(url)?
117            .timeout(RPC_TIMEOUT);
118        let (user, pass) = auth.get_user_pass()?;
119        if let Some(user) = user {
120            builder = builder.auth(user, pass);
121        }
122        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
123        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
124    }
125
126    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
127        debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
128        // create RPC wallet
129        for attempt in 0.. {
130            match block_in_place(|| client.create_wallet("default", None, None, None, None)) {
131                Ok(_) => {
132                    break;
133                }
134                Err(err) => {
135                    if err.to_string().contains("Database already exists") {
136                        break;
137                    }
138                    if attempt % 20 == 19 {
139                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
140                    }
141                    sleep(Duration::from_millis(100)).await;
142                }
143            }
144        }
145
146        if !skip_setup {
147            // mine blocks
148            let blocks = 101;
149            let address = block_in_place(|| client.get_new_address(None, None))?
150                .require_network(bitcoin::Network::Regtest)
151                .expect("Devimint always runs in regtest");
152            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
153            block_in_place(|| {
154                client
155                    .generate_to_address(blocks, &address)
156                    .context("Failed to generate blocks")
157            })?;
158            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
159        }
160
161        // wait bitcoind is ready
162        poll("bitcoind", || async {
163            let info = block_in_place(|| client.get_blockchain_info())
164                .context("bitcoind getblockchaininfo")
165                .map_err(ControlFlow::Continue)?;
166            if info.blocks > 100 {
167                Ok(())
168            } else {
169                Err(ControlFlow::Continue(anyhow!(
170                    "not enough blocks: {}",
171                    info.blocks
172                )))
173            }
174        })
175        .await?;
176        debug!("Bitcoind ready");
177        Ok(())
178    }
179
180    /// Poll until bitcoind rpc responds for basic commands
181    async fn poll_ready(&self) -> anyhow::Result<()> {
182        poll("bitcoind rpc ready", || async {
183            self.get_block_count()
184                .await
185                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
186            Ok(())
187        })
188        .await
189    }
190
191    /// Client that can has wallet initialized, can generate internal addresses
192    /// and send funds
193    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
194        self.wallet_client.get_try().await?;
195        Ok(self)
196    }
197
198    /// Returns the total number of blocks in the chain.
199    ///
200    /// Fedimint's IBitcoindRpc considers block count the total number of
201    /// blocks, where bitcoind's rpc returns the height. Since the genesis
202    /// block has height 0, we need to add 1 to get the total block count.
203    pub async fn get_block_count(&self) -> Result<u64> {
204        let client = self.client.clone();
205        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
206    }
207
208    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
209        let start_time = Instant::now();
210        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
211        let addr = self.get_new_address().await?;
212        let initial_block_count = self.get_block_count().await?;
213        self.generate_to_address(block_num, addr).await?;
214        debug!(target: LOG_DEVIMINT,
215            elapsed_ms = %start_time.elapsed().as_millis(),
216            ?block_num, "Mined blocks (no wait)");
217
218        Ok(initial_block_count)
219    }
220
221    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
222        // `Stdio::piped` can't even parse outputs larger
223        // than pipe buffer size (64K on Linux, 16K on MacOS), so
224        // we should split larger requesteds into smaller chunks.
225        //
226        // On top of it mining a lot of blocks is just slow, so should
227        // be avoided.
228        const BLOCK_NUM_LIMIT: u64 = 32;
229
230        if BLOCK_NUM_LIMIT < block_num {
231            warn!(
232                target: LOG_DEVIMINT,
233                %block_num,
234                "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
235            );
236            let mut block_num = block_num;
237
238            loop {
239                if BLOCK_NUM_LIMIT < block_num {
240                    block_num -= BLOCK_NUM_LIMIT;
241                    Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
242                } else {
243                    Box::pin(async { self.mine_blocks(block_num).await }).await?;
244                    return Ok(());
245                }
246            }
247        }
248        let start_time = Instant::now();
249        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
250        let addr = self.get_new_address().await?;
251        let initial_block_count = self.get_block_count().await?;
252        self.generate_to_address(block_num, addr).await?;
253        while self.get_block_count().await? < initial_block_count + block_num {
254            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
255            sleep(Duration::from_millis(100)).await;
256        }
257
258        debug!(target: LOG_DEVIMINT,
259            elapsed_ms = %start_time.elapsed().as_millis(),
260            ?block_num, "Mined blocks");
261
262        Ok(())
263    }
264
265    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
266        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
267        let amount = bitcoin::Amount::from_sat(amount);
268        let tx = self
269            .wallet_client()
270            .await?
271            .send_to_address(
272                bitcoin::Address::from_str(&addr)?
273                    .require_network(bitcoin::Network::Regtest)
274                    .expect("Devimint always runs in regtest"),
275                amount,
276            )
277            .await?;
278        Ok(tx)
279    }
280
281    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
282        let client = self.wallet_client().await?.clone();
283        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
284        Ok(proof.encode_hex())
285    }
286
287    /// Poll a transaction by its txid until it is found in the mempool or in a
288    /// block.
289    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
290        poll("Waiting for transaction in mempool", || async {
291            match self
292                .get_transaction(txid)
293                .await
294                .context("getrawtransaction")
295            {
296                Ok(Some(tx)) => Ok(tx),
297                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
298                    "Transaction not found yet"
299                ))),
300                Err(err) => Err(ControlFlow::Break(err)),
301            }
302        })
303        .await
304    }
305
306    /// Get a transaction by its txid. Checks the mempool and all blocks.
307    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
308        // Check the mempool.
309        match self.get_raw_transaction(txid, None).await {
310            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
311            // check blocks.
312            Ok(None) => {}
313            // The RPC failed, or the transaction was found in the mempool. Return the result.
314            other => return other,
315        }
316
317        let block_count = self.get_block_count().await?;
318
319        // Check each block for the tx, starting at the chain tip.
320        // Buffer the requests to avoid spamming bitcoind.
321        // We're doing this after checking the mempool since the tx should
322        // usually be in the mempool, and we don't want to needlessly hit
323        // the bitcoind with block requests.
324        let mut buffered_tx_stream = futures::stream::iter((0..block_count).rev())
325            .map(|height| async move {
326                let block_hash = self.get_block_hash(height).await?;
327                self.get_raw_transaction(txid, Some(block_hash)).await
328            })
329            .buffered(32);
330
331        while let Some(tx_or) = buffered_tx_stream.next().await {
332            match tx_or {
333                // The RPC succeeded, and the transaction was not found in the block. Continue to
334                // the next block.
335                Ok(None) => {}
336                // The RPC failed, or the transaction was found in the block. Return the result.
337                other => return other,
338            }
339        }
340
341        // The transaction was not found in the mempool or any block.
342        Ok(None)
343    }
344
345    async fn get_raw_transaction(
346        &self,
347        txid: bitcoin::Txid,
348        block_hash: Option<BlockHash>,
349    ) -> Result<Option<String>> {
350        let client = self.client.clone();
351        let tx_or =
352            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
353
354        let tx = match tx_or {
355            Ok(tx) => tx,
356            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
357            // reaches bitcoind but is not found. See here:
358            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
359            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
360            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
361                RpcError { code: -5, .. },
362            ))) => return Ok(None),
363            Err(err) => return Err(err.into()),
364        };
365        let bytes = tx.consensus_encode_to_vec();
366        Ok(Some(bytes.encode_hex()))
367    }
368
369    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
370        let client = self.client.clone();
371        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
372    }
373
374    pub async fn get_new_address(&self) -> Result<Address> {
375        let client = self.wallet_client().await?.clone();
376        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
377            .await??
378            .require_network(bitcoin::Network::Regtest)
379            .expect("Devimint always runs in regtest");
380        Ok(addr)
381    }
382
383    pub async fn generate_to_address(
384        &self,
385        block_num: u64,
386        address: Address,
387    ) -> Result<Vec<BlockHash>> {
388        let client = self.wallet_client().await?.clone();
389        Ok(
390            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
391                .await??,
392        )
393    }
394
395    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
396        let client = self.client.clone();
397        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
398    }
399
400    pub async fn send_to_address(
401        &self,
402        addr: Address,
403        amount: bitcoin::Amount,
404    ) -> anyhow::Result<bitcoin::Txid> {
405        let client = self.wallet_client().await?.clone();
406
407        let raw_client = client.client.clone();
408        let txid = spawn_blocking(move || {
409            raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
410        })
411        .await??;
412
413        // Downstream code expects that mining blocks will include this
414        // tx. Seems like this is not always immediately the case in Bitcoin Knobs.
415        retry(
416            "await-genesis-tx-processed".to_string(),
417            api_networking_backoff(),
418            || async {
419                if client.get_transaction(txid).await?.is_none() {
420                    bail!("Genesis tx not visible yet = {txid}");
421                }
422
423                Ok(())
424            },
425        )
426        .await
427        .expect("Number of retries has no limit");
428        Ok(txid)
429    }
430
431    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
432        let client = self.wallet_client().await?.clone();
433        Ok(spawn_blocking(move || client.client.get_balances()).await??)
434    }
435
436    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
437        self.client.get_jsonrpc_client()
438    }
439}
440
441#[derive(Clone)]
442pub struct Lnd {
443    pub(crate) client: Arc<Mutex<LndClient>>,
444    pub(crate) process: ProcessHandle,
445    pub(crate) _bitcoind: Bitcoind,
446}
447
448impl Lnd {
449    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
450        let (process, client) = Lnd::start(process_mgr).await?;
451        let this = Self {
452            _bitcoind: bitcoind,
453            client: Arc::new(Mutex::new(client)),
454            process,
455        };
456        // wait for lnd rpc to be active
457        poll("lnd_startup", || async {
458            this.pub_key().await.map_err(ControlFlow::Continue)
459        })
460        .await?;
461        Ok(this)
462    }
463
464    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
465        let conf = format!(
466            include_str!("cfg/lnd.conf"),
467            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
468            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
469            rest_port = process_mgr.globals.FM_PORT_LND_REST,
470            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
471            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
472            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
473        );
474        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
475        let cmd = cmd!(
476            crate::util::Lnd,
477            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
478        );
479
480        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
481        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
482        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
483        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
484        poll("wait for lnd files", || async {
485            if fs::try_exists(lnd_tls_cert)
486                .await
487                .context("lnd tls cert")
488                .map_err(ControlFlow::Continue)?
489                && fs::try_exists(lnd_macaroon)
490                    .await
491                    .context("lnd macaroon")
492                    .map_err(ControlFlow::Continue)?
493            {
494                Ok(())
495            } else {
496                Err(ControlFlow::Continue(anyhow!(
497                    "lnd tls cert or lnd macaroon not found"
498                )))
499            }
500        })
501        .await?;
502
503        install_crypto_provider().await;
504
505        let client = poll("lnd_connect", || async {
506            tonic_lnd::connect(
507                lnd_rpc_addr.clone(),
508                lnd_tls_cert.clone(),
509                lnd_macaroon.clone(),
510            )
511            .await
512            .context("lnd connect")
513            .map_err(ControlFlow::Continue)
514        })
515        .await?;
516
517        Ok((process, client))
518    }
519
520    pub async fn lightning_client_lock(
521        &self,
522    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
523        let guard = self.client.lock().await;
524        Ok(MutexGuard::map(guard, |client| client.lightning()))
525    }
526
527    pub async fn invoices_client_lock(
528        &self,
529    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
530        let guard = self.client.lock().await;
531        Ok(MutexGuard::map(guard, |client| client.invoices()))
532    }
533
534    pub async fn router_client_lock(
535        &self,
536    ) -> Result<MappedMutexGuard<'_, tonic_lnd::RouterClient>> {
537        let guard = self.client.lock().await;
538        Ok(MutexGuard::map(guard, |client| client.router()))
539    }
540
541    pub async fn pub_key(&self) -> Result<String> {
542        Ok(self
543            .lightning_client_lock()
544            .await?
545            .get_info(GetInfoRequest {})
546            .await?
547            .into_inner()
548            .identity_pubkey)
549    }
550
551    pub async fn terminate(self) -> Result<()> {
552        self.process.terminate().await
553    }
554
555    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
556        let add_invoice = self
557            .lightning_client_lock()
558            .await?
559            .add_invoice(tonic_lnd::lnrpc::Invoice {
560                value_msat: amount as i64,
561                ..Default::default()
562            })
563            .await?
564            .into_inner();
565        let invoice = add_invoice.payment_request;
566        let payment_hash = add_invoice.r_hash;
567        Ok((invoice, payment_hash))
568    }
569
570    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
571        let mut payment = self
572            .router_client_lock()
573            .await?
574            .send_payment_v2(SendPaymentRequest {
575                payment_request: invoice.clone(),
576                ..Default::default()
577            })
578            .await?
579            .into_inner();
580
581        while let Some(update) = payment.message().await? {
582            match update.status() {
583                tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded => return Ok(()),
584                tonic_lnd::lnrpc::payment::PaymentStatus::InFlight => {}
585                _ => return Err(anyhow!("LND lightning payment failed")),
586            }
587        }
588
589        Err(anyhow!("LND lightning payment unknown status"))
590    }
591
592    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
593        let invoice_status = self
594            .lightning_client_lock()
595            .await?
596            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
597                r_hash: payment_hash,
598                ..Default::default()
599            })
600            .await?
601            .into_inner()
602            .state();
603        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
604
605        Ok(())
606    }
607
608    pub async fn create_hold_invoice(
609        &self,
610        amount: u64,
611    ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
612        let preimage = rand::random::<[u8; 32]>();
613        let hash = {
614            let mut engine = bitcoin::hashes::sha256::Hash::engine();
615            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
616            bitcoin::hashes::sha256::Hash::from_engine(engine)
617        };
618        let cltv_expiry = 650;
619        let hold_request = self
620            .invoices_client_lock()
621            .await?
622            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
623                value_msat: amount as i64,
624                hash: hash.to_byte_array().to_vec(),
625                cltv_expiry,
626                ..Default::default()
627            })
628            .await?
629            .into_inner();
630        let payment_request = hold_request.payment_request;
631        Ok((preimage, payment_request, hash))
632    }
633
634    pub async fn settle_hold_invoice(
635        &self,
636        preimage: [u8; 32],
637        payment_hash: sha256::Hash,
638    ) -> anyhow::Result<()> {
639        let mut hold_invoice_subscription = self
640            .invoices_client_lock()
641            .await?
642            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
643                r_hash: payment_hash.to_byte_array().to_vec(),
644            })
645            .await?
646            .into_inner();
647        loop {
648            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_mins(1);
649            match timeout(
650                WAIT_FOR_INVOICE_TIMEOUT,
651                futures::StreamExt::next(&mut hold_invoice_subscription),
652            )
653            .await
654            {
655                Ok(Some(Ok(invoice))) => {
656                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
657                        break;
658                    }
659                    debug!("hold invoice payment state: {:?}", invoice.state());
660                }
661                Ok(Some(Err(e))) => {
662                    bail!("error in invoice subscription: {e:?}");
663                }
664                Ok(None) => {
665                    bail!("invoice subscription ended before invoice was accepted");
666                }
667                Err(_) => {
668                    bail!("timed out waiting for invoice to be accepted")
669                }
670            }
671        }
672
673        self.invoices_client_lock()
674            .await?
675            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
676                preimage: preimage.to_vec(),
677            })
678            .await?;
679
680        Ok(())
681    }
682}
683
684pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
685
686#[allow(clippy::similar_names)]
687pub async fn open_channels_between_gateways(
688    bitcoind: &Bitcoind,
689    gateways: &[NamedGateway<'_>],
690) -> Result<()> {
691    let block_height = bitcoind.get_block_count().await? - 1;
692    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
693    futures::future::try_join_all(
694        gateways
695            .iter()
696            .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
697    )
698    .await?;
699
700    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
701    for (gw, _gw_name) in gateways {
702        let funding_addr = gw.client().get_ln_onchain_address().await?;
703        bitcoind.send_to(funding_addr, 100_000_000).await?;
704    }
705
706    bitcoind.mine_blocks(10).await?;
707
708    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
709
710    let block_height = bitcoind.get_block_count().await? - 1;
711    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
712    futures::future::try_join_all(
713        gateways
714            .iter()
715            .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
716    )
717    .await?;
718
719    // Cyclic pairs of gateways, so every node gets a channel with every other
720    // node for our typical 3-gateway setup. For a list [A, B, C] this produces
721    // [(A, B), (B, C), (C, A)]. The first gateway in each pair initiates the
722    // channel open, so each Lightning node opens exactly one channel.
723    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
724        gateways.iter().circular_tuple_windows::<(_, _)>().collect();
725
726    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
727    let sats_per_side = 5_000_000;
728    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
729        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
730        let txid = gw_a
731            .client()
732            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
733            .await?;
734
735        bitcoind.poll_get_transaction(txid).await?;
736    }
737
738    bitcoind.mine_blocks(10).await?;
739
740    let block_height = bitcoind.get_block_count().await? - 1;
741    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
742    futures::future::try_join_all(
743        gateways
744            .iter()
745            .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
746    )
747    .await?;
748
749    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
750        let gw_a_node_pubkey = gw_a.client().lightning_pubkey().await?;
751        let gw_b_node_pubkey = gw_b.client().lightning_pubkey().await?;
752
753        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
754        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
755    }
756
757    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
758
759    Ok(())
760}
761
762async fn wait_for_ready_channel_on_gateway_with_counterparty(
763    gw: &Gatewayd,
764    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
765) -> anyhow::Result<()> {
766    poll(
767        &format!("Wait for {} channel update", gw.gw_name),
768        || async {
769            let channels = gw
770                .client()
771                .list_channels()
772                .await
773                .context("list channels")
774                .map_err(ControlFlow::Break)?;
775
776            if channels
777                .iter()
778                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
779            {
780                return Ok(());
781            }
782
783            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
784            Err(ControlFlow::Continue(anyhow!("channel not found")))
785        },
786    )
787    .await
788}
789
790#[derive(Clone)]
791pub enum LightningNode {
792    Lnd(Lnd),
793    Ldk {
794        name: String,
795        gw_port: u16,
796        ldk_port: u16,
797        metrics_port: u16,
798    },
799}
800
801impl LightningNode {
802    pub fn ln_type(&self) -> LightningNodeType {
803        match self {
804            LightningNode::Lnd(_) => LightningNodeType::Lnd,
805            LightningNode::Ldk {
806                name: _,
807                gw_port: _,
808                ldk_port: _,
809                metrics_port: _,
810            } => LightningNodeType::Ldk,
811        }
812    }
813}
814
815#[derive(Clone)]
816pub struct Esplora {
817    _process: ProcessHandle,
818    _bitcoind: Bitcoind,
819}
820
821impl Esplora {
822    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
823        debug!("Starting esplora");
824        let daemon_dir = process_mgr
825            .globals
826            .FM_BTC_DIR
827            .to_str()
828            .context("non utf8 path")?;
829        let esplora_dir = process_mgr
830            .globals
831            .FM_ESPLORA_DIR
832            .to_str()
833            .context("non utf8 path")?;
834
835        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
836        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
837        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
838        // spawn esplora
839        let cmd = cmd!(
840            crate::util::Esplora,
841            "--daemon-dir={daemon_dir}",
842            "--db-dir={esplora_dir}",
843            "--cookie=bitcoin:bitcoin",
844            "--network=regtest",
845            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
846            "--http-addr=127.0.0.1:{esplora_port}",
847            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
848            "--jsonrpc-import", // Workaround for incompatible on-disk format
849            "--cors=*",         // Allow CORS for WASM recovery tool
850        );
851        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
852
853        Self::wait_for_ready(process_mgr).await?;
854        debug!(target: LOG_DEVIMINT, "Esplora ready");
855        if let Some(pid) = process.id().await {
856            write_overwrite_async(
857                process_mgr.globals.FM_TEST_DIR.join("esplora.pid"),
858                pid.to_string(),
859            )
860            .await?;
861        }
862
863        Ok(Self {
864            _bitcoind: bitcoind,
865            _process: process,
866        })
867    }
868
869    /// Wait until the server is able to respond to requests.
870    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
871        let client = esplora_client::Builder::new(&format!(
872            "http://localhost:{}",
873            process_mgr.globals.FM_PORT_ESPLORA
874        ))
875        // Disable retrying in the client since we're already retrying in the poll below.
876        .max_retries(0)
877        .build_async()
878        .expect("esplora client build failed");
879
880        poll("esplora server ready", || async {
881            client
882                .get_fee_estimates()
883                .await
884                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
885
886            Ok(())
887        })
888        .await?;
889
890        Ok(())
891    }
892}
893
894#[allow(unused)]
895pub struct ExternalDaemons {
896    pub bitcoind: Bitcoind,
897    pub esplora: Esplora,
898}
899
900pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
901    let bitcoind = Bitcoind::new(process_mgr, false).await?;
902    let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
903    let start_time = fedimint_core::time::now();
904    // make sure the bitcoind wallet is ready
905    let _ = bitcoind.wallet_client().await?;
906    info!(
907        target: LOG_DEVIMINT,
908        "starting base daemons took {:?}",
909        start_time.elapsed()?
910    );
911    Ok(ExternalDaemons { bitcoind, esplora })
912}