1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
14use fedimint_core::task::{block_in_place, sleep, timeout};
15use fedimint_core::util::{FmtCompact as _, SafeUrl, write_overwrite_async};
16use fedimint_logging::LOG_DEVIMINT;
17use fedimint_testing_core::node_type::LightningNodeType;
18use futures::StreamExt;
19use hex::ToHex;
20use itertools::Itertools;
21use tokio::fs;
22use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
23use tokio::task::spawn_blocking;
24use tokio::time::Instant;
25use tonic_lnd::Client as LndClient;
26use tonic_lnd::lnrpc::GetInfoRequest;
27use tracing::{debug, info, trace, warn};
28
29use crate::gatewayd::LdkChainSource;
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::{Gatewayd, cmd};
33
/// Handle to a `bitcoind` daemon spawned by devimint, bundled with the RPC
/// clients used to talk to it.
#[derive(Clone)]
pub struct Bitcoind {
    /// RPC client constructed eagerly in [`Bitcoind::new`].
    pub client: Arc<bitcoincore_rpc::Client>,
    /// Deferred client: its initializer (set up in [`Bitcoind::new`]) connects
    /// to the RPC and runs [`Bitcoind::init`] before yielding the client.
    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
    /// Handle of the spawned `bitcoind` process, stored alongside the clients.
    pub(crate) _process: ProcessHandle,
}
40
41impl Bitcoind {
42 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
43 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
44
45 let conf = format!(
46 include_str!("cfg/bitcoin.conf"),
47 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
48 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
49 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
50 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
51 tx_index = "0",
52 );
53 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
54 let process = processmgr
55 .spawn_daemon(
56 "bitcoind",
57 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
58 )
59 .await?;
60
61 let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
62
63 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64
65 let auth = Auth::UserPass(
66 url.username().to_owned(),
67 url.password()
68 .context("Bitcoin RPC URL is missing password")?
69 .to_owned(),
70 );
71
72 let host = url
73 .without_auth()
74 .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
75 .to_string();
76
77 debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
78 let client =
79 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
80 let wallet_client = JitTry::new_try(move || async move {
81 let client =
82 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
83 Self::init(&client, skip_setup).await?;
84 Ok(Arc::new(client))
85 });
86
87 let bitcoind = Self {
88 _process: process,
89 client: Arc::new(client),
90 wallet_client: Arc::new(wallet_client),
91 };
92
93 bitcoind.poll_ready().await?;
94
95 Ok(bitcoind)
96 }
97
98 fn new_bitcoin_rpc(
99 url: &str,
100 auth: bitcoincore_rpc::Auth,
101 ) -> anyhow::Result<bitcoincore_rpc::Client> {
102 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
104 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
105 .url(url)?
106 .timeout(RPC_TIMEOUT);
107 let (user, pass) = auth.get_user_pass()?;
108 if let Some(user) = user {
109 builder = builder.auth(user, pass);
110 }
111 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
112 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
113 }
114
115 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
116 debug!("Setting up bitcoind");
117 for attempt in 0.. {
119 match block_in_place(|| client.create_wallet("", None, None, None, None)) {
120 Ok(_) => {
121 break;
122 }
123 Err(err) => {
124 if err.to_string().contains("Database already exists") {
125 break;
126 }
127 if attempt % 20 == 19 {
128 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
129 }
130 sleep(Duration::from_millis(100)).await;
131 }
132 }
133 }
134
135 if !skip_setup {
136 let blocks = 101;
138 let address = block_in_place(|| client.get_new_address(None, None))?
139 .require_network(bitcoin::Network::Regtest)
140 .expect("Devimint always runs in regtest");
141 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
142 block_in_place(|| {
143 client
144 .generate_to_address(blocks, &address)
145 .context("Failed to generate blocks")
146 })?;
147 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
148 }
149
150 poll("bitcoind", || async {
152 let info = block_in_place(|| client.get_blockchain_info())
153 .context("bitcoind getblockchaininfo")
154 .map_err(ControlFlow::Continue)?;
155 if info.blocks > 100 {
156 Ok(())
157 } else {
158 Err(ControlFlow::Continue(anyhow!(
159 "not enough blocks: {}",
160 info.blocks
161 )))
162 }
163 })
164 .await?;
165 debug!("Bitcoind ready");
166 Ok(())
167 }
168
169 async fn poll_ready(&self) -> anyhow::Result<()> {
171 poll("bitcoind rpc ready", || async {
172 self.get_block_count()
173 .await
174 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
175 Ok(())
176 })
177 .await
178 }
179
180 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
183 self.wallet_client.get_try().await?;
184 Ok(self)
185 }
186
187 pub async fn get_block_count(&self) -> Result<u64> {
193 let client = self.client.clone();
194 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
195 }
196
197 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
198 let start_time = Instant::now();
199 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
200 let addr = self.get_new_address().await?;
201 let initial_block_count = self.get_block_count().await?;
202 self.generate_to_address(block_num, addr).await?;
203 debug!(target: LOG_DEVIMINT,
204 elapsed_ms = %start_time.elapsed().as_millis(),
205 ?block_num, "Mined blocks (no wait)");
206
207 Ok(initial_block_count)
208 }
209
210 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
211 const BLOCK_NUM_LIMIT: u64 = 32;
218
219 if BLOCK_NUM_LIMIT < block_num {
220 warn!(
221 target: LOG_DEVIMINT,
222 %block_num,
223 "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
224 );
225 let mut block_num = block_num;
226
227 loop {
228 if BLOCK_NUM_LIMIT < block_num {
229 block_num -= BLOCK_NUM_LIMIT;
230 Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
231 } else {
232 Box::pin(async { self.mine_blocks(block_num).await }).await?;
233 return Ok(());
234 }
235 }
236 }
237 let start_time = Instant::now();
238 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
239 let addr = self.get_new_address().await?;
240 let initial_block_count = self.get_block_count().await?;
241 self.generate_to_address(block_num, addr).await?;
242 while self.get_block_count().await? < initial_block_count + block_num {
243 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
244 sleep(Duration::from_millis(100)).await;
245 }
246
247 debug!(target: LOG_DEVIMINT,
248 elapsed_ms = %start_time.elapsed().as_millis(),
249 ?block_num, "Mined blocks");
250
251 Ok(())
252 }
253
254 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
255 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
256 let amount = bitcoin::Amount::from_sat(amount);
257 let tx = self
258 .wallet_client()
259 .await?
260 .send_to_address(
261 bitcoin::Address::from_str(&addr)?
262 .require_network(bitcoin::Network::Regtest)
263 .expect("Devimint always runs in regtest"),
264 amount,
265 )
266 .await?;
267 Ok(tx)
268 }
269
270 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
271 let client = self.wallet_client().await?.clone();
272 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
273 Ok(proof.encode_hex())
274 }
275
276 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
279 poll("Waiting for transaction in mempool", || async {
280 match self
281 .get_transaction(txid)
282 .await
283 .context("getrawtransaction")
284 {
285 Ok(Some(tx)) => Ok(tx),
286 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
287 "Transaction not found yet"
288 ))),
289 Err(err) => Err(ControlFlow::Break(err)),
290 }
291 })
292 .await
293 }
294
295 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
297 match self.get_raw_transaction(txid, None).await {
299 Ok(None) => {}
302 other => return other,
304 }
305
306 let block_height = self.get_block_count().await? - 1;
307
308 let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
314 .map(|height| async move {
315 let block_hash = self.get_block_hash(height).await?;
316 self.get_raw_transaction(txid, Some(block_hash)).await
317 })
318 .buffered(32);
319
320 while let Some(tx_or) = buffered_tx_stream.next().await {
321 match tx_or {
322 Ok(None) => {}
325 other => return other,
327 }
328 }
329
330 Ok(None)
332 }
333
334 async fn get_raw_transaction(
335 &self,
336 txid: bitcoin::Txid,
337 block_hash: Option<BlockHash>,
338 ) -> Result<Option<String>> {
339 let client = self.client.clone();
340 let tx_or =
341 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
342
343 let tx = match tx_or {
344 Ok(tx) => tx,
345 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
350 RpcError { code: -5, .. },
351 ))) => return Ok(None),
352 Err(err) => return Err(err.into()),
353 };
354 let bytes = tx.consensus_encode_to_vec();
355 Ok(Some(bytes.encode_hex()))
356 }
357
358 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
359 let client = self.client.clone();
360 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
361 }
362
363 pub async fn get_new_address(&self) -> Result<Address> {
364 let client = self.wallet_client().await?.clone();
365 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
366 .await??
367 .require_network(bitcoin::Network::Regtest)
368 .expect("Devimint always runs in regtest");
369 Ok(addr)
370 }
371
372 pub async fn generate_to_address(
373 &self,
374 block_num: u64,
375 address: Address,
376 ) -> Result<Vec<BlockHash>> {
377 let client = self.wallet_client().await?.clone();
378 Ok(
379 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
380 .await??,
381 )
382 }
383
384 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
385 let client = self.client.clone();
386 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
387 }
388
389 pub async fn send_to_address(
390 &self,
391 addr: Address,
392 amount: bitcoin::Amount,
393 ) -> anyhow::Result<bitcoin::Txid> {
394 let client = self.wallet_client().await?.clone();
395
396 let raw_client = client.client.clone();
397 let txid = spawn_blocking(move || {
398 raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
399 })
400 .await??;
401
402 assert!(client.get_transaction(txid).await?.is_some());
406
407 Ok(txid)
408 }
409
410 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
411 let client = self.wallet_client().await?.clone();
412 Ok(spawn_blocking(move || client.client.get_balances()).await??)
413 }
414
    /// Exposes the low-level JSON-RPC client underlying [`Self::client`], for
    /// callers that need to issue requests not wrapped by `bitcoincore_rpc`.
    pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
        self.client.get_jsonrpc_client()
    }
418}
419
/// Handle to an `lnd` daemon spawned by devimint together with its gRPC client.
#[derive(Clone)]
pub struct Lnd {
    /// Shared gRPC client; the async mutex serializes access to it.
    pub(crate) client: Arc<Mutex<LndClient>>,
    /// Handle of the spawned `lnd` process (used by [`Lnd::terminate`]).
    pub(crate) process: ProcessHandle,
    /// The `Bitcoind` handle passed to [`Lnd::new`]; stored but not read here.
    pub(crate) _bitcoind: Bitcoind,
}
426
427impl Lnd {
428 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
429 let (process, client) = Lnd::start(process_mgr).await?;
430 let this = Self {
431 _bitcoind: bitcoind,
432 client: Arc::new(Mutex::new(client)),
433 process,
434 };
435 poll("lnd_startup", || async {
437 this.pub_key().await.map_err(ControlFlow::Continue)
438 })
439 .await?;
440 Ok(this)
441 }
442
443 pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
444 let conf = format!(
445 include_str!("cfg/lnd.conf"),
446 listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
447 rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
448 rest_port = process_mgr.globals.FM_PORT_LND_REST,
449 btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
450 zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
451 zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
452 );
453 write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
454 let cmd = cmd!(
455 crate::util::Lnd,
456 format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
457 );
458
459 let process = process_mgr.spawn_daemon("lnd", cmd).await?;
460 let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
461 let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
462 let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
463 poll("wait for lnd files", || async {
464 if fs::try_exists(lnd_tls_cert)
465 .await
466 .context("lnd tls cert")
467 .map_err(ControlFlow::Continue)?
468 && fs::try_exists(lnd_macaroon)
469 .await
470 .context("lnd macaroon")
471 .map_err(ControlFlow::Continue)?
472 {
473 Ok(())
474 } else {
475 Err(ControlFlow::Continue(anyhow!(
476 "lnd tls cert or lnd macaroon not found"
477 )))
478 }
479 })
480 .await?;
481
482 let client = poll("lnd_connect", || async {
483 tonic_lnd::connect(
484 lnd_rpc_addr.clone(),
485 lnd_tls_cert.clone(),
486 lnd_macaroon.clone(),
487 )
488 .await
489 .context("lnd connect")
490 .map_err(ControlFlow::Continue)
491 })
492 .await?;
493
494 Ok((process, client))
495 }
496
497 pub async fn lightning_client_lock(
498 &self,
499 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
500 let guard = self.client.lock().await;
501 Ok(MutexGuard::map(guard, |client| client.lightning()))
502 }
503
504 pub async fn invoices_client_lock(
505 &self,
506 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
507 let guard = self.client.lock().await;
508 Ok(MutexGuard::map(guard, |client| client.invoices()))
509 }
510
511 pub async fn pub_key(&self) -> Result<String> {
512 Ok(self
513 .lightning_client_lock()
514 .await?
515 .get_info(GetInfoRequest {})
516 .await?
517 .into_inner()
518 .identity_pubkey)
519 }
520
    /// Consumes this handle and terminates the underlying `lnd` process by
    /// delegating to the process handle.
    pub async fn terminate(self) -> Result<()> {
        self.process.terminate().await
    }
524
525 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
526 let add_invoice = self
527 .lightning_client_lock()
528 .await?
529 .add_invoice(tonic_lnd::lnrpc::Invoice {
530 value_msat: amount as i64,
531 ..Default::default()
532 })
533 .await?
534 .into_inner();
535 let invoice = add_invoice.payment_request;
536 let payment_hash = add_invoice.r_hash;
537 Ok((invoice, payment_hash))
538 }
539
540 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
541 let payment = self
542 .lightning_client_lock()
543 .await?
544 .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
545 payment_request: invoice.clone(),
546 ..Default::default()
547 })
548 .await?
549 .into_inner();
550 let payment_status = self
551 .lightning_client_lock()
552 .await?
553 .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
554 include_incomplete: true,
555 ..Default::default()
556 })
557 .await?
558 .into_inner()
559 .payments
560 .into_iter()
561 .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
562 .context("payment not in list")?
563 .status();
564 anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
565
566 Ok(())
567 }
568
569 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
570 let invoice_status = self
571 .lightning_client_lock()
572 .await?
573 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
574 r_hash: payment_hash,
575 ..Default::default()
576 })
577 .await?
578 .into_inner()
579 .state();
580 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
581
582 Ok(())
583 }
584
585 pub async fn create_hold_invoice(
586 &self,
587 amount: u64,
588 ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
589 let preimage = rand::random::<[u8; 32]>();
590 let hash = {
591 let mut engine = bitcoin::hashes::sha256::Hash::engine();
592 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
593 bitcoin::hashes::sha256::Hash::from_engine(engine)
594 };
595 let cltv_expiry = 650;
596 let hold_request = self
597 .invoices_client_lock()
598 .await?
599 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
600 value_msat: amount as i64,
601 hash: hash.to_byte_array().to_vec(),
602 cltv_expiry,
603 ..Default::default()
604 })
605 .await?
606 .into_inner();
607 let payment_request = hold_request.payment_request;
608 Ok((preimage, payment_request, hash))
609 }
610
611 pub async fn settle_hold_invoice(
612 &self,
613 preimage: [u8; 32],
614 payment_hash: sha256::Hash,
615 ) -> anyhow::Result<()> {
616 let mut hold_invoice_subscription = self
617 .invoices_client_lock()
618 .await?
619 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
620 r_hash: payment_hash.to_byte_array().to_vec(),
621 })
622 .await?
623 .into_inner();
624 loop {
625 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
626 match timeout(
627 WAIT_FOR_INVOICE_TIMEOUT,
628 futures::StreamExt::next(&mut hold_invoice_subscription),
629 )
630 .await
631 {
632 Ok(Some(Ok(invoice))) => {
633 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
634 break;
635 }
636 debug!("hold invoice payment state: {:?}", invoice.state());
637 }
638 Ok(Some(Err(e))) => {
639 bail!("error in invoice subscription: {e:?}");
640 }
641 Ok(None) => {
642 bail!("invoice subscription ended before invoice was accepted");
643 }
644 Err(_) => {
645 bail!("timed out waiting for invoice to be accepted")
646 }
647 }
648 }
649
650 self.invoices_client_lock()
651 .await?
652 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
653 preimage: preimage.to_vec(),
654 })
655 .await?;
656
657 Ok(())
658 }
659}
660
/// A gateway paired with a name; the name is used in log output when channels
/// are opened between gateways.
pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
662
663#[allow(clippy::similar_names)]
664pub async fn open_channels_between_gateways(
665 bitcoind: &Bitcoind,
666 gateways: &[NamedGateway<'_>],
667) -> Result<()> {
668 let block_height = bitcoind.get_block_count().await? - 1;
669 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
670 futures::future::try_join_all(
671 gateways
672 .iter()
673 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
674 )
675 .await?;
676
677 info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
678 for (gw, _gw_name) in gateways {
679 let funding_addr = gw.get_ln_onchain_address().await?;
680 bitcoind.send_to(funding_addr, 100_000_000).await?;
681 }
682
683 bitcoind.mine_blocks(10).await?;
684
685 info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
686
687 let block_height = bitcoind.get_block_count().await? - 1;
688 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
689 futures::future::try_join_all(
690 gateways
691 .iter()
692 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
693 )
694 .await?;
695
696 let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
702 gateways.iter().tuple_windows::<(_, _)>().collect();
703
704 info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
705 let sats_per_side = 5_000_000;
706 for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
707 info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
708 let txid = gw_a
709 .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
710 .await?;
711
712 bitcoind.poll_get_transaction(txid).await?;
713 }
714
715 bitcoind.mine_blocks(10).await?;
716
717 let block_height = bitcoind.get_block_count().await? - 1;
718 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
719 futures::future::try_join_all(
720 gateways
721 .iter()
722 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
723 )
724 .await?;
725
726 for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
727 let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
728 let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
729
730 wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
731 wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
732 }
733
734 info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
735
736 Ok(())
737}
738
739async fn wait_for_ready_channel_on_gateway_with_counterparty(
740 gw: &Gatewayd,
741 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
742) -> anyhow::Result<()> {
743 poll(
744 &format!("Wait for {} channel update", gw.gw_name),
745 || async {
746 let channels = gw
747 .list_active_channels()
748 .await
749 .context("list channels")
750 .map_err(ControlFlow::Break)?;
751
752 if channels
753 .iter()
754 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
755 {
756 return Ok(());
757 }
758
759 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
760 Err(ControlFlow::Continue(anyhow!("channel not found")))
761 },
762 )
763 .await
764}
765
/// The lightning node implementation backing a gateway.
#[derive(Clone)]
pub enum LightningNode {
    /// An `lnd` daemon, represented by its handle.
    Lnd(Lnd),
    /// An LDK node, described by configuration values rather than a handle.
    Ldk {
        /// Name of the node.
        name: String,
        /// NOTE(review): presumably the gateway's own port; confirm at the use site.
        gw_port: u16,
        /// NOTE(review): presumably the LDK node's listening port; confirm at the use site.
        ldk_port: u16,
        /// Chain data source the LDK node uses.
        chain_source: LdkChainSource,
    },
}
776
777impl LightningNode {
778 pub fn ln_type(&self) -> LightningNodeType {
779 match self {
780 LightningNode::Lnd(_) => LightningNodeType::Lnd,
781 LightningNode::Ldk {
782 name: _,
783 gw_port: _,
784 ldk_port: _,
785 chain_source: _,
786 } => LightningNodeType::Ldk,
787 }
788 }
789}
790
/// Handle to an `esplora` daemon spawned by devimint.
#[derive(Clone)]
pub struct Esplora {
    /// Handle of the spawned `esplora` process.
    _process: ProcessHandle,
    /// The `Bitcoind` handle passed to [`Esplora::new`]; stored but not read here.
    _bitcoind: Bitcoind,
}
796
797impl Esplora {
798 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
799 debug!("Starting esplora");
800 let daemon_dir = process_mgr
801 .globals
802 .FM_BTC_DIR
803 .to_str()
804 .context("non utf8 path")?;
805 let esplora_dir = process_mgr
806 .globals
807 .FM_ESPLORA_DIR
808 .to_str()
809 .context("non utf8 path")?;
810
811 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
812 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
813 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
814 let cmd = cmd!(
816 crate::util::Esplora,
817 "--daemon-dir={daemon_dir}",
818 "--db-dir={esplora_dir}",
819 "--cookie=bitcoin:bitcoin",
820 "--network=regtest",
821 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
822 "--http-addr=127.0.0.1:{esplora_port}",
823 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
824 "--jsonrpc-import", );
826 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
827
828 Self::wait_for_ready(process_mgr).await?;
829 debug!(target: LOG_DEVIMINT, "Esplora ready");
830
831 Ok(Self {
832 _bitcoind: bitcoind,
833 _process: process,
834 })
835 }
836
837 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
839 let client = esplora_client::Builder::new(&format!(
840 "http://localhost:{}",
841 process_mgr.globals.FM_PORT_ESPLORA
842 ))
843 .max_retries(0)
845 .build_async()
846 .expect("esplora client build failed");
847
848 poll("esplora server ready", || async {
849 client
850 .get_fee_estimates()
851 .await
852 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
853
854 Ok(())
855 })
856 .await?;
857
858 Ok(())
859 }
860}
861
/// The daemons started by [`external_daemons`].
// NOTE(review): `allow(unused)` presumably silences warnings about fields that
// are only held, never read — confirm it is still required.
#[allow(unused)]
pub struct ExternalDaemons {
    /// The spawned bitcoind handle.
    pub bitcoind: Bitcoind,
    /// The spawned esplora handle.
    pub esplora: Esplora,
}
867
868pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
869 let bitcoind = Bitcoind::new(process_mgr, false).await?;
870 let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
871 let start_time = fedimint_core::time::now();
872 let _ = bitcoind.wallet_client().await?;
874 info!(
875 target: LOG_DEVIMINT,
876 "starting base daemons took {:?}",
877 start_time.elapsed()?
878 );
879 Ok(ExternalDaemons { bitcoind, esplora })
880}