devimint/
external.rs

1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
14use fedimint_core::task::{block_in_place, sleep, timeout};
15use fedimint_core::util::{FmtCompact as _, SafeUrl, write_overwrite_async};
16use fedimint_logging::LOG_DEVIMINT;
17use fedimint_testing_core::node_type::LightningNodeType;
18use futures::StreamExt;
19use hex::ToHex;
20use itertools::Itertools;
21use tokio::fs;
22use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
23use tokio::task::spawn_blocking;
24use tokio::time::Instant;
25use tonic_lnd::Client as LndClient;
26use tonic_lnd::lnrpc::GetInfoRequest;
27use tracing::{debug, info, trace, warn};
28
29use crate::gatewayd::LdkChainSource;
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::{Gatewayd, cmd};
33
34#[derive(Clone)]
35pub struct Bitcoind {
36    pub client: Arc<bitcoincore_rpc::Client>,
37    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
38    pub(crate) _process: ProcessHandle,
39}
40
41impl Bitcoind {
42    pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
43        let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
44
45        let conf = format!(
46            include_str!("cfg/bitcoin.conf"),
47            rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
48            p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
49            zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
50            zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
51            tx_index = "0",
52        );
53        write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
54        let process = processmgr
55            .spawn_daemon(
56                "bitcoind",
57                cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
58            )
59            .await?;
60
61        let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
62
63        debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64
65        let auth = Auth::UserPass(
66            url.username().to_owned(),
67            url.password()
68                .context("Bitcoin RPC URL is missing password")?
69                .to_owned(),
70        );
71
72        let host = url
73            .without_auth()
74            .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
75            .to_string();
76
77        debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
78        let client =
79            Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
80        let wallet_client = JitTry::new_try(move || async move {
81            let client =
82                Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
83            Self::init(&client, skip_setup).await?;
84            Ok(Arc::new(client))
85        });
86
87        let bitcoind = Self {
88            _process: process,
89            client: Arc::new(client),
90            wallet_client: Arc::new(wallet_client),
91        };
92
93        bitcoind.poll_ready().await?;
94
95        Ok(bitcoind)
96    }
97
98    fn new_bitcoin_rpc(
99        url: &str,
100        auth: bitcoincore_rpc::Auth,
101    ) -> anyhow::Result<bitcoincore_rpc::Client> {
102        // The default (15s) is too low for some test environments
103        const RPC_TIMEOUT: Duration = Duration::from_secs(45);
104        let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
105            .url(url)?
106            .timeout(RPC_TIMEOUT);
107        let (user, pass) = auth.get_user_pass()?;
108        if let Some(user) = user {
109            builder = builder.auth(user, pass);
110        }
111        let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
112        Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
113    }
114
115    pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
116        debug!("Setting up bitcoind");
117        // create RPC wallet
118        for attempt in 0.. {
119            match block_in_place(|| client.create_wallet("", None, None, None, None)) {
120                Ok(_) => {
121                    break;
122                }
123                Err(err) => {
124                    if err.to_string().contains("Database already exists") {
125                        break;
126                    }
127                    if attempt % 20 == 19 {
128                        debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
129                    }
130                    sleep(Duration::from_millis(100)).await;
131                }
132            }
133        }
134
135        if !skip_setup {
136            // mine blocks
137            let blocks = 101;
138            let address = block_in_place(|| client.get_new_address(None, None))?
139                .require_network(bitcoin::Network::Regtest)
140                .expect("Devimint always runs in regtest");
141            debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
142            block_in_place(|| {
143                client
144                    .generate_to_address(blocks, &address)
145                    .context("Failed to generate blocks")
146            })?;
147            trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
148        }
149
150        // wait bitcoind is ready
151        poll("bitcoind", || async {
152            let info = block_in_place(|| client.get_blockchain_info())
153                .context("bitcoind getblockchaininfo")
154                .map_err(ControlFlow::Continue)?;
155            if info.blocks > 100 {
156                Ok(())
157            } else {
158                Err(ControlFlow::Continue(anyhow!(
159                    "not enough blocks: {}",
160                    info.blocks
161                )))
162            }
163        })
164        .await?;
165        debug!("Bitcoind ready");
166        Ok(())
167    }
168
169    /// Poll until bitcoind rpc responds for basic commands
170    async fn poll_ready(&self) -> anyhow::Result<()> {
171        poll("bitcoind rpc ready", || async {
172            self.get_block_count()
173                .await
174                .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
175            Ok(())
176        })
177        .await
178    }
179
180    /// Client that can has wallet initialized, can generate internal addresses
181    /// and send funds
182    pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
183        self.wallet_client.get_try().await?;
184        Ok(self)
185    }
186
187    /// Returns the total number of blocks in the chain.
188    ///
189    /// Fedimint's IBitcoindRpc considers block count the total number of
190    /// blocks, where bitcoind's rpc returns the height. Since the genesis
191    /// block has height 0, we need to add 1 to get the total block count.
192    pub async fn get_block_count(&self) -> Result<u64> {
193        let client = self.client.clone();
194        Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
195    }
196
197    pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
198        let start_time = Instant::now();
199        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
200        let addr = self.get_new_address().await?;
201        let initial_block_count = self.get_block_count().await?;
202        self.generate_to_address(block_num, addr).await?;
203        debug!(target: LOG_DEVIMINT,
204            elapsed_ms = %start_time.elapsed().as_millis(),
205            ?block_num, "Mined blocks (no wait)");
206
207        Ok(initial_block_count)
208    }
209
210    pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
211        // `Stdio::piped` can't even parse outputs larger
212        // than pipe buffer size (64K on Linux, 16K on MacOS), so
213        // we should split larger requesteds into smaller chunks.
214        //
215        // On top of it mining a lot of blocks is just slow, so should
216        // be avoided.
217        const BLOCK_NUM_LIMIT: u64 = 32;
218
219        if BLOCK_NUM_LIMIT < block_num {
220            warn!(
221                target: LOG_DEVIMINT,
222                %block_num,
223                "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
224            );
225            let mut block_num = block_num;
226
227            loop {
228                if BLOCK_NUM_LIMIT < block_num {
229                    block_num -= BLOCK_NUM_LIMIT;
230                    Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
231                } else {
232                    Box::pin(async { self.mine_blocks(block_num).await }).await?;
233                    return Ok(());
234                }
235            }
236        }
237        let start_time = Instant::now();
238        debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
239        let addr = self.get_new_address().await?;
240        let initial_block_count = self.get_block_count().await?;
241        self.generate_to_address(block_num, addr).await?;
242        while self.get_block_count().await? < initial_block_count + block_num {
243            trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
244            sleep(Duration::from_millis(100)).await;
245        }
246
247        debug!(target: LOG_DEVIMINT,
248            elapsed_ms = %start_time.elapsed().as_millis(),
249            ?block_num, "Mined blocks");
250
251        Ok(())
252    }
253
254    pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
255        debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
256        let amount = bitcoin::Amount::from_sat(amount);
257        let tx = self
258            .wallet_client()
259            .await?
260            .send_to_address(
261                bitcoin::Address::from_str(&addr)?
262                    .require_network(bitcoin::Network::Regtest)
263                    .expect("Devimint always runs in regtest"),
264                amount,
265            )
266            .await?;
267        Ok(tx)
268    }
269
270    pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
271        let client = self.wallet_client().await?.clone();
272        let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
273        Ok(proof.encode_hex())
274    }
275
276    /// Poll a transaction by its txid until it is found in the mempool or in a
277    /// block.
278    pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
279        poll("Waiting for transaction in mempool", || async {
280            match self
281                .get_transaction(txid)
282                .await
283                .context("getrawtransaction")
284            {
285                Ok(Some(tx)) => Ok(tx),
286                Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
287                    "Transaction not found yet"
288                ))),
289                Err(err) => Err(ControlFlow::Break(err)),
290            }
291        })
292        .await
293    }
294
295    /// Get a transaction by its txid. Checks the mempool and all blocks.
296    async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
297        // Check the mempool.
298        match self.get_raw_transaction(txid, None).await {
299            // The RPC succeeded, and the transaction was not found in the mempool. Continue to
300            // check blocks.
301            Ok(None) => {}
302            // The RPC failed, or the transaction was found in the mempool. Return the result.
303            other => return other,
304        }
305
306        let block_height = self.get_block_count().await? - 1;
307
308        // Check each block for the tx, starting at the chain tip.
309        // Buffer the requests to avoid spamming bitcoind.
310        // We're doing this after checking the mempool since the tx should
311        // usually be in the mempool, and we don't want to needlessly hit
312        // the bitcoind with block requests.
313        let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
314            .map(|height| async move {
315                let block_hash = self.get_block_hash(height).await?;
316                self.get_raw_transaction(txid, Some(block_hash)).await
317            })
318            .buffered(32);
319
320        while let Some(tx_or) = buffered_tx_stream.next().await {
321            match tx_or {
322                // The RPC succeeded, and the transaction was not found in the block. Continue to
323                // the next block.
324                Ok(None) => {}
325                // The RPC failed, or the transaction was found in the block. Return the result.
326                other => return other,
327            }
328        }
329
330        // The transaction was not found in the mempool or any block.
331        Ok(None)
332    }
333
334    async fn get_raw_transaction(
335        &self,
336        txid: bitcoin::Txid,
337        block_hash: Option<BlockHash>,
338    ) -> Result<Option<String>> {
339        let client = self.client.clone();
340        let tx_or =
341            spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
342
343        let tx = match tx_or {
344            Ok(tx) => tx,
345            // `getrawtransaction` returns a JSON-RPC error with code -5 if the command
346            // reaches bitcoind but is not found. See here:
347            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/rawtransaction.cpp#L360-L376
348            // https://github.com/bitcoin/bitcoin/blob/25dacae9c7feb31308271e2fd5a127c1fc230c2f/src/rpc/protocol.h#L42
349            Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
350                RpcError { code: -5, .. },
351            ))) => return Ok(None),
352            Err(err) => return Err(err.into()),
353        };
354        let bytes = tx.consensus_encode_to_vec();
355        Ok(Some(bytes.encode_hex()))
356    }
357
358    async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
359        let client = self.client.clone();
360        Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
361    }
362
363    pub async fn get_new_address(&self) -> Result<Address> {
364        let client = self.wallet_client().await?.clone();
365        let addr = spawn_blocking(move || client.client.get_new_address(None, None))
366            .await??
367            .require_network(bitcoin::Network::Regtest)
368            .expect("Devimint always runs in regtest");
369        Ok(addr)
370    }
371
372    pub async fn generate_to_address(
373        &self,
374        block_num: u64,
375        address: Address,
376    ) -> Result<Vec<BlockHash>> {
377        let client = self.wallet_client().await?.clone();
378        Ok(
379            spawn_blocking(move || client.client.generate_to_address(block_num, &address))
380                .await??,
381        )
382    }
383
384    pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
385        let client = self.client.clone();
386        Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
387    }
388
389    pub async fn send_to_address(
390        &self,
391        addr: Address,
392        amount: bitcoin::Amount,
393    ) -> anyhow::Result<bitcoin::Txid> {
394        let client = self.wallet_client().await?.clone();
395
396        let raw_client = client.client.clone();
397        let txid = spawn_blocking(move || {
398            raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
399        })
400        .await??;
401
402        // If this ever fails, it means we need to poll here, before we return to the
403        // user, as downstream code expects that mining blocks will include this
404        // tx.
405        assert!(client.get_transaction(txid).await?.is_some());
406
407        Ok(txid)
408    }
409
410    pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
411        let client = self.wallet_client().await?.clone();
412        Ok(spawn_blocking(move || client.client.get_balances()).await??)
413    }
414
415    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
416        self.client.get_jsonrpc_client()
417    }
418}
419
420#[derive(Clone)]
421pub struct Lnd {
422    pub(crate) client: Arc<Mutex<LndClient>>,
423    pub(crate) process: ProcessHandle,
424    pub(crate) _bitcoind: Bitcoind,
425}
426
427impl Lnd {
428    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
429        let (process, client) = Lnd::start(process_mgr).await?;
430        let this = Self {
431            _bitcoind: bitcoind,
432            client: Arc::new(Mutex::new(client)),
433            process,
434        };
435        // wait for lnd rpc to be active
436        poll("lnd_startup", || async {
437            this.pub_key().await.map_err(ControlFlow::Continue)
438        })
439        .await?;
440        Ok(this)
441    }
442
443    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
444        let conf = format!(
445            include_str!("cfg/lnd.conf"),
446            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
447            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
448            rest_port = process_mgr.globals.FM_PORT_LND_REST,
449            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
450            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
451            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
452        );
453        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
454        let cmd = cmd!(
455            crate::util::Lnd,
456            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
457        );
458
459        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
460        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
461        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
462        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
463        poll("wait for lnd files", || async {
464            if fs::try_exists(lnd_tls_cert)
465                .await
466                .context("lnd tls cert")
467                .map_err(ControlFlow::Continue)?
468                && fs::try_exists(lnd_macaroon)
469                    .await
470                    .context("lnd macaroon")
471                    .map_err(ControlFlow::Continue)?
472            {
473                Ok(())
474            } else {
475                Err(ControlFlow::Continue(anyhow!(
476                    "lnd tls cert or lnd macaroon not found"
477                )))
478            }
479        })
480        .await?;
481
482        let client = poll("lnd_connect", || async {
483            tonic_lnd::connect(
484                lnd_rpc_addr.clone(),
485                lnd_tls_cert.clone(),
486                lnd_macaroon.clone(),
487            )
488            .await
489            .context("lnd connect")
490            .map_err(ControlFlow::Continue)
491        })
492        .await?;
493
494        Ok((process, client))
495    }
496
497    pub async fn lightning_client_lock(
498        &self,
499    ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
500        let guard = self.client.lock().await;
501        Ok(MutexGuard::map(guard, |client| client.lightning()))
502    }
503
504    pub async fn invoices_client_lock(
505        &self,
506    ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
507        let guard = self.client.lock().await;
508        Ok(MutexGuard::map(guard, |client| client.invoices()))
509    }
510
511    pub async fn pub_key(&self) -> Result<String> {
512        Ok(self
513            .lightning_client_lock()
514            .await?
515            .get_info(GetInfoRequest {})
516            .await?
517            .into_inner()
518            .identity_pubkey)
519    }
520
521    pub async fn terminate(self) -> Result<()> {
522        self.process.terminate().await
523    }
524
525    pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
526        let add_invoice = self
527            .lightning_client_lock()
528            .await?
529            .add_invoice(tonic_lnd::lnrpc::Invoice {
530                value_msat: amount as i64,
531                ..Default::default()
532            })
533            .await?
534            .into_inner();
535        let invoice = add_invoice.payment_request;
536        let payment_hash = add_invoice.r_hash;
537        Ok((invoice, payment_hash))
538    }
539
540    pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
541        let payment = self
542            .lightning_client_lock()
543            .await?
544            .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
545                payment_request: invoice.clone(),
546                ..Default::default()
547            })
548            .await?
549            .into_inner();
550        let payment_status = self
551            .lightning_client_lock()
552            .await?
553            .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
554                include_incomplete: true,
555                ..Default::default()
556            })
557            .await?
558            .into_inner()
559            .payments
560            .into_iter()
561            .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
562            .context("payment not in list")?
563            .status();
564        anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
565
566        Ok(())
567    }
568
569    pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
570        let invoice_status = self
571            .lightning_client_lock()
572            .await?
573            .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
574                r_hash: payment_hash,
575                ..Default::default()
576            })
577            .await?
578            .into_inner()
579            .state();
580        anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
581
582        Ok(())
583    }
584
585    pub async fn create_hold_invoice(
586        &self,
587        amount: u64,
588    ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
589        let preimage = rand::random::<[u8; 32]>();
590        let hash = {
591            let mut engine = bitcoin::hashes::sha256::Hash::engine();
592            bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
593            bitcoin::hashes::sha256::Hash::from_engine(engine)
594        };
595        let cltv_expiry = 650;
596        let hold_request = self
597            .invoices_client_lock()
598            .await?
599            .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
600                value_msat: amount as i64,
601                hash: hash.to_byte_array().to_vec(),
602                cltv_expiry,
603                ..Default::default()
604            })
605            .await?
606            .into_inner();
607        let payment_request = hold_request.payment_request;
608        Ok((preimage, payment_request, hash))
609    }
610
611    pub async fn settle_hold_invoice(
612        &self,
613        preimage: [u8; 32],
614        payment_hash: sha256::Hash,
615    ) -> anyhow::Result<()> {
616        let mut hold_invoice_subscription = self
617            .invoices_client_lock()
618            .await?
619            .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
620                r_hash: payment_hash.to_byte_array().to_vec(),
621            })
622            .await?
623            .into_inner();
624        loop {
625            const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
626            match timeout(
627                WAIT_FOR_INVOICE_TIMEOUT,
628                futures::StreamExt::next(&mut hold_invoice_subscription),
629            )
630            .await
631            {
632                Ok(Some(Ok(invoice))) => {
633                    if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
634                        break;
635                    }
636                    debug!("hold invoice payment state: {:?}", invoice.state());
637                }
638                Ok(Some(Err(e))) => {
639                    bail!("error in invoice subscription: {e:?}");
640                }
641                Ok(None) => {
642                    bail!("invoice subscription ended before invoice was accepted");
643                }
644                Err(_) => {
645                    bail!("timed out waiting for invoice to be accepted")
646                }
647            }
648        }
649
650        self.invoices_client_lock()
651            .await?
652            .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
653                preimage: preimage.to_vec(),
654            })
655            .await?;
656
657        Ok(())
658    }
659}
660
661pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
662
663#[allow(clippy::similar_names)]
664pub async fn open_channels_between_gateways(
665    bitcoind: &Bitcoind,
666    gateways: &[NamedGateway<'_>],
667) -> Result<()> {
668    let block_height = bitcoind.get_block_count().await? - 1;
669    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
670    futures::future::try_join_all(
671        gateways
672            .iter()
673            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
674    )
675    .await?;
676
677    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
678    for (gw, _gw_name) in gateways {
679        let funding_addr = gw.get_ln_onchain_address().await?;
680        bitcoind.send_to(funding_addr, 100_000_000).await?;
681    }
682
683    bitcoind.mine_blocks(10).await?;
684
685    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
686
687    let block_height = bitcoind.get_block_count().await? - 1;
688    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
689    futures::future::try_join_all(
690        gateways
691            .iter()
692            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
693    )
694    .await?;
695
696    // All unique pairs of gateways.
697    // For a list of gateways [A, B, C], this will produce [(A, B), (B, C)].
698    // Since the first gateway within each pair initiates the channel open,
699    // order within each pair needs to be enforced so that each Lightning node opens
700    // 1 channel.
701    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
702        gateways.iter().tuple_windows::<(_, _)>().collect();
703
704    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
705    let sats_per_side = 5_000_000;
706    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
707        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
708        let txid = gw_a
709            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
710            .await?;
711
712        bitcoind.poll_get_transaction(txid).await?;
713    }
714
715    bitcoind.mine_blocks(10).await?;
716
717    let block_height = bitcoind.get_block_count().await? - 1;
718    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
719    futures::future::try_join_all(
720        gateways
721            .iter()
722            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
723    )
724    .await?;
725
726    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
727        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
728        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
729
730        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
731        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
732    }
733
734    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
735
736    Ok(())
737}
738
739async fn wait_for_ready_channel_on_gateway_with_counterparty(
740    gw: &Gatewayd,
741    counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
742) -> anyhow::Result<()> {
743    poll(
744        &format!("Wait for {} channel update", gw.gw_name),
745        || async {
746            let channels = gw
747                .list_active_channels()
748                .await
749                .context("list channels")
750                .map_err(ControlFlow::Break)?;
751
752            if channels
753                .iter()
754                .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
755            {
756                return Ok(());
757            }
758
759            debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
760            Err(ControlFlow::Continue(anyhow!("channel not found")))
761        },
762    )
763    .await
764}
765
766#[derive(Clone)]
767pub enum LightningNode {
768    Lnd(Lnd),
769    Ldk {
770        name: String,
771        gw_port: u16,
772        ldk_port: u16,
773        chain_source: LdkChainSource,
774    },
775}
776
777impl LightningNode {
778    pub fn ln_type(&self) -> LightningNodeType {
779        match self {
780            LightningNode::Lnd(_) => LightningNodeType::Lnd,
781            LightningNode::Ldk {
782                name: _,
783                gw_port: _,
784                ldk_port: _,
785                chain_source: _,
786            } => LightningNodeType::Ldk,
787        }
788    }
789}
790
791#[derive(Clone)]
792pub struct Esplora {
793    _process: ProcessHandle,
794    _bitcoind: Bitcoind,
795}
796
797impl Esplora {
798    pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
799        debug!("Starting esplora");
800        let daemon_dir = process_mgr
801            .globals
802            .FM_BTC_DIR
803            .to_str()
804            .context("non utf8 path")?;
805        let esplora_dir = process_mgr
806            .globals
807            .FM_ESPLORA_DIR
808            .to_str()
809            .context("non utf8 path")?;
810
811        let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
812        let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
813        let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
814        // spawn esplora
815        let cmd = cmd!(
816            crate::util::Esplora,
817            "--daemon-dir={daemon_dir}",
818            "--db-dir={esplora_dir}",
819            "--cookie=bitcoin:bitcoin",
820            "--network=regtest",
821            "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
822            "--http-addr=127.0.0.1:{esplora_port}",
823            "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
824            "--jsonrpc-import", // Workaround for incompatible on-disk format
825        );
826        let process = process_mgr.spawn_daemon("esplora", cmd).await?;
827
828        Self::wait_for_ready(process_mgr).await?;
829        debug!(target: LOG_DEVIMINT, "Esplora ready");
830
831        Ok(Self {
832            _bitcoind: bitcoind,
833            _process: process,
834        })
835    }
836
837    /// Wait until the server is able to respond to requests.
838    async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
839        let client = esplora_client::Builder::new(&format!(
840            "http://localhost:{}",
841            process_mgr.globals.FM_PORT_ESPLORA
842        ))
843        // Disable retrying in the client since we're already retrying in the poll below.
844        .max_retries(0)
845        .build_async()
846        .expect("esplora client build failed");
847
848        poll("esplora server ready", || async {
849            client
850                .get_fee_estimates()
851                .await
852                .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
853
854            Ok(())
855        })
856        .await?;
857
858        Ok(())
859    }
860}
861
862#[allow(unused)]
863pub struct ExternalDaemons {
864    pub bitcoind: Bitcoind,
865    pub esplora: Esplora,
866}
867
868pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
869    let bitcoind = Bitcoind::new(process_mgr, false).await?;
870    let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
871    let start_time = fedimint_core::time::now();
872    // make sure the bitcoind wallet is ready
873    let _ = bitcoind.wallet_client().await?;
874    info!(
875        target: LOG_DEVIMINT,
876        "starting base daemons took {:?}",
877        start_time.elapsed()?
878    );
879    Ok(ExternalDaemons { bitcoind, esplora })
880}