1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::RpcApi;
9use bitcoincore_rpc::bitcoin::{Address, BlockHash};
10use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
11use bitcoincore_rpc::jsonrpc::error::RpcError;
12use fedimint_core::encoding::Encodable;
13use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
14use fedimint_core::task::{block_in_place, sleep, timeout};
15use fedimint_core::util::{FmtCompact as _, write_overwrite_async};
16use fedimint_logging::LOG_DEVIMINT;
17use fedimint_testing_core::node_type::LightningNodeType;
18use futures::StreamExt;
19use hex::ToHex;
20use itertools::Itertools;
21use tokio::fs;
22use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
23use tokio::task::spawn_blocking;
24use tokio::time::Instant;
25use tonic_lnd::Client as LndClient;
26use tonic_lnd::lnrpc::GetInfoRequest;
27use tracing::{debug, info, trace};
28
29use crate::gatewayd::LdkChainSource;
30use crate::util::{ProcessHandle, ProcessManager, poll};
31use crate::vars::utf8;
32use crate::version_constants::VERSION_0_5_0_ALPHA;
33use crate::{Gatewayd, cmd};
34
35#[derive(Clone)]
36pub struct Bitcoind {
37 pub client: Arc<bitcoincore_rpc::Client>,
38 pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
39 pub(crate) _process: ProcessHandle,
40}
41
42impl Bitcoind {
43 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
44 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
45
46 let conf = format!(
47 include_str!("cfg/bitcoin.conf"),
48 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
49 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
50 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
51 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
52 tx_index = "0",
53 );
54 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
55 let process = processmgr
56 .spawn_daemon(
57 "bitcoind",
58 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
59 )
60 .await?;
61
62 let url = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
63 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
64 let (host, auth) = fedimint_bitcoind::bitcoincore::from_url_to_url_auth(&url)?;
65 debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
66 let client =
67 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
68 let wallet_client = JitTry::new_try(move || async move {
69 let client =
70 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
71 Self::init(&client, skip_setup).await?;
72 Ok(Arc::new(client))
73 });
74
75 let bitcoind = Self {
76 _process: process,
77 client: Arc::new(client),
78 wallet_client: Arc::new(wallet_client),
79 };
80
81 bitcoind.poll_ready().await?;
82
83 Ok(bitcoind)
84 }
85
86 fn new_bitcoin_rpc(
87 url: &str,
88 auth: bitcoincore_rpc::Auth,
89 ) -> anyhow::Result<bitcoincore_rpc::Client> {
90 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
92 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
93 .url(url)?
94 .timeout(RPC_TIMEOUT);
95 let (user, pass) = auth.get_user_pass()?;
96 if let Some(user) = user {
97 builder = builder.auth(user, pass);
98 }
99 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
100 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
101 }
102
103 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
104 debug!("Setting up bitcoind");
105 for attempt in 0.. {
107 match block_in_place(|| client.create_wallet("", None, None, None, None)) {
108 Ok(_) => {
109 break;
110 }
111 Err(err) => {
112 if err.to_string().contains("Database already exists") {
113 break;
114 }
115 if attempt % 20 == 19 {
116 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
117 }
118 sleep(Duration::from_millis(100)).await;
119 }
120 }
121 }
122
123 if !skip_setup {
124 let blocks = 101;
126 let address = block_in_place(|| client.get_new_address(None, None))?
127 .require_network(bitcoin::Network::Regtest)
128 .expect("Devimint always runs in regtest");
129 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
130 block_in_place(|| {
131 client
132 .generate_to_address(blocks, &address)
133 .context("Failed to generate blocks")
134 })?;
135 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
136 }
137
138 poll("bitcoind", || async {
140 let info = block_in_place(|| client.get_blockchain_info())
141 .context("bitcoind getblockchaininfo")
142 .map_err(ControlFlow::Continue)?;
143 if info.blocks > 100 {
144 Ok(())
145 } else {
146 Err(ControlFlow::Continue(anyhow!(
147 "not enough blocks: {}",
148 info.blocks
149 )))
150 }
151 })
152 .await?;
153 debug!("Bitcoind ready");
154 Ok(())
155 }
156
157 async fn poll_ready(&self) -> anyhow::Result<()> {
159 poll("bitcoind rpc ready", || async {
160 self.get_block_count()
161 .await
162 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
163 Ok(())
164 })
165 .await
166 }
167
168 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
171 self.wallet_client.get_try().await?;
172 Ok(self)
173 }
174
175 pub async fn get_block_count(&self) -> Result<u64> {
181 let client = self.client.clone();
182 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
183 }
184
185 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
186 let start_time = Instant::now();
187 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
188 let addr = self.get_new_address().await?;
189 let initial_block_count = self.get_block_count().await?;
190 self.generate_to_address(block_num, addr).await?;
191 debug!(target: LOG_DEVIMINT,
192 elapsed_ms = %start_time.elapsed().as_millis(),
193 ?block_num, "Mined blocks (no wait)");
194
195 Ok(initial_block_count)
196 }
197
198 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
199 let start_time = Instant::now();
200 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
201 let addr = self.get_new_address().await?;
202 let initial_block_count = self.get_block_count().await?;
203 self.generate_to_address(block_num, addr).await?;
204 while self.get_block_count().await? < initial_block_count + block_num {
205 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
206 sleep(Duration::from_millis(100)).await;
207 }
208
209 debug!(target: LOG_DEVIMINT,
210 elapsed_ms = %start_time.elapsed().as_millis(),
211 ?block_num, "Mined blocks");
212
213 Ok(())
214 }
215
216 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
217 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
218 let amount = bitcoin::Amount::from_sat(amount);
219 let tx = self
220 .wallet_client()
221 .await?
222 .send_to_address(
223 bitcoin::Address::from_str(&addr)?
224 .require_network(bitcoin::Network::Regtest)
225 .expect("Devimint always runs in regtest"),
226 amount,
227 )
228 .await?;
229 Ok(tx)
230 }
231
232 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
233 let client = self.wallet_client().await?.clone();
234 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
235 Ok(proof.encode_hex())
236 }
237
238 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
241 poll("Waiting for transaction in mempool", || async {
242 match self
243 .get_transaction(txid)
244 .await
245 .context("getrawtransaction")
246 {
247 Ok(Some(tx)) => Ok(tx),
248 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
249 "Transaction not found yet"
250 ))),
251 Err(err) => Err(ControlFlow::Break(err)),
252 }
253 })
254 .await
255 }
256
257 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
259 match self.get_raw_transaction(txid, None).await {
261 Ok(None) => {}
264 other => return other,
266 };
267
268 let block_height = self.get_block_count().await? - 1;
269
270 let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
276 .map(|height| async move {
277 let block_hash = self.get_block_hash(height).await?;
278 self.get_raw_transaction(txid, Some(block_hash)).await
279 })
280 .buffered(32);
281
282 while let Some(tx_or) = buffered_tx_stream.next().await {
283 match tx_or {
284 Ok(None) => continue,
287 other => return other,
289 };
290 }
291
292 Ok(None)
294 }
295
296 async fn get_raw_transaction(
297 &self,
298 txid: bitcoin::Txid,
299 block_hash: Option<BlockHash>,
300 ) -> Result<Option<String>> {
301 let client = self.client.clone();
302 let tx_or =
303 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
304
305 let tx = match tx_or {
306 Ok(tx) => tx,
307 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
312 RpcError { code: -5, .. },
313 ))) => return Ok(None),
314 Err(err) => return Err(err.into()),
315 };
316 let bytes = tx.consensus_encode_to_vec();
317 Ok(Some(bytes.encode_hex()))
318 }
319
320 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
321 let client = self.client.clone();
322 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
323 }
324
325 pub async fn get_new_address(&self) -> Result<Address> {
326 let client = self.wallet_client().await?.clone();
327 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
328 .await??
329 .require_network(bitcoin::Network::Regtest)
330 .expect("Devimint always runs in regtest");
331 Ok(addr)
332 }
333
334 pub async fn generate_to_address(
335 &self,
336 block_num: u64,
337 address: Address,
338 ) -> Result<Vec<BlockHash>> {
339 let client = self.wallet_client().await?.clone();
340 Ok(
341 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
342 .await??,
343 )
344 }
345
346 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
347 let client = self.client.clone();
348 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
349 }
350
351 pub async fn send_to_address(
352 &self,
353 addr: Address,
354 amount: bitcoin::Amount,
355 ) -> anyhow::Result<bitcoin::Txid> {
356 let client = self.wallet_client().await?.clone();
357
358 let raw_client = client.client.clone();
359 let txid = spawn_blocking(move || {
360 raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
361 })
362 .await??;
363
364 assert!(client.get_transaction(txid).await?.is_some());
368
369 Ok(txid)
370 }
371
372 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
373 let client = self.wallet_client().await?.clone();
374 Ok(spawn_blocking(move || client.client.get_balances()).await??)
375 }
376
377 pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
378 self.client.get_jsonrpc_client()
379 }
380}
381
382#[derive(Clone)]
383pub struct Lnd {
384 pub(crate) client: Arc<Mutex<LndClient>>,
385 pub(crate) process: ProcessHandle,
386 pub(crate) _bitcoind: Bitcoind,
387}
388
389impl Lnd {
390 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
391 let (process, client) = Lnd::start(process_mgr).await?;
392 let this = Self {
393 _bitcoind: bitcoind,
394 client: Arc::new(Mutex::new(client)),
395 process,
396 };
397 poll("lnd_startup", || async {
399 this.pub_key().await.map_err(ControlFlow::Continue)
400 })
401 .await?;
402 Ok(this)
403 }
404
405 pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
406 let conf = format!(
407 include_str!("cfg/lnd.conf"),
408 listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
409 rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
410 rest_port = process_mgr.globals.FM_PORT_LND_REST,
411 btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
412 zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
413 zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
414 );
415 write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
416 let cmd = cmd!(
417 crate::util::Lnd,
418 format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
419 );
420
421 let process = process_mgr.spawn_daemon("lnd", cmd).await?;
422 let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
423 let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
424 let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
425 poll("wait for lnd files", || async {
426 if fs::try_exists(lnd_tls_cert)
427 .await
428 .context("lnd tls cert")
429 .map_err(ControlFlow::Continue)?
430 && fs::try_exists(lnd_macaroon)
431 .await
432 .context("lnd macaroon")
433 .map_err(ControlFlow::Continue)?
434 {
435 Ok(())
436 } else {
437 Err(ControlFlow::Continue(anyhow!(
438 "lnd tls cert or lnd macaroon not found"
439 )))
440 }
441 })
442 .await?;
443
444 let client = poll("lnd_connect", || async {
445 tonic_lnd::connect(
446 lnd_rpc_addr.clone(),
447 lnd_tls_cert.clone(),
448 lnd_macaroon.clone(),
449 )
450 .await
451 .context("lnd connect")
452 .map_err(ControlFlow::Continue)
453 })
454 .await?;
455
456 Ok((process, client))
457 }
458
459 pub async fn lightning_client_lock(
460 &self,
461 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
462 let guard = self.client.lock().await;
463 Ok(MutexGuard::map(guard, |client| client.lightning()))
464 }
465
466 pub async fn invoices_client_lock(
467 &self,
468 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
469 let guard = self.client.lock().await;
470 Ok(MutexGuard::map(guard, |client| client.invoices()))
471 }
472
473 pub async fn pub_key(&self) -> Result<String> {
474 Ok(self
475 .lightning_client_lock()
476 .await?
477 .get_info(GetInfoRequest {})
478 .await?
479 .into_inner()
480 .identity_pubkey)
481 }
482
483 pub async fn terminate(self) -> Result<()> {
484 self.process.terminate().await
485 }
486
487 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
488 let add_invoice = self
489 .lightning_client_lock()
490 .await?
491 .add_invoice(tonic_lnd::lnrpc::Invoice {
492 value_msat: amount as i64,
493 ..Default::default()
494 })
495 .await?
496 .into_inner();
497 let invoice = add_invoice.payment_request;
498 let payment_hash = add_invoice.r_hash;
499 Ok((invoice, payment_hash))
500 }
501
502 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
503 let payment = self
504 .lightning_client_lock()
505 .await?
506 .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
507 payment_request: invoice.clone(),
508 ..Default::default()
509 })
510 .await?
511 .into_inner();
512 let payment_status = self
513 .lightning_client_lock()
514 .await?
515 .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
516 include_incomplete: true,
517 ..Default::default()
518 })
519 .await?
520 .into_inner()
521 .payments
522 .into_iter()
523 .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
524 .context("payment not in list")?
525 .status();
526 anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
527
528 Ok(())
529 }
530
531 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
532 let invoice_status = self
533 .lightning_client_lock()
534 .await?
535 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
536 r_hash: payment_hash,
537 ..Default::default()
538 })
539 .await?
540 .into_inner()
541 .state();
542 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
543
544 Ok(())
545 }
546
547 pub async fn create_hold_invoice(
548 &self,
549 amount: u64,
550 ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
551 let preimage = rand::random::<[u8; 32]>();
552 let hash = {
553 let mut engine = bitcoin::hashes::sha256::Hash::engine();
554 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
555 bitcoin::hashes::sha256::Hash::from_engine(engine)
556 };
557 let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
560 let cltv_expiry = if fedimint_cli_version >= *VERSION_0_5_0_ALPHA {
561 650
562 } else {
563 100
564 };
565 let hold_request = self
566 .invoices_client_lock()
567 .await?
568 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
569 value_msat: amount as i64,
570 hash: hash.to_byte_array().to_vec(),
571 cltv_expiry,
572 ..Default::default()
573 })
574 .await?
575 .into_inner();
576 let payment_request = hold_request.payment_request;
577 Ok((preimage, payment_request, hash))
578 }
579
580 pub async fn settle_hold_invoice(
581 &self,
582 preimage: [u8; 32],
583 payment_hash: sha256::Hash,
584 ) -> anyhow::Result<()> {
585 let mut hold_invoice_subscription = self
586 .invoices_client_lock()
587 .await?
588 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
589 r_hash: payment_hash.to_byte_array().to_vec(),
590 })
591 .await?
592 .into_inner();
593 loop {
594 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
595 match timeout(
596 WAIT_FOR_INVOICE_TIMEOUT,
597 futures::StreamExt::next(&mut hold_invoice_subscription),
598 )
599 .await
600 {
601 Ok(Some(Ok(invoice))) => {
602 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
603 break;
604 }
605 debug!("hold invoice payment state: {:?}", invoice.state());
606 }
607 Ok(Some(Err(e))) => {
608 bail!("error in invoice subscription: {e:?}");
609 }
610 Ok(None) => {
611 bail!("invoice subscription ended before invoice was accepted");
612 }
613 Err(_) => {
614 bail!("timed out waiting for invoice to be accepted")
615 }
616 }
617 }
618
619 self.invoices_client_lock()
620 .await?
621 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
622 preimage: preimage.to_vec(),
623 })
624 .await?;
625
626 Ok(())
627 }
628}
629
630pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
631
632#[allow(clippy::similar_names)]
633pub async fn open_channels_between_gateways(
634 bitcoind: &Bitcoind,
635 gateways: &[NamedGateway<'_>],
636) -> Result<()> {
637 let block_height = bitcoind.get_block_count().await? - 1;
638 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
639 futures::future::try_join_all(
640 gateways
641 .iter()
642 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
643 )
644 .await?;
645
646 info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
647 for (gw, _gw_name) in gateways {
648 let funding_addr = gw.get_ln_onchain_address().await?;
649 bitcoind.send_to(funding_addr, 100_000_000).await?;
650 }
651
652 bitcoind.mine_blocks(10).await?;
653
654 info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");
655
656 let block_height = bitcoind.get_block_count().await? - 1;
657 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
658 futures::future::try_join_all(
659 gateways
660 .iter()
661 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
662 )
663 .await?;
664
665 let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
671 gateways.iter().tuple_windows::<(_, _)>().collect();
672
673 info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
674 let sats_per_side = 5_000_000;
675 for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
676 info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
677 let txid = gw_a
678 .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
679 .await?;
680
681 if let Some(txid) = txid {
682 bitcoind.poll_get_transaction(txid).await?;
683 }
684 }
685
686 if crate::util::Gatewayd::version_or_default().await < *VERSION_0_5_0_ALPHA {
690 fedimint_core::runtime::sleep(Duration::from_secs(5)).await;
691 }
692
693 bitcoind.mine_blocks(10).await?;
694
695 let block_height = bitcoind.get_block_count().await? - 1;
696 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
697 futures::future::try_join_all(
698 gateways
699 .iter()
700 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
701 )
702 .await?;
703
704 for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
705 let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
706 let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
707
708 wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
709 wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
710 }
711
712 info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");
713
714 Ok(())
715}
716
717async fn wait_for_ready_channel_on_gateway_with_counterparty(
718 gw: &Gatewayd,
719 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
720) -> anyhow::Result<()> {
721 poll(
722 &format!("Wait for {} channel update", gw.gw_name),
723 || async {
724 let channels = gw
725 .list_active_channels()
726 .await
727 .context("list channels")
728 .map_err(ControlFlow::Break)?;
729
730 if channels
731 .iter()
732 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
733 {
734 return Ok(());
735 }
736
737 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
738 Err(ControlFlow::Continue(anyhow!("channel not found")))
739 },
740 )
741 .await
742}
743
744#[derive(Clone)]
745pub enum LightningNode {
746 Lnd(Lnd),
747 Ldk {
748 name: String,
749 gw_port: u16,
750 ldk_port: u16,
751 chain_source: LdkChainSource,
752 },
753}
754
755impl LightningNode {
756 pub fn ln_type(&self) -> LightningNodeType {
757 match self {
758 LightningNode::Lnd(_) => LightningNodeType::Lnd,
759 LightningNode::Ldk {
760 name: _,
761 gw_port: _,
762 ldk_port: _,
763 chain_source: _,
764 } => LightningNodeType::Ldk,
765 }
766 }
767}
768
769#[derive(Clone)]
770pub struct Electrs {
771 _process: ProcessHandle,
772 _bitcoind: Bitcoind,
773}
774
775impl Electrs {
776 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
777 debug!(target: LOG_DEVIMINT, "Starting electrs");
778 let electrs_dir = process_mgr
779 .globals
780 .FM_ELECTRS_DIR
781 .to_str()
782 .context("non utf8 path")?;
783
784 let daemon_dir = &process_mgr.globals.FM_BTC_DIR.display();
785
786 let conf = format!(
787 include_str!("cfg/electrs.toml"),
788 rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
789 p2p_port = process_mgr.globals.FM_PORT_BTC_P2P,
790 electrs_port = process_mgr.globals.FM_PORT_ELECTRS,
791 monitoring_port = process_mgr.globals.FM_PORT_ELECTRS_MONITORING,
792 );
793 debug!("electrs conf: {:?}", conf);
794 write_overwrite_async(
795 process_mgr.globals.FM_ELECTRS_DIR.join("electrs.toml"),
796 conf,
797 )
798 .await?;
799 let cmd = cmd!(
800 crate::util::Electrs,
801 "--conf-dir={electrs_dir}",
802 "--db-dir={electrs_dir}",
803 "--daemon-dir={daemon_dir}"
804 );
805 let process = process_mgr.spawn_daemon("electrs", cmd).await?;
806 debug!(target: LOG_DEVIMINT, "Electrs ready");
807
808 Ok(Self {
809 _bitcoind: bitcoind,
810 _process: process,
811 })
812 }
813}
814
815#[derive(Clone)]
816pub struct Esplora {
817 _process: ProcessHandle,
818 _bitcoind: Bitcoind,
819}
820
821impl Esplora {
822 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
823 debug!("Starting esplora");
824 let daemon_dir = process_mgr
825 .globals
826 .FM_BTC_DIR
827 .to_str()
828 .context("non utf8 path")?;
829 let esplora_dir = process_mgr
830 .globals
831 .FM_ESPLORA_DIR
832 .to_str()
833 .context("non utf8 path")?;
834
835 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
836 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
837 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
838 let cmd = cmd!(
840 crate::util::Esplora,
841 "--daemon-dir={daemon_dir}",
842 "--db-dir={esplora_dir}",
843 "--cookie=bitcoin:bitcoin",
844 "--network=regtest",
845 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
846 "--http-addr=127.0.0.1:{esplora_port}",
847 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
848 "--jsonrpc-import", );
850 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
851
852 Self::wait_for_ready(process_mgr).await?;
853 debug!(target: LOG_DEVIMINT, "Esplora ready");
854
855 Ok(Self {
856 _bitcoind: bitcoind,
857 _process: process,
858 })
859 }
860
861 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
863 let client = esplora_client::Builder::new(&format!(
864 "http://localhost:{}",
865 process_mgr.globals.FM_PORT_ESPLORA
866 ))
867 .max_retries(0)
869 .build_async()
870 .expect("esplora client build failed");
871
872 poll("esplora server ready", || async {
873 client
874 .get_fee_estimates()
875 .await
876 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
877
878 Ok(())
879 })
880 .await?;
881
882 Ok(())
883 }
884}
885
886#[allow(unused)]
887pub struct ExternalDaemons {
888 pub bitcoind: Bitcoind,
889 pub electrs: Electrs,
890 pub esplora: Esplora,
891}
892
893pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
894 let bitcoind = Bitcoind::new(process_mgr, false).await?;
895 let (electrs, esplora) = tokio::try_join!(
896 Electrs::new(process_mgr, bitcoind.clone()),
897 Esplora::new(process_mgr, bitcoind.clone()),
898 )?;
899 let start_time = fedimint_core::time::now();
900 let _ = bitcoind.wallet_client().await?;
902 info!(
903 target: LOG_DEVIMINT,
904 "starting base daemons took {:?}",
905 start_time.elapsed()?
906 );
907 Ok(ExternalDaemons {
908 bitcoind,
909 electrs,
910 esplora,
911 })
912}