devimint/
external.rs

1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::{FmtCompact as _, SafeUrl, write_overwrite_async};
17use fedimint_logging::LOG_DEVIMINT;
18use fedimint_testing_core::node_type::LightningNodeType;
19use futures::StreamExt;
20use hex::ToHex;
21use itertools::Itertools;
22use tokio::fs;
23use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
24use tokio::task::spawn_blocking;
25use tokio::time::Instant;
26use tonic_lnd::Client as LndClient;
27use tonic_lnd::lnrpc::GetInfoRequest;
28use tracing::{debug, info, trace, warn};
29
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::{Gatewayd, cmd};
33
34#[derive(Clone)]
35pub struct Bitcoind {
36    pub client: Arc<bitcoincore_rpc::Client>,
37    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
38    pub(crate) _process: ProcessHandle,
39}
40
41impl Bitcoind {
42    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
43        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
44
45        let conf = format!(
46            include_str!("cfg/bitcoin.conf"),
47            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
48            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
49            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
50            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
51            tx_index = "0",
52        );
53        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
54        let process = processmgr
55            .spawn_daemon(
56                "bitcoind",
57                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
58            )
59            .await?;
60
61        let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
62
63        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64
65        let auth = Auth::UserPass(
66            url.username().to_owned(),
67            url.password()
68                .context("Bitcoin RPC URL is missing password")?
69                .to_owned(),
70        );
71
72        let host = url
73            .without_auth()
74            .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
75            .to_string();
76        let wallet_name = "";
77        let host = format!("{host}wallet/{wallet_name}");
78
79        debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
80        let client =
81            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
82        let wallet_client = JitTry::new_try(move || async move {
83            let client =
84                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
85            Self::init(&client, skip_setup).await?;
86            Ok(Arc::new(client))
87        });
88
89        let bitcoind = Self {
90            _process: process,
91            client: Arc::new(client),
92            wallet_client: Arc::new(wallet_client),
93        };
94
95        bitcoind.poll_ready().await?;
96
97        Ok(bitcoind)
98    }
99
100    fn new_bitcoin_rpc(
101        url: &str,
102        auth: bitcoincore_rpc::Auth,
103    ) -> anyhow::Result<bitcoincore_rpc::Client> {
104        // The default (15s) is too low for some test environments
105        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
106        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
107            .url(url)?
108            .timeout(RPC_TIMEOUT);
109        let (user, pass) = auth.get_user_pass()?;
110        if let Some(user) = user {
111            builder = builder.auth(user, pass);
112        }
113        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
114        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
115    }
116
117    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
118        debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
119        // create RPC wallet
120        for attempt in 0.. {
121            match block_in_place(|| client.create_wallet("", None, None, None, None)) {
122                Ok(_) => {
123                    break;
124                }
125                Err(err) => {
126                    if err.to_string().contains("Database already exists") {
127                        break;
128                    }
129                    if attempt % 20 == 19 {
130                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
131                    }
132                    sleep(Duration::from_millis(100)).await;
133                }
134            }
135        }
136
137        if !skip_setup {
138            // mine blocks
139            let blocks = 101;
140            let address = block_in_place(|| client.get_new_address(None, None))?
141                .require_network(bitcoin::Network::Regtest)
142                .expect("Devimint always runs in regtest");
143            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
144            block_in_place(|| {
145                client
146                    .generate_to_address(blocks, &address)
147                    .context("Failed to generate blocks")
148            })?;
149            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
150        }
151
152        // wait bitcoind is ready
153        poll("bitcoind", || async {
154            let info = block_in_place(|| client.get_blockchain_info())
155                .context("bitcoind getblockchaininfo")
156                .map_err(ControlFlow::Continue)?;
157            if info.blocks > 100 {
158                Ok(())
159            } else {
160                Err(ControlFlow::Continue(anyhow!(
161                    "not enough blocks: {}",
162                    info.blocks
163                )))
164            }
165        })
166        .await?;
167        debug!("Bitcoind ready");
168        Ok(())
169    }
170
171    /// Poll until bitcoind rpc responds for basic commands
172    async fn poll_ready(&self) -> anyhow::Result<()> {
173        poll("bitcoind rpc ready", || async {
174            self.get_block_count()
175                .await
176                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
177            Ok(())
178        })
179        .await
180    }
181
182    /// Client that can has wallet initialized, can generate internal addresses
183    /// and send funds
184    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
185        self.wallet_client.get_try().await?;
186        Ok(self)
187    }
188
189    /// Returns the total number of blocks in the chain.
190    ///
191    /// Fedimint's IBitcoindRpc considers block count the total number of
192    /// blocks, where bitcoind's rpc returns the height. Since the genesis
193    /// block has height 0, we need to add 1 to get the total block count.
194    pub async fn get_block_count(&self) -> Result<u64> {
195        let client = self.client.clone();
196        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
197    }
198
199    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
200        let start_time = Instant::now();
201        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
202        let addr = self.get_new_address().await?;
203        let initial_block_count = self.get_block_count().await?;
204        self.generate_to_address(block_num, addr).await?;
205        debug!(target: LOG_DEVIMINT,
206            elapsed_ms = %start_time.elapsed().as_millis(),
207            ?block_num, "Mined blocks (no wait)");
208
209        Ok(initial_block_count)
210    }
211
212    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
213        // `Stdio::piped` can't even parse outputs larger
214        // than pipe buffer size (64K on Linux, 16K on MacOS), so
215        // we should split larger requesteds into smaller chunks.
216        //
217        // On top of it mining a lot of blocks is just slow, so should
218        // be avoided.
219        const BLOCK_NUM_LIMIT: u64 = 32;
220
221        if BLOCK_NUM_LIMIT < block_num {
222            warn!(
223                target: LOG_DEVIMINT,
224                %block_num,
225                "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
226            );
227            let mut block_num = block_num;
228
229            loop {
230                if BLOCK_NUM_LIMIT < block_num {
231                    block_num -= BLOCK_NUM_LIMIT;
232                    Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
233                } else {
234                    Box::pin(async { self.mine_blocks(block_num).await }).await?;
235                    return Ok(());
236                }
237            }
238        }
239        let start_time = Instant::now();
240        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
241        let addr = self.get_new_address().await?;
242        let initial_block_count = self.get_block_count().await?;
243        self.generate_to_address(block_num, addr).await?;
244        while self.get_block_count().await? < initial_block_count + block_num {
245            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
246            sleep(Duration::from_millis(100)).await;
247        }
248
249        debug!(target: LOG_DEVIMINT,
250            elapsed_ms = %start_time.elapsed().as_millis(),
251            ?block_num, "Mined blocks");
252
253        Ok(())
254    }
255
256    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
257        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
258        let amount = bitcoin::Amount::from_sat(amount);
259        let tx = self
260            .wallet_client()
261            .await?
262            .send_to_address(
263                bitcoin::Address::from_str(&addr)?
264                    .require_network(bitcoin::Network::Regtest)
265                    .expect("Devimint always runs in regtest"),
266                amount,
267            )
268            .await?;
269        Ok(tx)
270    }
271
272    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
273        let client = self.wallet_client().await?.clone();
274        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
275        Ok(proof.encode_hex())
276    }
277
278    /// Poll a transaction by its txid until it is found in the mempool or in a
279    /// block.
280    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
281        poll("Waiting for transaction in mempool", || async {
282            match self
283                .get_transaction(txid)
284                .await
285                .context("getrawtransaction")
286            {
287                Ok(Some(tx)) => Ok(tx),
288                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
289                    "Transaction not found yet"
290                ))),
291                Err(err) => Err(ControlFlow::Break(err)),
292            }
293        })
294        .await
295    }
296
297    /// Get a transaction by its txid. Checks the mempool and all blocks.
298    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
299        // Check the mempool.
300        match self.get_raw_transaction(txid, None).await {
301            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
302            // check blocks.
303            Ok(None) => {}
304            // The RPC failed, or the transaction was found in the mempool. Return the result.
305            other => return other,
306        }
307
308        let block_height = self.get_block_count().await? - 1;
309
310        // Check each block for the tx, starting at the chain tip.
311        // Buffer the requests to avoid spamming bitcoind.
312        // We're doing this after checking the mempool since the tx should
313        // usually be in the mempool, and we don't want to needlessly hit
314        // the bitcoind with block requests.
315        let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
316            .map(|height| async move {
317                let block_hash = self.get_block_hash(height).await?;
318                self.get_raw_transaction(txid, Some(block_hash)).await
319            })
320            .buffered(32);
321
322        while let Some(tx_or) = buffered_tx_stream.next().await {
323            match tx_or {
324                // The RPC succeeded, and the transaction was not found in the block. Continue to
325                // the next block.
326                Ok(None) => {}
327                // The RPC failed, or the transaction was found in the block. Return the result.
328                other => return other,
329            }
330        }
331
332        // The transaction was not found in the mempool or any block.
333        Ok(None)
334    }
335
336    async fn get_raw_transaction(
337        &self,
338        txid: bitcoin::Txid,
339        block_hash: Option<BlockHash>,
340    ) -> Result<Option<String>> {
341        let client = self.client.clone();
342        let tx_or =
343            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
344
345        let tx = match tx_or {
346            Ok(tx) => tx,
347            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
348            // reaches bitcoind but is not found. See here:
349            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
350            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
351            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
352                RpcError { code: -5, .. },
353            ))) => return Ok(None),
354            Err(err) => return Err(err.into()),
355        };
356        let bytes = tx.consensus_encode_to_vec();
357        Ok(Some(bytes.encode_hex()))
358    }
359
360    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
361        let client = self.client.clone();
362        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
363    }
364
365    pub async fn get_new_address(&self) -> Result<Address> {
366        let client = self.wallet_client().await?.clone();
367        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
368            .await??
369            .require_network(bitcoin::Network::Regtest)
370            .expect("Devimint always runs in regtest");
371        Ok(addr)
372    }
373
374    pub async fn generate_to_address(
375        &self,
376        block_num: u64,
377        address: Address,
378    ) -> Result<Vec<BlockHash>> {
379        let client = self.wallet_client().await?.clone();
380        Ok(
381            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
382                .await??,
383        )
384    }
385
386    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
387        let client = self.client.clone();
388        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
389    }
390
391    pub async fn send_to_address(
392        &self,
393        addr: Address,
394        amount: bitcoin::Amount,
395    ) -> anyhow::Result<bitcoin::Txid> {
396        let client = self.wallet_client().await?.clone();
397
398        let raw_client = client.client.clone();
399        let txid = spawn_blocking(move || {
400            raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
401        })
402        .await??;
403
404        // If this ever fails, it means we need to poll here, before we return to the
405        // user, as downstream code expects that mining blocks will include this
406        // tx.
407        assert!(client.get_transaction(txid).await?.is_some());
408
409        Ok(txid)
410    }
411
412    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
413        let client = self.wallet_client().await?.clone();
414        Ok(spawn_blocking(move || client.client.get_balances()).await??)
415    }
416
417    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
418        self.client.get_jsonrpc_client()
419    }
420}
421
422#[derive(Clone)]
423pub struct Lnd {
424    pub(crate) client: Arc<Mutex<LndClient>>,
425    pub(crate) process: ProcessHandle,
426    pub(crate) _bitcoind: Bitcoind,
427}
428
429impl Lnd {
430    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
431        let (process, client) = Lnd::start(process_mgr).await?;
432        let this = Self {
433            _bitcoind: bitcoind,
434            client: Arc::new(Mutex::new(client)),
435            process,
436        };
437        // wait for lnd rpc to be active
438        poll("lnd_startup", || async {
439            this.pub_key().await.map_err(ControlFlow::Continue)
440        })
441        .await?;
442        Ok(this)
443    }
444
445    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
446        let conf = format!(
447            include_str!("cfg/lnd.conf"),
448            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
449            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
450            rest_port = process_mgr.globals.FM_PORT_LND_REST,
451            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
452            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
453            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
454        );
455        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
456        let cmd = cmd!(
457            crate::util::Lnd,
458            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
459        );
460
461        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
462        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
463        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
464        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
465        poll("wait for lnd files", || async {
466            if fs::try_exists(lnd_tls_cert)
467                .await
468                .context("lnd tls cert")
469                .map_err(ControlFlow::Continue)?
470                && fs::try_exists(lnd_macaroon)
471                    .await
472                    .context("lnd macaroon")
473                    .map_err(ControlFlow::Continue)?
474            {
475                Ok(())
476            } else {
477                Err(ControlFlow::Continue(anyhow!(
478                    "lnd tls cert or lnd macaroon not found"
479                )))
480            }
481        })
482        .await?;
483
484        install_crypto_provider().await;
485
486        let client = poll("lnd_connect", || async {
487            tonic_lnd::connect(
488                lnd_rpc_addr.clone(),
489                lnd_tls_cert.clone(),
490                lnd_macaroon.clone(),
491            )
492            .await
493            .context("lnd connect")
494            .map_err(ControlFlow::Continue)
495        })
496        .await?;
497
498        Ok((process, client))
499    }
500
501    pub async fn lightning_client_lock(
502        &self,
503    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
504        let guard = self.client.lock().await;
505        Ok(MutexGuard::map(guard, |client| client.lightning()))
506    }
507
508    pub async fn invoices_client_lock(
509        &self,
510    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
511        let guard = self.client.lock().await;
512        Ok(MutexGuard::map(guard, |client| client.invoices()))
513    }
514
515    pub async fn pub_key(&self) -> Result<String> {
516        Ok(self
517            .lightning_client_lock()
518            .await?
519            .get_info(GetInfoRequest {})
520            .await?
521            .into_inner()
522            .identity_pubkey)
523    }
524
525    pub async fn terminate(self) -> Result<()> {
526        self.process.terminate().await
527    }
528
529    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
530        let add_invoice = self
531            .lightning_client_lock()
532            .await?
533            .add_invoice(tonic_lnd::lnrpc::Invoice {
534                value_msat: amount as i64,
535                ..Default::default()
536            })
537            .await?
538            .into_inner();
539        let invoice = add_invoice.payment_request;
540        let payment_hash = add_invoice.r_hash;
541        Ok((invoice, payment_hash))
542    }
543
544    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
545        let payment = self
546            .lightning_client_lock()
547            .await?
548            .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
549                payment_request: invoice.clone(),
550                ..Default::default()
551            })
552            .await?
553            .into_inner();
554        let payment_status = self
555            .lightning_client_lock()
556            .await?
557            .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
558                include_incomplete: true,
559                ..Default::default()
560            })
561            .await?
562            .into_inner()
563            .payments
564            .into_iter()
565            .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
566            .context("payment not in list")?
567            .status();
568        anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
569
570        Ok(())
571    }
572
573    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
574        let invoice_status = self
575            .lightning_client_lock()
576            .await?
577            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
578                r_hash: payment_hash,
579                ..Default::default()
580            })
581            .await?
582            .into_inner()
583            .state();
584        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
585
586        Ok(())
587    }
588
589    pub async fn create_hold_invoice(
590        &self,
591        amount: u64,
592    ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
593        let preimage = rand::random::<[u8; 32]>();
594        let hash = {
595            let mut engine = bitcoin::hashes::sha256::Hash::engine();
596            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
597            bitcoin::hashes::sha256::Hash::from_engine(engine)
598        };
599        let cltv_expiry = 650;
600        let hold_request = self
601            .invoices_client_lock()
602            .await?
603            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
604                value_msat: amount as i64,
605                hash: hash.to_byte_array().to_vec(),
606                cltv_expiry,
607                ..Default::default()
608            })
609            .await?
610            .into_inner();
611        let payment_request = hold_request.payment_request;
612        Ok((preimage, payment_request, hash))
613    }
614
615    pub async fn settle_hold_invoice(
616        &self,
617        preimage: [u8; 32],
618        payment_hash: sha256::Hash,
619    ) -> anyhow::Result<()> {
620        let mut hold_invoice_subscription = self
621            .invoices_client_lock()
622            .await?
623            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
624                r_hash: payment_hash.to_byte_array().to_vec(),
625            })
626            .await?
627            .into_inner();
628        loop {
629            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
630            match timeout(
631                WAIT_FOR_INVOICE_TIMEOUT,
632                futures::StreamExt::next(&mut hold_invoice_subscription),
633            )
634            .await
635            {
636                Ok(Some(Ok(invoice))) => {
637                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
638                        break;
639                    }
640                    debug!("hold invoice payment state: {:?}", invoice.state());
641                }
642                Ok(Some(Err(e))) => {
643                    bail!("error in invoice subscription: {e:?}");
644                }
645                Ok(None) => {
646                    bail!("invoice subscription ended before invoice was accepted");
647                }
648                Err(_) => {
649                    bail!("timed out waiting for invoice to be accepted")
650                }
651            }
652        }
653
654        self.invoices_client_lock()
655            .await?
656            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
657                preimage: preimage.to_vec(),
658            })
659            .await?;
660
661        Ok(())
662    }
663}
664
/// A gateway daemon paired with a name for it; the name is used in log
/// output when opening channels between gateways.
pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
666
667#[allow(clippy::similar_names)]
668pub async fn open_channels_between_gateways(
669    bitcoind: &Bitcoind,
670    gateways: &[NamedGateway<'_>],
671) -> Result<()> {
672    let block_height = bitcoind.get_block_count().await? - 1;
673    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
674    futures::future::try_join_all(
675        gateways
676            .iter()
677            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
678    )
679    .await?;
680
681    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
682    for (gw, _gw_name) in gateways {
683        let funding_addr = gw.get_ln_onchain_address().await?;
684        bitcoind.send_to(funding_addr, 100_000_000).await?;
685    }
686
687    bitcoind.mine_blocks(10).await?;
688
689    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
690
691    let block_height = bitcoind.get_block_count().await? - 1;
692    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
693    futures::future::try_join_all(
694        gateways
695            .iter()
696            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
697    )
698    .await?;
699
700    // All unique pairs of gateways.
701    // For a list of gateways [A, B, C], this will produce [(A, B), (B, C)].
702    // Since the first gateway within each pair initiates the channel open,
703    // order within each pair needs to be enforced so that each Lightning node opens
704    // 1 channel.
705    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
706        gateways.iter().tuple_windows::<(_, _)>().collect();
707
708    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
709    let sats_per_side = 5_000_000;
710    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
711        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
712        let txid = gw_a
713            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
714            .await?;
715
716        bitcoind.poll_get_transaction(txid).await?;
717    }
718
719    bitcoind.mine_blocks(10).await?;
720
721    let block_height = bitcoind.get_block_count().await? - 1;
722    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
723    futures::future::try_join_all(
724        gateways
725            .iter()
726            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
727    )
728    .await?;
729
730    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
731        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
732        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
733
734        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
735        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
736    }
737
738    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
739
740    Ok(())
741}
742
743async fn wait_for_ready_channel_on_gateway_with_counterparty(
744    gw: &Gatewayd,
745    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
746) -> anyhow::Result<()> {
747    poll(
748        &format!("Wait for {} channel update", gw.gw_name),
749        || async {
750            let channels = gw
751                .list_channels()
752                .await
753                .context("list channels")
754                .map_err(ControlFlow::Break)?;
755
756            if channels
757                .iter()
758                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
759            {
760                return Ok(());
761            }
762
763            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
764            Err(ControlFlow::Continue(anyhow!("channel not found")))
765        },
766    )
767    .await
768}
769
770#[derive(Clone)]
771pub enum LightningNode {
772    Lnd(Lnd),
773    Ldk {
774        name: String,
775        gw_port: u16,
776        ldk_port: u16,
777    },
778}
779
780impl LightningNode {
781    pub fn ln_type(&self) -> LightningNodeType {
782        match self {
783            LightningNode::Lnd(_) => LightningNodeType::Lnd,
784            LightningNode::Ldk {
785                name: _,
786                gw_port: _,
787                ldk_port: _,
788            } => LightningNodeType::Ldk,
789        }
790    }
791}
792
793#[derive(Clone)]
794pub struct Esplora {
795    _process: ProcessHandle,
796    _bitcoind: Bitcoind,
797}
798
799impl Esplora {
800    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
801        debug!("Starting esplora");
802        let daemon_dir = process_mgr
803            .globals
804            .FM_BTC_DIR
805            .to_str()
806            .context("non utf8 path")?;
807        let esplora_dir = process_mgr
808            .globals
809            .FM_ESPLORA_DIR
810            .to_str()
811            .context("non utf8 path")?;
812
813        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
814        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
815        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
816        // spawn esplora
817        let cmd = cmd!(
818            crate::util::Esplora,
819            "--daemon-dir={daemon_dir}",
820            "--db-dir={esplora_dir}",
821            "--cookie=bitcoin:bitcoin",
822            "--network=regtest",
823            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
824            "--http-addr=127.0.0.1:{esplora_port}",
825            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
826            "--jsonrpc-import", // Workaround for incompatible on-disk format
827        );
828        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
829
830        Self::wait_for_ready(process_mgr).await?;
831        debug!(target: LOG_DEVIMINT, "Esplora ready");
832
833        Ok(Self {
834            _bitcoind: bitcoind,
835            _process: process,
836        })
837    }
838
839    /// Wait until the server is able to respond to requests.
840    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
841        let client = esplora_client::Builder::new(&format!(
842            "http://localhost:{}",
843            process_mgr.globals.FM_PORT_ESPLORA
844        ))
845        // Disable retrying in the client since we're already retrying in the poll below.
846        .max_retries(0)
847        .build_async()
848        .expect("esplora client build failed");
849
850        poll("esplora server ready", || async {
851            client
852                .get_fee_estimates()
853                .await
854                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
855
856            Ok(())
857        })
858        .await?;
859
860        Ok(())
861    }
862}
863
/// The daemons started by [`external_daemons`]: `bitcoind` and esplora.
// NOTE(review): presumably `unused` is allowed because not every consumer
// reads both fields — confirm and document the actual reason.
#[allow(unused)]
pub struct ExternalDaemons {
    /// Handle to the spawned `bitcoind`.
    pub bitcoind: Bitcoind,
    /// Handle to the spawned esplora server.
    pub esplora: Esplora,
}
869
870pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
871    let bitcoind = Bitcoind::new(process_mgr, false).await?;
872    let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
873    let start_time = fedimint_core::time::now();
874    // make sure the bitcoind wallet is ready
875    let _ = bitcoind.wallet_client().await?;
876    info!(
877        target: LOG_DEVIMINT,
878        "starting base daemons took {:?}",
879        start_time.elapsed()?
880    );
881    Ok(ExternalDaemons { bitcoind, esplora })
882}