devimint/
external.rs

1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::backoff_util::api_networking_backoff;
17use fedimint_core::util::{FmtCompact as _, SafeUrl, retry, write_overwrite_async};
18use fedimint_logging::LOG_DEVIMINT;
19use fedimint_testing_core::node_type::LightningNodeType;
20use futures::StreamExt;
21use hex::ToHex;
22use itertools::Itertools;
23use tokio::fs;
24use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
25use tokio::task::spawn_blocking;
26use tokio::time::Instant;
27use tonic_lnd::Client as LndClient;
28use tonic_lnd::lnrpc::GetInfoRequest;
29use tonic_lnd::routerrpc::SendPaymentRequest;
30use tracing::{debug, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll};
33use crate::vars::utf8;
34use crate::{Gatewayd, cmd};
35
36#[derive(Clone)]
37pub struct Bitcoind {
38    pub client: Arc<bitcoincore_rpc::Client>,
39    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
40    pub(crate) _process: ProcessHandle,
41}
42
43impl Bitcoind {
44    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
45        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
46
47        let conf = format!(
48            include_str!("cfg/bitcoin.conf"),
49            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
50            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
51            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
52            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
53            tx_index = "0",
54        );
55        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
56        let process = processmgr
57            .spawn_daemon(
58                "bitcoind",
59                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
60            )
61            .await?;
62
63        let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
64
65        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
66
67        let auth = Auth::UserPass(
68            url.username().to_owned(),
69            url.password()
70                .context("Bitcoin RPC URL is missing password")?
71                .to_owned(),
72        );
73
74        let host = url
75            .without_auth()
76            .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
77            .to_string();
78        let wallet_name = "";
79        let host = format!("{host}wallet/{wallet_name}");
80
81        debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
82        let client =
83            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
84        let wallet_client = JitTry::new_try(move || async move {
85            let client =
86                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
87            Self::init(&client, skip_setup).await?;
88            Ok(Arc::new(client))
89        });
90
91        let bitcoind = Self {
92            _process: process,
93            client: Arc::new(client),
94            wallet_client: Arc::new(wallet_client),
95        };
96
97        bitcoind.poll_ready().await?;
98
99        // To have a ChainId we always need at least one block.
100        let addr = bitcoind.get_new_address().await?;
101        bitcoind.generate_to_address(1, addr).await?;
102
103        Ok(bitcoind)
104    }
105
106    fn new_bitcoin_rpc(
107        url: &str,
108        auth: bitcoincore_rpc::Auth,
109    ) -> anyhow::Result<bitcoincore_rpc::Client> {
110        // The default (15s) is too low for some test environments
111        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
112        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
113            .url(url)?
114            .timeout(RPC_TIMEOUT);
115        let (user, pass) = auth.get_user_pass()?;
116        if let Some(user) = user {
117            builder = builder.auth(user, pass);
118        }
119        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
120        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
121    }
122
123    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
124        debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
125        // create RPC wallet
126        for attempt in 0.. {
127            match block_in_place(|| client.create_wallet("", None, None, None, None)) {
128                Ok(_) => {
129                    break;
130                }
131                Err(err) => {
132                    if err.to_string().contains("Database already exists") {
133                        break;
134                    }
135                    if attempt % 20 == 19 {
136                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
137                    }
138                    sleep(Duration::from_millis(100)).await;
139                }
140            }
141        }
142
143        if !skip_setup {
144            // mine blocks
145            let blocks = 101;
146            let address = block_in_place(|| client.get_new_address(None, None))?
147                .require_network(bitcoin::Network::Regtest)
148                .expect("Devimint always runs in regtest");
149            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
150            block_in_place(|| {
151                client
152                    .generate_to_address(blocks, &address)
153                    .context("Failed to generate blocks")
154            })?;
155            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
156        }
157
158        // wait bitcoind is ready
159        poll("bitcoind", || async {
160            let info = block_in_place(|| client.get_blockchain_info())
161                .context("bitcoind getblockchaininfo")
162                .map_err(ControlFlow::Continue)?;
163            if info.blocks > 100 {
164                Ok(())
165            } else {
166                Err(ControlFlow::Continue(anyhow!(
167                    "not enough blocks: {}",
168                    info.blocks
169                )))
170            }
171        })
172        .await?;
173        debug!("Bitcoind ready");
174        Ok(())
175    }
176
177    /// Poll until bitcoind rpc responds for basic commands
178    async fn poll_ready(&self) -> anyhow::Result<()> {
179        poll("bitcoind rpc ready", || async {
180            self.get_block_count()
181                .await
182                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
183            Ok(())
184        })
185        .await
186    }
187
188    /// Client that can has wallet initialized, can generate internal addresses
189    /// and send funds
190    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
191        self.wallet_client.get_try().await?;
192        Ok(self)
193    }
194
195    /// Returns the total number of blocks in the chain.
196    ///
197    /// Fedimint's IBitcoindRpc considers block count the total number of
198    /// blocks, where bitcoind's rpc returns the height. Since the genesis
199    /// block has height 0, we need to add 1 to get the total block count.
200    pub async fn get_block_count(&self) -> Result<u64> {
201        let client = self.client.clone();
202        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
203    }
204
205    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
206        let start_time = Instant::now();
207        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
208        let addr = self.get_new_address().await?;
209        let initial_block_count = self.get_block_count().await?;
210        self.generate_to_address(block_num, addr).await?;
211        debug!(target: LOG_DEVIMINT,
212            elapsed_ms = %start_time.elapsed().as_millis(),
213            ?block_num, "Mined blocks (no wait)");
214
215        Ok(initial_block_count)
216    }
217
218    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
219        // `Stdio::piped` can't even parse outputs larger
220        // than pipe buffer size (64K on Linux, 16K on MacOS), so
221        // we should split larger requesteds into smaller chunks.
222        //
223        // On top of it mining a lot of blocks is just slow, so should
224        // be avoided.
225        const BLOCK_NUM_LIMIT: u64 = 32;
226
227        if BLOCK_NUM_LIMIT < block_num {
228            warn!(
229                target: LOG_DEVIMINT,
230                %block_num,
231                "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
232            );
233            let mut block_num = block_num;
234
235            loop {
236                if BLOCK_NUM_LIMIT < block_num {
237                    block_num -= BLOCK_NUM_LIMIT;
238                    Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
239                } else {
240                    Box::pin(async { self.mine_blocks(block_num).await }).await?;
241                    return Ok(());
242                }
243            }
244        }
245        let start_time = Instant::now();
246        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
247        let addr = self.get_new_address().await?;
248        let initial_block_count = self.get_block_count().await?;
249        self.generate_to_address(block_num, addr).await?;
250        while self.get_block_count().await? < initial_block_count + block_num {
251            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
252            sleep(Duration::from_millis(100)).await;
253        }
254
255        debug!(target: LOG_DEVIMINT,
256            elapsed_ms = %start_time.elapsed().as_millis(),
257            ?block_num, "Mined blocks");
258
259        Ok(())
260    }
261
262    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
263        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
264        let amount = bitcoin::Amount::from_sat(amount);
265        let tx = self
266            .wallet_client()
267            .await?
268            .send_to_address(
269                bitcoin::Address::from_str(&addr)?
270                    .require_network(bitcoin::Network::Regtest)
271                    .expect("Devimint always runs in regtest"),
272                amount,
273            )
274            .await?;
275        Ok(tx)
276    }
277
278    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
279        let client = self.wallet_client().await?.clone();
280        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
281        Ok(proof.encode_hex())
282    }
283
284    /// Poll a transaction by its txid until it is found in the mempool or in a
285    /// block.
286    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
287        poll("Waiting for transaction in mempool", || async {
288            match self
289                .get_transaction(txid)
290                .await
291                .context("getrawtransaction")
292            {
293                Ok(Some(tx)) => Ok(tx),
294                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
295                    "Transaction not found yet"
296                ))),
297                Err(err) => Err(ControlFlow::Break(err)),
298            }
299        })
300        .await
301    }
302
303    /// Get a transaction by its txid. Checks the mempool and all blocks.
304    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
305        // Check the mempool.
306        match self.get_raw_transaction(txid, None).await {
307            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
308            // check blocks.
309            Ok(None) => {}
310            // The RPC failed, or the transaction was found in the mempool. Return the result.
311            other => return other,
312        }
313
314        let block_height = self.get_block_count().await? - 1;
315
316        // Check each block for the tx, starting at the chain tip.
317        // Buffer the requests to avoid spamming bitcoind.
318        // We're doing this after checking the mempool since the tx should
319        // usually be in the mempool, and we don't want to needlessly hit
320        // the bitcoind with block requests.
321        let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
322            .map(|height| async move {
323                let block_hash = self.get_block_hash(height).await?;
324                self.get_raw_transaction(txid, Some(block_hash)).await
325            })
326            .buffered(32);
327
328        while let Some(tx_or) = buffered_tx_stream.next().await {
329            match tx_or {
330                // The RPC succeeded, and the transaction was not found in the block. Continue to
331                // the next block.
332                Ok(None) => {}
333                // The RPC failed, or the transaction was found in the block. Return the result.
334                other => return other,
335            }
336        }
337
338        // The transaction was not found in the mempool or any block.
339        Ok(None)
340    }
341
342    async fn get_raw_transaction(
343        &self,
344        txid: bitcoin::Txid,
345        block_hash: Option<BlockHash>,
346    ) -> Result<Option<String>> {
347        let client = self.client.clone();
348        let tx_or =
349            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
350
351        let tx = match tx_or {
352            Ok(tx) => tx,
353            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
354            // reaches bitcoind but is not found. See here:
355            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
356            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
357            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
358                RpcError { code: -5, .. },
359            ))) => return Ok(None),
360            Err(err) => return Err(err.into()),
361        };
362        let bytes = tx.consensus_encode_to_vec();
363        Ok(Some(bytes.encode_hex()))
364    }
365
366    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
367        let client = self.client.clone();
368        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
369    }
370
371    pub async fn get_new_address(&self) -> Result<Address> {
372        let client = self.wallet_client().await?.clone();
373        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
374            .await??
375            .require_network(bitcoin::Network::Regtest)
376            .expect("Devimint always runs in regtest");
377        Ok(addr)
378    }
379
380    pub async fn generate_to_address(
381        &self,
382        block_num: u64,
383        address: Address,
384    ) -> Result<Vec<BlockHash>> {
385        let client = self.wallet_client().await?.clone();
386        Ok(
387            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
388                .await??,
389        )
390    }
391
392    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
393        let client = self.client.clone();
394        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
395    }
396
397    pub async fn send_to_address(
398        &self,
399        addr: Address,
400        amount: bitcoin::Amount,
401    ) -> anyhow::Result<bitcoin::Txid> {
402        let client = self.wallet_client().await?.clone();
403
404        let raw_client = client.client.clone();
405        let txid = spawn_blocking(move || {
406            raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
407        })
408        .await??;
409
410        // Downstream code expects that mining blocks will include this
411        // tx. Seems like this is not always immediately the case in Bitcoin Knobs.
412        retry(
413            "await-genesis-tx-processed".to_string(),
414            api_networking_backoff(),
415            || async {
416                if client.get_transaction(txid).await?.is_none() {
417                    bail!("Genesis tx not visible yet = {}", txid);
418                }
419
420                Ok(())
421            },
422        )
423        .await
424        .expect("Number of retries has no limit");
425        Ok(txid)
426    }
427
428    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
429        let client = self.wallet_client().await?.clone();
430        Ok(spawn_blocking(move || client.client.get_balances()).await??)
431    }
432
433    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
434        self.client.get_jsonrpc_client()
435    }
436}
437
438#[derive(Clone)]
439pub struct Lnd {
440    pub(crate) client: Arc<Mutex<LndClient>>,
441    pub(crate) process: ProcessHandle,
442    pub(crate) _bitcoind: Bitcoind,
443}
444
445impl Lnd {
446    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
447        let (process, client) = Lnd::start(process_mgr).await?;
448        let this = Self {
449            _bitcoind: bitcoind,
450            client: Arc::new(Mutex::new(client)),
451            process,
452        };
453        // wait for lnd rpc to be active
454        poll("lnd_startup", || async {
455            this.pub_key().await.map_err(ControlFlow::Continue)
456        })
457        .await?;
458        Ok(this)
459    }
460
461    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
462        let conf = format!(
463            include_str!("cfg/lnd.conf"),
464            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
465            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
466            rest_port = process_mgr.globals.FM_PORT_LND_REST,
467            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
468            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
469            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
470        );
471        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
472        let cmd = cmd!(
473            crate::util::Lnd,
474            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
475        );
476
477        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
478        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
479        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
480        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
481        poll("wait for lnd files", || async {
482            if fs::try_exists(lnd_tls_cert)
483                .await
484                .context("lnd tls cert")
485                .map_err(ControlFlow::Continue)?
486                && fs::try_exists(lnd_macaroon)
487                    .await
488                    .context("lnd macaroon")
489                    .map_err(ControlFlow::Continue)?
490            {
491                Ok(())
492            } else {
493                Err(ControlFlow::Continue(anyhow!(
494                    "lnd tls cert or lnd macaroon not found"
495                )))
496            }
497        })
498        .await?;
499
500        install_crypto_provider().await;
501
502        let client = poll("lnd_connect", || async {
503            tonic_lnd::connect(
504                lnd_rpc_addr.clone(),
505                lnd_tls_cert.clone(),
506                lnd_macaroon.clone(),
507            )
508            .await
509            .context("lnd connect")
510            .map_err(ControlFlow::Continue)
511        })
512        .await?;
513
514        Ok((process, client))
515    }
516
517    pub async fn lightning_client_lock(
518        &self,
519    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
520        let guard = self.client.lock().await;
521        Ok(MutexGuard::map(guard, |client| client.lightning()))
522    }
523
524    pub async fn invoices_client_lock(
525        &self,
526    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
527        let guard = self.client.lock().await;
528        Ok(MutexGuard::map(guard, |client| client.invoices()))
529    }
530
531    pub async fn router_client_lock(
532        &self,
533    ) -> Result<MappedMutexGuard<'_, tonic_lnd::RouterClient>> {
534        let guard = self.client.lock().await;
535        Ok(MutexGuard::map(guard, |client| client.router()))
536    }
537
538    pub async fn pub_key(&self) -> Result<String> {
539        Ok(self
540            .lightning_client_lock()
541            .await?
542            .get_info(GetInfoRequest {})
543            .await?
544            .into_inner()
545            .identity_pubkey)
546    }
547
548    pub async fn terminate(self) -> Result<()> {
549        self.process.terminate().await
550    }
551
552    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
553        let add_invoice = self
554            .lightning_client_lock()
555            .await?
556            .add_invoice(tonic_lnd::lnrpc::Invoice {
557                value_msat: amount as i64,
558                ..Default::default()
559            })
560            .await?
561            .into_inner();
562        let invoice = add_invoice.payment_request;
563        let payment_hash = add_invoice.r_hash;
564        Ok((invoice, payment_hash))
565    }
566
567    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
568        let mut payment = self
569            .router_client_lock()
570            .await?
571            .send_payment_v2(SendPaymentRequest {
572                payment_request: invoice.clone(),
573                ..Default::default()
574            })
575            .await?
576            .into_inner();
577
578        while let Some(update) = payment.message().await? {
579            match update.status() {
580                tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded => return Ok(()),
581                tonic_lnd::lnrpc::payment::PaymentStatus::InFlight => {}
582                _ => return Err(anyhow!("LND lightning payment failed")),
583            }
584        }
585
586        Err(anyhow!("LND lightning payment unknown status"))
587    }
588
589    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
590        let invoice_status = self
591            .lightning_client_lock()
592            .await?
593            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
594                r_hash: payment_hash,
595                ..Default::default()
596            })
597            .await?
598            .into_inner()
599            .state();
600        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
601
602        Ok(())
603    }
604
605    pub async fn create_hold_invoice(
606        &self,
607        amount: u64,
608    ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
609        let preimage = rand::random::<[u8; 32]>();
610        let hash = {
611            let mut engine = bitcoin::hashes::sha256::Hash::engine();
612            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
613            bitcoin::hashes::sha256::Hash::from_engine(engine)
614        };
615        let cltv_expiry = 650;
616        let hold_request = self
617            .invoices_client_lock()
618            .await?
619            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
620                value_msat: amount as i64,
621                hash: hash.to_byte_array().to_vec(),
622                cltv_expiry,
623                ..Default::default()
624            })
625            .await?
626            .into_inner();
627        let payment_request = hold_request.payment_request;
628        Ok((preimage, payment_request, hash))
629    }
630
631    pub async fn settle_hold_invoice(
632        &self,
633        preimage: [u8; 32],
634        payment_hash: sha256::Hash,
635    ) -> anyhow::Result<()> {
636        let mut hold_invoice_subscription = self
637            .invoices_client_lock()
638            .await?
639            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
640                r_hash: payment_hash.to_byte_array().to_vec(),
641            })
642            .await?
643            .into_inner();
644        loop {
645            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
646            match timeout(
647                WAIT_FOR_INVOICE_TIMEOUT,
648                futures::StreamExt::next(&mut hold_invoice_subscription),
649            )
650            .await
651            {
652                Ok(Some(Ok(invoice))) => {
653                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
654                        break;
655                    }
656                    debug!("hold invoice payment state: {:?}", invoice.state());
657                }
658                Ok(Some(Err(e))) => {
659                    bail!("error in invoice subscription: {e:?}");
660                }
661                Ok(None) => {
662                    bail!("invoice subscription ended before invoice was accepted");
663                }
664                Err(_) => {
665                    bail!("timed out waiting for invoice to be accepted")
666                }
667            }
668        }
669
670        self.invoices_client_lock()
671            .await?
672            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
673                preimage: preimage.to_vec(),
674            })
675            .await?;
676
677        Ok(())
678    }
679}
680
681pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
682
683#[allow(clippy::similar_names)]
684pub async fn open_channels_between_gateways(
685    bitcoind: &Bitcoind,
686    gateways: &[NamedGateway<'_>],
687) -> Result<()> {
688    let block_height = bitcoind.get_block_count().await? - 1;
689    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
690    futures::future::try_join_all(
691        gateways
692            .iter()
693            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
694    )
695    .await?;
696
697    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
698    for (gw, _gw_name) in gateways {
699        let funding_addr = gw.get_ln_onchain_address().await?;
700        bitcoind.send_to(funding_addr, 100_000_000).await?;
701    }
702
703    bitcoind.mine_blocks(10).await?;
704
705    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
706
707    let block_height = bitcoind.get_block_count().await? - 1;
708    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
709    futures::future::try_join_all(
710        gateways
711            .iter()
712            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
713    )
714    .await?;
715
716    // All unique pairs of gateways.
717    // For a list of gateways [A, B, C], this will produce [(A, B), (B, C)].
718    // Since the first gateway within each pair initiates the channel open,
719    // order within each pair needs to be enforced so that each Lightning node opens
720    // 1 channel.
721    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
722        gateways.iter().tuple_windows::<(_, _)>().collect();
723
724    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
725    let sats_per_side = 5_000_000;
726    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
727        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
728        let txid = gw_a
729            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
730            .await?;
731
732        bitcoind.poll_get_transaction(txid).await?;
733    }
734
735    bitcoind.mine_blocks(10).await?;
736
737    let block_height = bitcoind.get_block_count().await? - 1;
738    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
739    futures::future::try_join_all(
740        gateways
741            .iter()
742            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
743    )
744    .await?;
745
746    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
747        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
748        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
749
750        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
751        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
752    }
753
754    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
755
756    Ok(())
757}
758
759async fn wait_for_ready_channel_on_gateway_with_counterparty(
760    gw: &Gatewayd,
761    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
762) -> anyhow::Result<()> {
763    poll(
764        &format!("Wait for {} channel update", gw.gw_name),
765        || async {
766            let channels = gw
767                .list_channels()
768                .await
769                .context("list channels")
770                .map_err(ControlFlow::Break)?;
771
772            if channels
773                .iter()
774                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
775            {
776                return Ok(());
777            }
778
779            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
780            Err(ControlFlow::Continue(anyhow!("channel not found")))
781        },
782    )
783    .await
784}
785
786#[derive(Clone)]
787pub enum LightningNode {
788    Lnd(Lnd),
789    Ldk {
790        name: String,
791        gw_port: u16,
792        ldk_port: u16,
793        metrics_port: u16,
794    },
795}
796
797impl LightningNode {
798    pub fn ln_type(&self) -> LightningNodeType {
799        match self {
800            LightningNode::Lnd(_) => LightningNodeType::Lnd,
801            LightningNode::Ldk {
802                name: _,
803                gw_port: _,
804                ldk_port: _,
805                metrics_port: _,
806            } => LightningNodeType::Ldk,
807        }
808    }
809}
810
811#[derive(Clone)]
812pub struct Esplora {
813    _process: ProcessHandle,
814    _bitcoind: Bitcoind,
815}
816
817impl Esplora {
818    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
819        debug!("Starting esplora");
820        let daemon_dir = process_mgr
821            .globals
822            .FM_BTC_DIR
823            .to_str()
824            .context("non utf8 path")?;
825        let esplora_dir = process_mgr
826            .globals
827            .FM_ESPLORA_DIR
828            .to_str()
829            .context("non utf8 path")?;
830
831        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
832        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
833        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
834        // spawn esplora
835        let cmd = cmd!(
836            crate::util::Esplora,
837            "--daemon-dir={daemon_dir}",
838            "--db-dir={esplora_dir}",
839            "--cookie=bitcoin:bitcoin",
840            "--network=regtest",
841            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
842            "--http-addr=127.0.0.1:{esplora_port}",
843            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
844            "--jsonrpc-import", // Workaround for incompatible on-disk format
845        );
846        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
847
848        Self::wait_for_ready(process_mgr).await?;
849        debug!(target: LOG_DEVIMINT, "Esplora ready");
850
851        Ok(Self {
852            _bitcoind: bitcoind,
853            _process: process,
854        })
855    }
856
857    /// Wait until the server is able to respond to requests.
858    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
859        let client = esplora_client::Builder::new(&format!(
860            "http://localhost:{}",
861            process_mgr.globals.FM_PORT_ESPLORA
862        ))
863        // Disable retrying in the client since we're already retrying in the poll below.
864        .max_retries(0)
865        .build_async()
866        .expect("esplora client build failed");
867
868        poll("esplora server ready", || async {
869            client
870                .get_fee_estimates()
871                .await
872                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
873
874            Ok(())
875        })
876        .await?;
877
878        Ok(())
879    }
880}
881
882#[allow(unused)]
883pub struct ExternalDaemons {
884    pub bitcoind: Bitcoind,
885    pub esplora: Esplora,
886}
887
888pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
889    let bitcoind = Bitcoind::new(process_mgr, false).await?;
890    let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
891    let start_time = fedimint_core::time::now();
892    // make sure the bitcoind wallet is ready
893    let _ = bitcoind.wallet_client().await?;
894    info!(
895        target: LOG_DEVIMINT,
896        "starting base daemons took {:?}",
897        start_time.elapsed()?
898    );
899    Ok(ExternalDaemons { bitcoind, esplora })
900}