1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::backoff_util::api_networking_backoff;
17use fedimint_core::util::{FmtCompact as _, SafeUrl, retry, write_overwrite_async};
18use fedimint_logging::LOG_DEVIMINT;
19use fedimint_testing_core::node_type::LightningNodeType;
20use futures::StreamExt;
21use hex::ToHex;
22use itertools::Itertools;
23use tokio::fs;
24use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
25use tokio::task::spawn_blocking;
26use tokio::time::Instant;
27use tonic_lnd::Client as LndClient;
28use tonic_lnd::lnrpc::GetInfoRequest;
29use tonic_lnd::routerrpc::SendPaymentRequest;
30use tracing::{debug, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll};
33use crate::vars::utf8;
34use crate::{Gatewayd, cmd};
35
36#[derive(Clone)]
37pub struct Bitcoind {
38 pub client: Arc<bitcoincore_rpc::Client>,
39 pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
40 pub(crate) _process: ProcessHandle,
41}
42
43impl Bitcoind {
44 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
45 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
46
47 let esplora_pid_file_path = processmgr.globals.FM_TEST_DIR.join("esplora.pid");
48 let esplora_pid_file = utf8(&esplora_pid_file_path);
49 let conf = format!(
50 include_str!("cfg/bitcoin.conf"),
51 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
52 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
53 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
54 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
55 esplora_pid_file = esplora_pid_file,
56 tx_index = "0",
57 );
58 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
59 let process = processmgr
60 .spawn_daemon(
61 "bitcoind",
62 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
63 )
64 .await?;
65
66 let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
67
68 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
69
70 let auth = Auth::UserPass(
71 url.username().to_owned(),
72 url.password()
73 .context("Bitcoin RPC URL is missing password")?
74 .to_owned(),
75 );
76
77 let host = url
78 .without_auth()
79 .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
80 .to_string();
81 let wallet_name = "default";
82 let host = format!("{host}wallet/{wallet_name}");
83
84 debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
85 let client =
86 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
87 let wallet_client = JitTry::new_try(move || async move {
88 let client =
89 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
90 Self::init(&client, skip_setup).await?;
91 Ok(Arc::new(client))
92 });
93
94 let bitcoind = Self {
95 _process: process,
96 client: Arc::new(client),
97 wallet_client: Arc::new(wallet_client),
98 };
99
100 bitcoind.poll_ready().await?;
101
102 let addr = bitcoind.get_new_address().await?;
104 bitcoind.generate_to_address(1, addr).await?;
105
106 Ok(bitcoind)
107 }
108
109 fn new_bitcoin_rpc(
110 url: &str,
111 auth: bitcoincore_rpc::Auth,
112 ) -> anyhow::Result<bitcoincore_rpc::Client> {
113 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
115 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
116 .url(url)?
117 .timeout(RPC_TIMEOUT);
118 let (user, pass) = auth.get_user_pass()?;
119 if let Some(user) = user {
120 builder = builder.auth(user, pass);
121 }
122 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
123 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
124 }
125
126 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
127 debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
128 for attempt in 0.. {
130 match block_in_place(|| client.create_wallet("default", None, None, None, None)) {
131 Ok(_) => {
132 break;
133 }
134 Err(err) => {
135 if err.to_string().contains("Database already exists") {
136 break;
137 }
138 if attempt % 20 == 19 {
139 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
140 }
141 sleep(Duration::from_millis(100)).await;
142 }
143 }
144 }
145
146 if !skip_setup {
147 let blocks = 101;
149 let address = block_in_place(|| client.get_new_address(None, None))?
150 .require_network(bitcoin::Network::Regtest)
151 .expect("Devimint always runs in regtest");
152 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
153 block_in_place(|| {
154 client
155 .generate_to_address(blocks, &address)
156 .context("Failed to generate blocks")
157 })?;
158 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
159 }
160
161 poll("bitcoind", || async {
163 let info = block_in_place(|| client.get_blockchain_info())
164 .context("bitcoind getblockchaininfo")
165 .map_err(ControlFlow::Continue)?;
166 if info.blocks > 100 {
167 Ok(())
168 } else {
169 Err(ControlFlow::Continue(anyhow!(
170 "not enough blocks: {}",
171 info.blocks
172 )))
173 }
174 })
175 .await?;
176 debug!("Bitcoind ready");
177 Ok(())
178 }
179
180 async fn poll_ready(&self) -> anyhow::Result<()> {
182 poll("bitcoind rpc ready", || async {
183 self.get_block_count()
184 .await
185 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
186 Ok(())
187 })
188 .await
189 }
190
191 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
194 self.wallet_client.get_try().await?;
195 Ok(self)
196 }
197
198 pub async fn get_block_count(&self) -> Result<u64> {
204 let client = self.client.clone();
205 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
206 }
207
208 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
209 let start_time = Instant::now();
210 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
211 let addr = self.get_new_address().await?;
212 let initial_block_count = self.get_block_count().await?;
213 self.generate_to_address(block_num, addr).await?;
214 debug!(target: LOG_DEVIMINT,
215 elapsed_ms = %start_time.elapsed().as_millis(),
216 ?block_num, "Mined blocks (no wait)");
217
218 Ok(initial_block_count)
219 }
220
221 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
222 const BLOCK_NUM_LIMIT: u64 = 32;
229
230 if BLOCK_NUM_LIMIT < block_num {
231 warn!(
232 target: LOG_DEVIMINT,
233 %block_num,
234 "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
235 );
236 let mut block_num = block_num;
237
238 loop {
239 if BLOCK_NUM_LIMIT < block_num {
240 block_num -= BLOCK_NUM_LIMIT;
241 Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
242 } else {
243 Box::pin(async { self.mine_blocks(block_num).await }).await?;
244 return Ok(());
245 }
246 }
247 }
248 let start_time = Instant::now();
249 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
250 let addr = self.get_new_address().await?;
251 let initial_block_count = self.get_block_count().await?;
252 self.generate_to_address(block_num, addr).await?;
253 while self.get_block_count().await? < initial_block_count + block_num {
254 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
255 sleep(Duration::from_millis(100)).await;
256 }
257
258 debug!(target: LOG_DEVIMINT,
259 elapsed_ms = %start_time.elapsed().as_millis(),
260 ?block_num, "Mined blocks");
261
262 Ok(())
263 }
264
265 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
266 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
267 let amount = bitcoin::Amount::from_sat(amount);
268 let tx = self
269 .wallet_client()
270 .await?
271 .send_to_address(
272 bitcoin::Address::from_str(&addr)?
273 .require_network(bitcoin::Network::Regtest)
274 .expect("Devimint always runs in regtest"),
275 amount,
276 )
277 .await?;
278 Ok(tx)
279 }
280
281 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
282 let client = self.wallet_client().await?.clone();
283 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
284 Ok(proof.encode_hex())
285 }
286
287 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
290 poll("Waiting for transaction in mempool", || async {
291 match self
292 .get_transaction(txid)
293 .await
294 .context("getrawtransaction")
295 {
296 Ok(Some(tx)) => Ok(tx),
297 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
298 "Transaction not found yet"
299 ))),
300 Err(err) => Err(ControlFlow::Break(err)),
301 }
302 })
303 .await
304 }
305
306 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
308 match self.get_raw_transaction(txid, None).await {
310 Ok(None) => {}
313 other => return other,
315 }
316
317 let block_count = self.get_block_count().await?;
318
319 let mut buffered_tx_stream = futures::stream::iter((0..block_count).rev())
325 .map(|height| async move {
326 let block_hash = self.get_block_hash(height).await?;
327 self.get_raw_transaction(txid, Some(block_hash)).await
328 })
329 .buffered(32);
330
331 while let Some(tx_or) = buffered_tx_stream.next().await {
332 match tx_or {
333 Ok(None) => {}
336 other => return other,
338 }
339 }
340
341 Ok(None)
343 }
344
345 async fn get_raw_transaction(
346 &self,
347 txid: bitcoin::Txid,
348 block_hash: Option<BlockHash>,
349 ) -> Result<Option<String>> {
350 let client = self.client.clone();
351 let tx_or =
352 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
353
354 let tx = match tx_or {
355 Ok(tx) => tx,
356 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
361 RpcError { code: -5, .. },
362 ))) => return Ok(None),
363 Err(err) => return Err(err.into()),
364 };
365 let bytes = tx.consensus_encode_to_vec();
366 Ok(Some(bytes.encode_hex()))
367 }
368
369 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
370 let client = self.client.clone();
371 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
372 }
373
374 pub async fn get_new_address(&self) -> Result<Address> {
375 let client = self.wallet_client().await?.clone();
376 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
377 .await??
378 .require_network(bitcoin::Network::Regtest)
379 .expect("Devimint always runs in regtest");
380 Ok(addr)
381 }
382
383 pub async fn generate_to_address(
384 &self,
385 block_num: u64,
386 address: Address,
387 ) -> Result<Vec<BlockHash>> {
388 let client = self.wallet_client().await?.clone();
389 Ok(
390 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
391 .await??,
392 )
393 }
394
395 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
396 let client = self.client.clone();
397 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
398 }
399
400 pub async fn send_to_address(
401 &self,
402 addr: Address,
403 amount: bitcoin::Amount,
404 ) -> anyhow::Result<bitcoin::Txid> {
405 let client = self.wallet_client().await?.clone();
406
407 let raw_client = client.client.clone();
408 let txid = spawn_blocking(move || {
409 raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
410 })
411 .await??;
412
413 retry(
416 "await-genesis-tx-processed".to_string(),
417 api_networking_backoff(),
418 || async {
419 if client.get_transaction(txid).await?.is_none() {
420 bail!("Genesis tx not visible yet = {txid}");
421 }
422
423 Ok(())
424 },
425 )
426 .await
427 .expect("Number of retries has no limit");
428 Ok(txid)
429 }
430
431 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
432 let client = self.wallet_client().await?.clone();
433 Ok(spawn_blocking(move || client.client.get_balances()).await??)
434 }
435
436 pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
437 self.client.get_jsonrpc_client()
438 }
439}
440
441#[derive(Clone)]
442pub struct Lnd {
443 pub(crate) client: Arc<Mutex<LndClient>>,
444 pub(crate) process: ProcessHandle,
445 pub(crate) _bitcoind: Bitcoind,
446}
447
448impl Lnd {
449 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
450 let (process, client) = Lnd::start(process_mgr).await?;
451 let this = Self {
452 _bitcoind: bitcoind,
453 client: Arc::new(Mutex::new(client)),
454 process,
455 };
456 poll("lnd_startup", || async {
458 this.pub_key().await.map_err(ControlFlow::Continue)
459 })
460 .await?;
461 Ok(this)
462 }
463
464 pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
465 let conf = format!(
466 include_str!("cfg/lnd.conf"),
467 listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
468 rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
469 rest_port = process_mgr.globals.FM_PORT_LND_REST,
470 btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
471 zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
472 zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
473 );
474 write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
475 let cmd = cmd!(
476 crate::util::Lnd,
477 format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
478 );
479
480 let process = process_mgr.spawn_daemon("lnd", cmd).await?;
481 let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
482 let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
483 let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
484 poll("wait for lnd files", || async {
485 if fs::try_exists(lnd_tls_cert)
486 .await
487 .context("lnd tls cert")
488 .map_err(ControlFlow::Continue)?
489 && fs::try_exists(lnd_macaroon)
490 .await
491 .context("lnd macaroon")
492 .map_err(ControlFlow::Continue)?
493 {
494 Ok(())
495 } else {
496 Err(ControlFlow::Continue(anyhow!(
497 "lnd tls cert or lnd macaroon not found"
498 )))
499 }
500 })
501 .await?;
502
503 install_crypto_provider().await;
504
505 let client = poll("lnd_connect", || async {
506 tonic_lnd::connect(
507 lnd_rpc_addr.clone(),
508 lnd_tls_cert.clone(),
509 lnd_macaroon.clone(),
510 )
511 .await
512 .context("lnd connect")
513 .map_err(ControlFlow::Continue)
514 })
515 .await?;
516
517 Ok((process, client))
518 }
519
520 pub async fn lightning_client_lock(
521 &self,
522 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
523 let guard = self.client.lock().await;
524 Ok(MutexGuard::map(guard, |client| client.lightning()))
525 }
526
527 pub async fn invoices_client_lock(
528 &self,
529 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
530 let guard = self.client.lock().await;
531 Ok(MutexGuard::map(guard, |client| client.invoices()))
532 }
533
534 pub async fn router_client_lock(
535 &self,
536 ) -> Result<MappedMutexGuard<'_, tonic_lnd::RouterClient>> {
537 let guard = self.client.lock().await;
538 Ok(MutexGuard::map(guard, |client| client.router()))
539 }
540
541 pub async fn pub_key(&self) -> Result<String> {
542 Ok(self
543 .lightning_client_lock()
544 .await?
545 .get_info(GetInfoRequest {})
546 .await?
547 .into_inner()
548 .identity_pubkey)
549 }
550
551 pub async fn terminate(self) -> Result<()> {
552 self.process.terminate().await
553 }
554
555 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
556 let add_invoice = self
557 .lightning_client_lock()
558 .await?
559 .add_invoice(tonic_lnd::lnrpc::Invoice {
560 value_msat: amount as i64,
561 ..Default::default()
562 })
563 .await?
564 .into_inner();
565 let invoice = add_invoice.payment_request;
566 let payment_hash = add_invoice.r_hash;
567 Ok((invoice, payment_hash))
568 }
569
570 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
571 let mut payment = self
572 .router_client_lock()
573 .await?
574 .send_payment_v2(SendPaymentRequest {
575 payment_request: invoice.clone(),
576 ..Default::default()
577 })
578 .await?
579 .into_inner();
580
581 while let Some(update) = payment.message().await? {
582 match update.status() {
583 tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded => return Ok(()),
584 tonic_lnd::lnrpc::payment::PaymentStatus::InFlight => {}
585 _ => return Err(anyhow!("LND lightning payment failed")),
586 }
587 }
588
589 Err(anyhow!("LND lightning payment unknown status"))
590 }
591
592 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
593 let invoice_status = self
594 .lightning_client_lock()
595 .await?
596 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
597 r_hash: payment_hash,
598 ..Default::default()
599 })
600 .await?
601 .into_inner()
602 .state();
603 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
604
605 Ok(())
606 }
607
608 pub async fn create_hold_invoice(
609 &self,
610 amount: u64,
611 ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
612 let preimage = rand::random::<[u8; 32]>();
613 let hash = {
614 let mut engine = bitcoin::hashes::sha256::Hash::engine();
615 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
616 bitcoin::hashes::sha256::Hash::from_engine(engine)
617 };
618 let cltv_expiry = 650;
619 let hold_request = self
620 .invoices_client_lock()
621 .await?
622 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
623 value_msat: amount as i64,
624 hash: hash.to_byte_array().to_vec(),
625 cltv_expiry,
626 ..Default::default()
627 })
628 .await?
629 .into_inner();
630 let payment_request = hold_request.payment_request;
631 Ok((preimage, payment_request, hash))
632 }
633
634 pub async fn settle_hold_invoice(
635 &self,
636 preimage: [u8; 32],
637 payment_hash: sha256::Hash,
638 ) -> anyhow::Result<()> {
639 let mut hold_invoice_subscription = self
640 .invoices_client_lock()
641 .await?
642 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
643 r_hash: payment_hash.to_byte_array().to_vec(),
644 })
645 .await?
646 .into_inner();
647 loop {
648 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_mins(1);
649 match timeout(
650 WAIT_FOR_INVOICE_TIMEOUT,
651 futures::StreamExt::next(&mut hold_invoice_subscription),
652 )
653 .await
654 {
655 Ok(Some(Ok(invoice))) => {
656 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
657 break;
658 }
659 debug!("hold invoice payment state: {:?}", invoice.state());
660 }
661 Ok(Some(Err(e))) => {
662 bail!("error in invoice subscription: {e:?}");
663 }
664 Ok(None) => {
665 bail!("invoice subscription ended before invoice was accepted");
666 }
667 Err(_) => {
668 bail!("timed out waiting for invoice to be accepted")
669 }
670 }
671 }
672
673 self.invoices_client_lock()
674 .await?
675 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
676 preimage: preimage.to_vec(),
677 })
678 .await?;
679
680 Ok(())
681 }
682}
683
684pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
685
686#[allow(clippy::similar_names)]
687pub async fn open_channels_between_gateways(
688 bitcoind: &Bitcoind,
689 gateways: &[NamedGateway<'_>],
690) -> Result<()> {
691 let block_height = bitcoind.get_block_count().await? - 1;
692 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
693 futures::future::try_join_all(
694 gateways
695 .iter()
696 .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
697 )
698 .await?;
699
700 info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
701 for (gw, _gw_name) in gateways {
702 let funding_addr = gw.client().get_ln_onchain_address().await?;
703 bitcoind.send_to(funding_addr, 100_000_000).await?;
704 }
705
706 bitcoind.mine_blocks(10).await?;
707
708 info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
709
710 let block_height = bitcoind.get_block_count().await? - 1;
711 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
712 futures::future::try_join_all(
713 gateways
714 .iter()
715 .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
716 )
717 .await?;
718
719 let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
724 gateways.iter().circular_tuple_windows::<(_, _)>().collect();
725
726 info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
727 let sats_per_side = 5_000_000;
728 for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
729 info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
730 let txid = gw_a
731 .client()
732 .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
733 .await?;
734
735 bitcoind.poll_get_transaction(txid).await?;
736 }
737
738 bitcoind.mine_blocks(10).await?;
739
740 let block_height = bitcoind.get_block_count().await? - 1;
741 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
742 futures::future::try_join_all(
743 gateways
744 .iter()
745 .map(|(gw, _gw_name)| async { gw.client().wait_for_block_height(block_height).await }),
746 )
747 .await?;
748
749 for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
750 let gw_a_node_pubkey = gw_a.client().lightning_pubkey().await?;
751 let gw_b_node_pubkey = gw_b.client().lightning_pubkey().await?;
752
753 wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
754 wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
755 }
756
757 info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
758
759 Ok(())
760}
761
762async fn wait_for_ready_channel_on_gateway_with_counterparty(
763 gw: &Gatewayd,
764 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
765) -> anyhow::Result<()> {
766 poll(
767 &format!("Wait for {} channel update", gw.gw_name),
768 || async {
769 let channels = gw
770 .client()
771 .list_channels()
772 .await
773 .context("list channels")
774 .map_err(ControlFlow::Break)?;
775
776 if channels
777 .iter()
778 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
779 {
780 return Ok(());
781 }
782
783 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
784 Err(ControlFlow::Continue(anyhow!("channel not found")))
785 },
786 )
787 .await
788}
789
790#[derive(Clone)]
791pub enum LightningNode {
792 Lnd(Lnd),
793 Ldk {
794 name: String,
795 gw_port: u16,
796 ldk_port: u16,
797 metrics_port: u16,
798 },
799}
800
801impl LightningNode {
802 pub fn ln_type(&self) -> LightningNodeType {
803 match self {
804 LightningNode::Lnd(_) => LightningNodeType::Lnd,
805 LightningNode::Ldk {
806 name: _,
807 gw_port: _,
808 ldk_port: _,
809 metrics_port: _,
810 } => LightningNodeType::Ldk,
811 }
812 }
813}
814
815#[derive(Clone)]
816pub struct Esplora {
817 _process: ProcessHandle,
818 _bitcoind: Bitcoind,
819}
820
821impl Esplora {
822 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
823 debug!("Starting esplora");
824 let daemon_dir = process_mgr
825 .globals
826 .FM_BTC_DIR
827 .to_str()
828 .context("non utf8 path")?;
829 let esplora_dir = process_mgr
830 .globals
831 .FM_ESPLORA_DIR
832 .to_str()
833 .context("non utf8 path")?;
834
835 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
836 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
837 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
838 let cmd = cmd!(
840 crate::util::Esplora,
841 "--daemon-dir={daemon_dir}",
842 "--db-dir={esplora_dir}",
843 "--cookie=bitcoin:bitcoin",
844 "--network=regtest",
845 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
846 "--http-addr=127.0.0.1:{esplora_port}",
847 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
848 "--jsonrpc-import", "--cors=*", );
851 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
852
853 Self::wait_for_ready(process_mgr).await?;
854 debug!(target: LOG_DEVIMINT, "Esplora ready");
855 if let Some(pid) = process.id().await {
856 write_overwrite_async(
857 process_mgr.globals.FM_TEST_DIR.join("esplora.pid"),
858 pid.to_string(),
859 )
860 .await?;
861 }
862
863 Ok(Self {
864 _bitcoind: bitcoind,
865 _process: process,
866 })
867 }
868
869 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
871 let client = esplora_client::Builder::new(&format!(
872 "http://localhost:{}",
873 process_mgr.globals.FM_PORT_ESPLORA
874 ))
875 .max_retries(0)
877 .build_async()
878 .expect("esplora client build failed");
879
880 poll("esplora server ready", || async {
881 client
882 .get_fee_estimates()
883 .await
884 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
885
886 Ok(())
887 })
888 .await?;
889
890 Ok(())
891 }
892}
893
894#[allow(unused)]
895pub struct ExternalDaemons {
896 pub bitcoind: Bitcoind,
897 pub esplora: Esplora,
898}
899
900pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
901 let bitcoind = Bitcoind::new(process_mgr, false).await?;
902 let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
903 let start_time = fedimint_core::time::now();
904 let _ = bitcoind.wallet_client().await?;
906 info!(
907 target: LOG_DEVIMINT,
908 "starting base daemons took {:?}",
909 start_time.elapsed()?
910 );
911 Ok(ExternalDaemons { bitcoind, esplora })
912}