1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::{FmtCompact as _, SafeUrl, write_overwrite_async};
17use fedimint_logging::LOG_DEVIMINT;
18use fedimint_testing_core::node_type::LightningNodeType;
19use futures::StreamExt;
20use hex::ToHex;
21use itertools::Itertools;
22use tokio::fs;
23use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
24use tokio::task::spawn_blocking;
25use tokio::time::Instant;
26use tonic_lnd::Client as LndClient;
27use tonic_lnd::lnrpc::GetInfoRequest;
28use tracing::{debug, info, trace, warn};
29
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::{Gatewayd, cmd};
33
34#[derive(Clone)]
35pub struct Bitcoind {
36 pub client: Arc<bitcoincore_rpc::Client>,
37 pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
38 pub(crate) _process: ProcessHandle,
39}
40
41impl Bitcoind {
42 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
43 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
44
45 let conf = format!(
46 include_str!("cfg/bitcoin.conf"),
47 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
48 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
49 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
50 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
51 tx_index = "0",
52 );
53 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
54 let process = processmgr
55 .spawn_daemon(
56 "bitcoind",
57 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
58 )
59 .await?;
60
61 let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
62
63 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64
65 let auth = Auth::UserPass(
66 url.username().to_owned(),
67 url.password()
68 .context("Bitcoin RPC URL is missing password")?
69 .to_owned(),
70 );
71
72 let host = url
73 .without_auth()
74 .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
75 .to_string();
76 let wallet_name = "";
77 let host = format!("{host}wallet/{wallet_name}");
78
79 debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
80 let client =
81 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
82 let wallet_client = JitTry::new_try(move || async move {
83 let client =
84 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
85 Self::init(&client, skip_setup).await?;
86 Ok(Arc::new(client))
87 });
88
89 let bitcoind = Self {
90 _process: process,
91 client: Arc::new(client),
92 wallet_client: Arc::new(wallet_client),
93 };
94
95 bitcoind.poll_ready().await?;
96
97 Ok(bitcoind)
98 }
99
100 fn new_bitcoin_rpc(
101 url: &str,
102 auth: bitcoincore_rpc::Auth,
103 ) -> anyhow::Result<bitcoincore_rpc::Client> {
104 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
106 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
107 .url(url)?
108 .timeout(RPC_TIMEOUT);
109 let (user, pass) = auth.get_user_pass()?;
110 if let Some(user) = user {
111 builder = builder.auth(user, pass);
112 }
113 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
114 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
115 }
116
117 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
118 debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
119 for attempt in 0.. {
121 match block_in_place(|| client.create_wallet("", None, None, None, None)) {
122 Ok(_) => {
123 break;
124 }
125 Err(err) => {
126 if err.to_string().contains("Database already exists") {
127 break;
128 }
129 if attempt % 20 == 19 {
130 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
131 }
132 sleep(Duration::from_millis(100)).await;
133 }
134 }
135 }
136
137 if !skip_setup {
138 let blocks = 101;
140 let address = block_in_place(|| client.get_new_address(None, None))?
141 .require_network(bitcoin::Network::Regtest)
142 .expect("Devimint always runs in regtest");
143 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
144 block_in_place(|| {
145 client
146 .generate_to_address(blocks, &address)
147 .context("Failed to generate blocks")
148 })?;
149 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
150 }
151
152 poll("bitcoind", || async {
154 let info = block_in_place(|| client.get_blockchain_info())
155 .context("bitcoind getblockchaininfo")
156 .map_err(ControlFlow::Continue)?;
157 if info.blocks > 100 {
158 Ok(())
159 } else {
160 Err(ControlFlow::Continue(anyhow!(
161 "not enough blocks: {}",
162 info.blocks
163 )))
164 }
165 })
166 .await?;
167 debug!("Bitcoind ready");
168 Ok(())
169 }
170
171 async fn poll_ready(&self) -> anyhow::Result<()> {
173 poll("bitcoind rpc ready", || async {
174 self.get_block_count()
175 .await
176 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
177 Ok(())
178 })
179 .await
180 }
181
182 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
185 self.wallet_client.get_try().await?;
186 Ok(self)
187 }
188
189 pub async fn get_block_count(&self) -> Result<u64> {
195 let client = self.client.clone();
196 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
197 }
198
199 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
200 let start_time = Instant::now();
201 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
202 let addr = self.get_new_address().await?;
203 let initial_block_count = self.get_block_count().await?;
204 self.generate_to_address(block_num, addr).await?;
205 debug!(target: LOG_DEVIMINT,
206 elapsed_ms = %start_time.elapsed().as_millis(),
207 ?block_num, "Mined blocks (no wait)");
208
209 Ok(initial_block_count)
210 }
211
212 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
213 const BLOCK_NUM_LIMIT: u64 = 32;
220
221 if BLOCK_NUM_LIMIT < block_num {
222 warn!(
223 target: LOG_DEVIMINT,
224 %block_num,
225 "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
226 );
227 let mut block_num = block_num;
228
229 loop {
230 if BLOCK_NUM_LIMIT < block_num {
231 block_num -= BLOCK_NUM_LIMIT;
232 Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
233 } else {
234 Box::pin(async { self.mine_blocks(block_num).await }).await?;
235 return Ok(());
236 }
237 }
238 }
239 let start_time = Instant::now();
240 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
241 let addr = self.get_new_address().await?;
242 let initial_block_count = self.get_block_count().await?;
243 self.generate_to_address(block_num, addr).await?;
244 while self.get_block_count().await? < initial_block_count + block_num {
245 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
246 sleep(Duration::from_millis(100)).await;
247 }
248
249 debug!(target: LOG_DEVIMINT,
250 elapsed_ms = %start_time.elapsed().as_millis(),
251 ?block_num, "Mined blocks");
252
253 Ok(())
254 }
255
256 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
257 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
258 let amount = bitcoin::Amount::from_sat(amount);
259 let tx = self
260 .wallet_client()
261 .await?
262 .send_to_address(
263 bitcoin::Address::from_str(&addr)?
264 .require_network(bitcoin::Network::Regtest)
265 .expect("Devimint always runs in regtest"),
266 amount,
267 )
268 .await?;
269 Ok(tx)
270 }
271
272 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
273 let client = self.wallet_client().await?.clone();
274 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
275 Ok(proof.encode_hex())
276 }
277
278 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
281 poll("Waiting for transaction in mempool", || async {
282 match self
283 .get_transaction(txid)
284 .await
285 .context("getrawtransaction")
286 {
287 Ok(Some(tx)) => Ok(tx),
288 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
289 "Transaction not found yet"
290 ))),
291 Err(err) => Err(ControlFlow::Break(err)),
292 }
293 })
294 .await
295 }
296
297 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
299 match self.get_raw_transaction(txid, None).await {
301 Ok(None) => {}
304 other => return other,
306 }
307
308 let block_height = self.get_block_count().await? - 1;
309
310 let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
316 .map(|height| async move {
317 let block_hash = self.get_block_hash(height).await?;
318 self.get_raw_transaction(txid, Some(block_hash)).await
319 })
320 .buffered(32);
321
322 while let Some(tx_or) = buffered_tx_stream.next().await {
323 match tx_or {
324 Ok(None) => {}
327 other => return other,
329 }
330 }
331
332 Ok(None)
334 }
335
336 async fn get_raw_transaction(
337 &self,
338 txid: bitcoin::Txid,
339 block_hash: Option<BlockHash>,
340 ) -> Result<Option<String>> {
341 let client = self.client.clone();
342 let tx_or =
343 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
344
345 let tx = match tx_or {
346 Ok(tx) => tx,
347 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
352 RpcError { code: -5, .. },
353 ))) => return Ok(None),
354 Err(err) => return Err(err.into()),
355 };
356 let bytes = tx.consensus_encode_to_vec();
357 Ok(Some(bytes.encode_hex()))
358 }
359
360 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
361 let client = self.client.clone();
362 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
363 }
364
365 pub async fn get_new_address(&self) -> Result<Address> {
366 let client = self.wallet_client().await?.clone();
367 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
368 .await??
369 .require_network(bitcoin::Network::Regtest)
370 .expect("Devimint always runs in regtest");
371 Ok(addr)
372 }
373
374 pub async fn generate_to_address(
375 &self,
376 block_num: u64,
377 address: Address,
378 ) -> Result<Vec<BlockHash>> {
379 let client = self.wallet_client().await?.clone();
380 Ok(
381 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
382 .await??,
383 )
384 }
385
386 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
387 let client = self.client.clone();
388 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
389 }
390
391 pub async fn send_to_address(
392 &self,
393 addr: Address,
394 amount: bitcoin::Amount,
395 ) -> anyhow::Result<bitcoin::Txid> {
396 let client = self.wallet_client().await?.clone();
397
398 let raw_client = client.client.clone();
399 let txid = spawn_blocking(move || {
400 raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
401 })
402 .await??;
403
404 assert!(client.get_transaction(txid).await?.is_some());
408
409 Ok(txid)
410 }
411
412 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
413 let client = self.wallet_client().await?.clone();
414 Ok(spawn_blocking(move || client.client.get_balances()).await??)
415 }
416
417 pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
418 self.client.get_jsonrpc_client()
419 }
420}
421
422#[derive(Clone)]
423pub struct Lnd {
424 pub(crate) client: Arc<Mutex<LndClient>>,
425 pub(crate) process: ProcessHandle,
426 pub(crate) _bitcoind: Bitcoind,
427}
428
429impl Lnd {
430 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
431 let (process, client) = Lnd::start(process_mgr).await?;
432 let this = Self {
433 _bitcoind: bitcoind,
434 client: Arc::new(Mutex::new(client)),
435 process,
436 };
437 poll("lnd_startup", || async {
439 this.pub_key().await.map_err(ControlFlow::Continue)
440 })
441 .await?;
442 Ok(this)
443 }
444
445 pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
446 let conf = format!(
447 include_str!("cfg/lnd.conf"),
448 listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
449 rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
450 rest_port = process_mgr.globals.FM_PORT_LND_REST,
451 btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
452 zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
453 zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
454 );
455 write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
456 let cmd = cmd!(
457 crate::util::Lnd,
458 format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
459 );
460
461 let process = process_mgr.spawn_daemon("lnd", cmd).await?;
462 let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
463 let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
464 let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
465 poll("wait for lnd files", || async {
466 if fs::try_exists(lnd_tls_cert)
467 .await
468 .context("lnd tls cert")
469 .map_err(ControlFlow::Continue)?
470 && fs::try_exists(lnd_macaroon)
471 .await
472 .context("lnd macaroon")
473 .map_err(ControlFlow::Continue)?
474 {
475 Ok(())
476 } else {
477 Err(ControlFlow::Continue(anyhow!(
478 "lnd tls cert or lnd macaroon not found"
479 )))
480 }
481 })
482 .await?;
483
484 install_crypto_provider().await;
485
486 let client = poll("lnd_connect", || async {
487 tonic_lnd::connect(
488 lnd_rpc_addr.clone(),
489 lnd_tls_cert.clone(),
490 lnd_macaroon.clone(),
491 )
492 .await
493 .context("lnd connect")
494 .map_err(ControlFlow::Continue)
495 })
496 .await?;
497
498 Ok((process, client))
499 }
500
501 pub async fn lightning_client_lock(
502 &self,
503 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
504 let guard = self.client.lock().await;
505 Ok(MutexGuard::map(guard, |client| client.lightning()))
506 }
507
508 pub async fn invoices_client_lock(
509 &self,
510 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
511 let guard = self.client.lock().await;
512 Ok(MutexGuard::map(guard, |client| client.invoices()))
513 }
514
515 pub async fn pub_key(&self) -> Result<String> {
516 Ok(self
517 .lightning_client_lock()
518 .await?
519 .get_info(GetInfoRequest {})
520 .await?
521 .into_inner()
522 .identity_pubkey)
523 }
524
525 pub async fn terminate(self) -> Result<()> {
526 self.process.terminate().await
527 }
528
529 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
530 let add_invoice = self
531 .lightning_client_lock()
532 .await?
533 .add_invoice(tonic_lnd::lnrpc::Invoice {
534 value_msat: amount as i64,
535 ..Default::default()
536 })
537 .await?
538 .into_inner();
539 let invoice = add_invoice.payment_request;
540 let payment_hash = add_invoice.r_hash;
541 Ok((invoice, payment_hash))
542 }
543
544 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
545 let payment = self
546 .lightning_client_lock()
547 .await?
548 .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
549 payment_request: invoice.clone(),
550 ..Default::default()
551 })
552 .await?
553 .into_inner();
554 let payment_status = self
555 .lightning_client_lock()
556 .await?
557 .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
558 include_incomplete: true,
559 ..Default::default()
560 })
561 .await?
562 .into_inner()
563 .payments
564 .into_iter()
565 .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
566 .context("payment not in list")?
567 .status();
568 anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
569
570 Ok(())
571 }
572
573 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
574 let invoice_status = self
575 .lightning_client_lock()
576 .await?
577 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
578 r_hash: payment_hash,
579 ..Default::default()
580 })
581 .await?
582 .into_inner()
583 .state();
584 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
585
586 Ok(())
587 }
588
589 pub async fn create_hold_invoice(
590 &self,
591 amount: u64,
592 ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
593 let preimage = rand::random::<[u8; 32]>();
594 let hash = {
595 let mut engine = bitcoin::hashes::sha256::Hash::engine();
596 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
597 bitcoin::hashes::sha256::Hash::from_engine(engine)
598 };
599 let cltv_expiry = 650;
600 let hold_request = self
601 .invoices_client_lock()
602 .await?
603 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
604 value_msat: amount as i64,
605 hash: hash.to_byte_array().to_vec(),
606 cltv_expiry,
607 ..Default::default()
608 })
609 .await?
610 .into_inner();
611 let payment_request = hold_request.payment_request;
612 Ok((preimage, payment_request, hash))
613 }
614
615 pub async fn settle_hold_invoice(
616 &self,
617 preimage: [u8; 32],
618 payment_hash: sha256::Hash,
619 ) -> anyhow::Result<()> {
620 let mut hold_invoice_subscription = self
621 .invoices_client_lock()
622 .await?
623 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
624 r_hash: payment_hash.to_byte_array().to_vec(),
625 })
626 .await?
627 .into_inner();
628 loop {
629 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
630 match timeout(
631 WAIT_FOR_INVOICE_TIMEOUT,
632 futures::StreamExt::next(&mut hold_invoice_subscription),
633 )
634 .await
635 {
636 Ok(Some(Ok(invoice))) => {
637 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
638 break;
639 }
640 debug!("hold invoice payment state: {:?}", invoice.state());
641 }
642 Ok(Some(Err(e))) => {
643 bail!("error in invoice subscription: {e:?}");
644 }
645 Ok(None) => {
646 bail!("invoice subscription ended before invoice was accepted");
647 }
648 Err(_) => {
649 bail!("timed out waiting for invoice to be accepted")
650 }
651 }
652 }
653
654 self.invoices_client_lock()
655 .await?
656 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
657 preimage: preimage.to_vec(),
658 })
659 .await?;
660
661 Ok(())
662 }
663}
664
665pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
666
667#[allow(clippy::similar_names)]
668pub async fn open_channels_between_gateways(
669 bitcoind: &Bitcoind,
670 gateways: &[NamedGateway<'_>],
671) -> Result<()> {
672 let block_height = bitcoind.get_block_count().await? - 1;
673 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
674 futures::future::try_join_all(
675 gateways
676 .iter()
677 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
678 )
679 .await?;
680
681 info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
682 for (gw, _gw_name) in gateways {
683 let funding_addr = gw.get_ln_onchain_address().await?;
684 bitcoind.send_to(funding_addr, 100_000_000).await?;
685 }
686
687 bitcoind.mine_blocks(10).await?;
688
689 info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
690
691 let block_height = bitcoind.get_block_count().await? - 1;
692 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
693 futures::future::try_join_all(
694 gateways
695 .iter()
696 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
697 )
698 .await?;
699
700 let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
706 gateways.iter().tuple_windows::<(_, _)>().collect();
707
708 info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
709 let sats_per_side = 5_000_000;
710 for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
711 info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
712 let txid = gw_a
713 .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
714 .await?;
715
716 bitcoind.poll_get_transaction(txid).await?;
717 }
718
719 bitcoind.mine_blocks(10).await?;
720
721 let block_height = bitcoind.get_block_count().await? - 1;
722 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
723 futures::future::try_join_all(
724 gateways
725 .iter()
726 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
727 )
728 .await?;
729
730 for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
731 let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
732 let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
733
734 wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
735 wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
736 }
737
738 info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
739
740 Ok(())
741}
742
743async fn wait_for_ready_channel_on_gateway_with_counterparty(
744 gw: &Gatewayd,
745 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
746) -> anyhow::Result<()> {
747 poll(
748 &format!("Wait for {} channel update", gw.gw_name),
749 || async {
750 let channels = gw
751 .list_channels()
752 .await
753 .context("list channels")
754 .map_err(ControlFlow::Break)?;
755
756 if channels
757 .iter()
758 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
759 {
760 return Ok(());
761 }
762
763 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
764 Err(ControlFlow::Continue(anyhow!("channel not found")))
765 },
766 )
767 .await
768}
769
770#[derive(Clone)]
771pub enum LightningNode {
772 Lnd(Lnd),
773 Ldk {
774 name: String,
775 gw_port: u16,
776 ldk_port: u16,
777 },
778}
779
780impl LightningNode {
781 pub fn ln_type(&self) -> LightningNodeType {
782 match self {
783 LightningNode::Lnd(_) => LightningNodeType::Lnd,
784 LightningNode::Ldk {
785 name: _,
786 gw_port: _,
787 ldk_port: _,
788 } => LightningNodeType::Ldk,
789 }
790 }
791}
792
793#[derive(Clone)]
794pub struct Esplora {
795 _process: ProcessHandle,
796 _bitcoind: Bitcoind,
797}
798
799impl Esplora {
800 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
801 debug!("Starting esplora");
802 let daemon_dir = process_mgr
803 .globals
804 .FM_BTC_DIR
805 .to_str()
806 .context("non utf8 path")?;
807 let esplora_dir = process_mgr
808 .globals
809 .FM_ESPLORA_DIR
810 .to_str()
811 .context("non utf8 path")?;
812
813 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
814 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
815 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
816 let cmd = cmd!(
818 crate::util::Esplora,
819 "--daemon-dir={daemon_dir}",
820 "--db-dir={esplora_dir}",
821 "--cookie=bitcoin:bitcoin",
822 "--network=regtest",
823 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
824 "--http-addr=127.0.0.1:{esplora_port}",
825 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
826 "--jsonrpc-import", );
828 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
829
830 Self::wait_for_ready(process_mgr).await?;
831 debug!(target: LOG_DEVIMINT, "Esplora ready");
832
833 Ok(Self {
834 _bitcoind: bitcoind,
835 _process: process,
836 })
837 }
838
839 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
841 let client = esplora_client::Builder::new(&format!(
842 "http://localhost:{}",
843 process_mgr.globals.FM_PORT_ESPLORA
844 ))
845 .max_retries(0)
847 .build_async()
848 .expect("esplora client build failed");
849
850 poll("esplora server ready", || async {
851 client
852 .get_fee_estimates()
853 .await
854 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
855
856 Ok(())
857 })
858 .await?;
859
860 Ok(())
861 }
862}
863
864#[allow(unused)]
865pub struct ExternalDaemons {
866 pub bitcoind: Bitcoind,
867 pub esplora: Esplora,
868}
869
870pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
871 let bitcoind = Bitcoind::new(process_mgr, false).await?;
872 let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
873 let start_time = fedimint_core::time::now();
874 let _ = bitcoind.wallet_client().await?;
876 info!(
877 target: LOG_DEVIMINT,
878 "starting base daemons took {:?}",
879 start_time.elapsed()?
880 );
881 Ok(ExternalDaemons { bitcoind, esplora })
882}