1use std::ops::ControlFlow;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow, bail};
7use bitcoin::hashes::{Hash, sha256};
8use bitcoincore_rpc::bitcoin::{Address, BlockHash};
9use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
10use bitcoincore_rpc::jsonrpc::error::RpcError;
11use bitcoincore_rpc::{Auth, RpcApi};
12use fedimint_core::encoding::Encodable;
13use fedimint_core::rustls::install_crypto_provider;
14use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
15use fedimint_core::task::{block_in_place, sleep, timeout};
16use fedimint_core::util::backoff_util::api_networking_backoff;
17use fedimint_core::util::{FmtCompact as _, SafeUrl, retry, write_overwrite_async};
18use fedimint_logging::LOG_DEVIMINT;
19use fedimint_testing_core::node_type::LightningNodeType;
20use futures::StreamExt;
21use hex::ToHex;
22use itertools::Itertools;
23use tokio::fs;
24use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
25use tokio::task::spawn_blocking;
26use tokio::time::Instant;
27use tonic_lnd::Client as LndClient;
28use tonic_lnd::lnrpc::GetInfoRequest;
29use tonic_lnd::routerrpc::SendPaymentRequest;
30use tracing::{debug, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll};
33use crate::vars::utf8;
34use crate::{Gatewayd, cmd};
35
/// Handle to the `bitcoind` daemon started by devimint, bundled with the RPC
/// clients used to talk to it.
///
/// The RPC clients live behind `Arc`s, so clones of this handle share them.
#[derive(Clone)]
pub struct Bitcoind {
    /// RPC client created eagerly in [`Bitcoind::new`].
    pub client: Arc<bitcoincore_rpc::Client>,
    /// Lazily initialized RPC client. Its initializer runs [`Bitcoind::init`],
    /// so successfully awaiting it means `init` has finished.
    pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
    /// Handle of the spawned `bitcoind` process. Not read in this file;
    /// presumably held to tie the daemon's lifetime to this handle (confirm
    /// against `ProcessHandle`).
    pub(crate) _process: ProcessHandle,
}
42
43impl Bitcoind {
44 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
45 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
46
47 let conf = format!(
48 include_str!("cfg/bitcoin.conf"),
49 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
50 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
51 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
52 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
53 tx_index = "0",
54 );
55 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
56 let process = processmgr
57 .spawn_daemon(
58 "bitcoind",
59 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
60 )
61 .await?;
62
63 let url: SafeUrl = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
64
65 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
66
67 let auth = Auth::UserPass(
68 url.username().to_owned(),
69 url.password()
70 .context("Bitcoin RPC URL is missing password")?
71 .to_owned(),
72 );
73
74 let host = url
75 .without_auth()
76 .map_err(|()| anyhow!("Failed to strip auth from Bitcoin Rpc Url"))?
77 .to_string();
78 let wallet_name = "";
79 let host = format!("{host}wallet/{wallet_name}");
80
81 debug!(target: LOG_DEVIMINT, "bitcoind host: {:?}, auth: {:?}", &host, auth);
82 let client =
83 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
84 let wallet_client = JitTry::new_try(move || async move {
85 let client =
86 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
87 Self::init(&client, skip_setup).await?;
88 Ok(Arc::new(client))
89 });
90
91 let bitcoind = Self {
92 _process: process,
93 client: Arc::new(client),
94 wallet_client: Arc::new(wallet_client),
95 };
96
97 bitcoind.poll_ready().await?;
98
99 let addr = bitcoind.get_new_address().await?;
101 bitcoind.generate_to_address(1, addr).await?;
102
103 Ok(bitcoind)
104 }
105
106 fn new_bitcoin_rpc(
107 url: &str,
108 auth: bitcoincore_rpc::Auth,
109 ) -> anyhow::Result<bitcoincore_rpc::Client> {
110 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
112 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
113 .url(url)?
114 .timeout(RPC_TIMEOUT);
115 let (user, pass) = auth.get_user_pass()?;
116 if let Some(user) = user {
117 builder = builder.auth(user, pass);
118 }
119 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
120 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
121 }
122
123 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
124 debug!(target: LOG_DEVIMINT, "Setting up bitcoind...");
125 for attempt in 0.. {
127 match block_in_place(|| client.create_wallet("", None, None, None, None)) {
128 Ok(_) => {
129 break;
130 }
131 Err(err) => {
132 if err.to_string().contains("Database already exists") {
133 break;
134 }
135 if attempt % 20 == 19 {
136 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
137 }
138 sleep(Duration::from_millis(100)).await;
139 }
140 }
141 }
142
143 if !skip_setup {
144 let blocks = 101;
146 let address = block_in_place(|| client.get_new_address(None, None))?
147 .require_network(bitcoin::Network::Regtest)
148 .expect("Devimint always runs in regtest");
149 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
150 block_in_place(|| {
151 client
152 .generate_to_address(blocks, &address)
153 .context("Failed to generate blocks")
154 })?;
155 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
156 }
157
158 poll("bitcoind", || async {
160 let info = block_in_place(|| client.get_blockchain_info())
161 .context("bitcoind getblockchaininfo")
162 .map_err(ControlFlow::Continue)?;
163 if info.blocks > 100 {
164 Ok(())
165 } else {
166 Err(ControlFlow::Continue(anyhow!(
167 "not enough blocks: {}",
168 info.blocks
169 )))
170 }
171 })
172 .await?;
173 debug!("Bitcoind ready");
174 Ok(())
175 }
176
177 async fn poll_ready(&self) -> anyhow::Result<()> {
179 poll("bitcoind rpc ready", || async {
180 self.get_block_count()
181 .await
182 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
183 Ok(())
184 })
185 .await
186 }
187
188 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
191 self.wallet_client.get_try().await?;
192 Ok(self)
193 }
194
195 pub async fn get_block_count(&self) -> Result<u64> {
201 let client = self.client.clone();
202 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
203 }
204
205 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
206 let start_time = Instant::now();
207 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
208 let addr = self.get_new_address().await?;
209 let initial_block_count = self.get_block_count().await?;
210 self.generate_to_address(block_num, addr).await?;
211 debug!(target: LOG_DEVIMINT,
212 elapsed_ms = %start_time.elapsed().as_millis(),
213 ?block_num, "Mined blocks (no wait)");
214
215 Ok(initial_block_count)
216 }
217
218 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
219 const BLOCK_NUM_LIMIT: u64 = 32;
226
227 if BLOCK_NUM_LIMIT < block_num {
228 warn!(
229 target: LOG_DEVIMINT,
230 %block_num,
231 "Mining a lot of blocks (even when split) is a terrible idea and can lead to issues. Splitting request just to make it work somehow."
232 );
233 let mut block_num = block_num;
234
235 loop {
236 if BLOCK_NUM_LIMIT < block_num {
237 block_num -= BLOCK_NUM_LIMIT;
238 Box::pin(async { self.mine_blocks(BLOCK_NUM_LIMIT).await }).await?;
239 } else {
240 Box::pin(async { self.mine_blocks(block_num).await }).await?;
241 return Ok(());
242 }
243 }
244 }
245 let start_time = Instant::now();
246 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
247 let addr = self.get_new_address().await?;
248 let initial_block_count = self.get_block_count().await?;
249 self.generate_to_address(block_num, addr).await?;
250 while self.get_block_count().await? < initial_block_count + block_num {
251 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
252 sleep(Duration::from_millis(100)).await;
253 }
254
255 debug!(target: LOG_DEVIMINT,
256 elapsed_ms = %start_time.elapsed().as_millis(),
257 ?block_num, "Mined blocks");
258
259 Ok(())
260 }
261
262 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
263 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
264 let amount = bitcoin::Amount::from_sat(amount);
265 let tx = self
266 .wallet_client()
267 .await?
268 .send_to_address(
269 bitcoin::Address::from_str(&addr)?
270 .require_network(bitcoin::Network::Regtest)
271 .expect("Devimint always runs in regtest"),
272 amount,
273 )
274 .await?;
275 Ok(tx)
276 }
277
278 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
279 let client = self.wallet_client().await?.clone();
280 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
281 Ok(proof.encode_hex())
282 }
283
284 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
287 poll("Waiting for transaction in mempool", || async {
288 match self
289 .get_transaction(txid)
290 .await
291 .context("getrawtransaction")
292 {
293 Ok(Some(tx)) => Ok(tx),
294 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
295 "Transaction not found yet"
296 ))),
297 Err(err) => Err(ControlFlow::Break(err)),
298 }
299 })
300 .await
301 }
302
303 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
305 match self.get_raw_transaction(txid, None).await {
307 Ok(None) => {}
310 other => return other,
312 }
313
314 let block_height = self.get_block_count().await? - 1;
315
316 let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
322 .map(|height| async move {
323 let block_hash = self.get_block_hash(height).await?;
324 self.get_raw_transaction(txid, Some(block_hash)).await
325 })
326 .buffered(32);
327
328 while let Some(tx_or) = buffered_tx_stream.next().await {
329 match tx_or {
330 Ok(None) => {}
333 other => return other,
335 }
336 }
337
338 Ok(None)
340 }
341
342 async fn get_raw_transaction(
343 &self,
344 txid: bitcoin::Txid,
345 block_hash: Option<BlockHash>,
346 ) -> Result<Option<String>> {
347 let client = self.client.clone();
348 let tx_or =
349 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
350
351 let tx = match tx_or {
352 Ok(tx) => tx,
353 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
358 RpcError { code: -5, .. },
359 ))) => return Ok(None),
360 Err(err) => return Err(err.into()),
361 };
362 let bytes = tx.consensus_encode_to_vec();
363 Ok(Some(bytes.encode_hex()))
364 }
365
366 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
367 let client = self.client.clone();
368 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
369 }
370
371 pub async fn get_new_address(&self) -> Result<Address> {
372 let client = self.wallet_client().await?.clone();
373 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
374 .await??
375 .require_network(bitcoin::Network::Regtest)
376 .expect("Devimint always runs in regtest");
377 Ok(addr)
378 }
379
380 pub async fn generate_to_address(
381 &self,
382 block_num: u64,
383 address: Address,
384 ) -> Result<Vec<BlockHash>> {
385 let client = self.wallet_client().await?.clone();
386 Ok(
387 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
388 .await??,
389 )
390 }
391
392 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
393 let client = self.client.clone();
394 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
395 }
396
397 pub async fn send_to_address(
398 &self,
399 addr: Address,
400 amount: bitcoin::Amount,
401 ) -> anyhow::Result<bitcoin::Txid> {
402 let client = self.wallet_client().await?.clone();
403
404 let raw_client = client.client.clone();
405 let txid = spawn_blocking(move || {
406 raw_client.send_to_address(&addr, amount, None, None, None, None, None, None)
407 })
408 .await??;
409
410 retry(
413 "await-genesis-tx-processed".to_string(),
414 api_networking_backoff(),
415 || async {
416 if client.get_transaction(txid).await?.is_none() {
417 bail!("Genesis tx not visible yet = {}", txid);
418 }
419
420 Ok(())
421 },
422 )
423 .await
424 .expect("Number of retries has no limit");
425 Ok(txid)
426 }
427
428 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
429 let client = self.wallet_client().await?.clone();
430 Ok(spawn_blocking(move || client.client.get_balances()).await??)
431 }
432
433 pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
434 self.client.get_jsonrpc_client()
435 }
436}
437
/// Handle to an `lnd` daemon started by devimint together with a gRPC client
/// connected to it.
#[derive(Clone)]
pub struct Lnd {
    /// gRPC client, shared between clones and guarded by an async mutex
    /// because the sub-clients are obtained through `&mut` access.
    pub(crate) client: Arc<Mutex<LndClient>>,
    /// Handle of the spawned `lnd` process; used by [`Lnd::terminate`].
    pub(crate) process: ProcessHandle,
    /// The `bitcoind` handle passed to [`Lnd::new`]. Not read in this file;
    /// presumably held so the backend outlives this node (confirm).
    pub(crate) _bitcoind: Bitcoind,
}
444
445impl Lnd {
446 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
447 let (process, client) = Lnd::start(process_mgr).await?;
448 let this = Self {
449 _bitcoind: bitcoind,
450 client: Arc::new(Mutex::new(client)),
451 process,
452 };
453 poll("lnd_startup", || async {
455 this.pub_key().await.map_err(ControlFlow::Continue)
456 })
457 .await?;
458 Ok(this)
459 }
460
    /// Writes `lnd.conf`, spawns the `lnd` daemon and connects a gRPC client
    /// to it.
    ///
    /// Returns the process handle together with the connected client.
    ///
    /// # Errors
    ///
    /// Fails if the config cannot be written, the daemon cannot be spawned, or
    /// one of the polls gives up.
    pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
        // Render the bundled config template with the configured ports.
        let conf = format!(
            include_str!("cfg/lnd.conf"),
            listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
            rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
            rest_port = process_mgr.globals.FM_PORT_LND_REST,
            btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
            zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
            zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
        );
        write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
        let cmd = cmd!(
            crate::util::Lnd,
            format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
        );

        let process = process_mgr.spawn_daemon("lnd", cmd).await?;
        let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
        let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
        let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
        // Connecting needs both the TLS certificate and the macaroon, so wait
        // until both files exist. File system errors are retried as well.
        poll("wait for lnd files", || async {
            if fs::try_exists(lnd_tls_cert)
                .await
                .context("lnd tls cert")
                .map_err(ControlFlow::Continue)?
                && fs::try_exists(lnd_macaroon)
                    .await
                    .context("lnd macaroon")
                    .map_err(ControlFlow::Continue)?
            {
                Ok(())
            } else {
                Err(ControlFlow::Continue(anyhow!(
                    "lnd tls cert or lnd macaroon not found"
                )))
            }
        })
        .await?;

        // Installed before the first connection attempt.
        install_crypto_provider().await;

        // Keep retrying the connection until it succeeds or the poll gives up.
        let client = poll("lnd_connect", || async {
            tonic_lnd::connect(
                lnd_rpc_addr.clone(),
                lnd_tls_cert.clone(),
                lnd_macaroon.clone(),
            )
            .await
            .context("lnd connect")
            .map_err(ControlFlow::Continue)
        })
        .await?;

        Ok((process, client))
    }
516
517 pub async fn lightning_client_lock(
518 &self,
519 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
520 let guard = self.client.lock().await;
521 Ok(MutexGuard::map(guard, |client| client.lightning()))
522 }
523
524 pub async fn invoices_client_lock(
525 &self,
526 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
527 let guard = self.client.lock().await;
528 Ok(MutexGuard::map(guard, |client| client.invoices()))
529 }
530
531 pub async fn router_client_lock(
532 &self,
533 ) -> Result<MappedMutexGuard<'_, tonic_lnd::RouterClient>> {
534 let guard = self.client.lock().await;
535 Ok(MutexGuard::map(guard, |client| client.router()))
536 }
537
538 pub async fn pub_key(&self) -> Result<String> {
539 Ok(self
540 .lightning_client_lock()
541 .await?
542 .get_info(GetInfoRequest {})
543 .await?
544 .into_inner()
545 .identity_pubkey)
546 }
547
548 pub async fn terminate(self) -> Result<()> {
549 self.process.terminate().await
550 }
551
552 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
553 let add_invoice = self
554 .lightning_client_lock()
555 .await?
556 .add_invoice(tonic_lnd::lnrpc::Invoice {
557 value_msat: amount as i64,
558 ..Default::default()
559 })
560 .await?
561 .into_inner();
562 let invoice = add_invoice.payment_request;
563 let payment_hash = add_invoice.r_hash;
564 Ok((invoice, payment_hash))
565 }
566
567 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
568 let mut payment = self
569 .router_client_lock()
570 .await?
571 .send_payment_v2(SendPaymentRequest {
572 payment_request: invoice.clone(),
573 ..Default::default()
574 })
575 .await?
576 .into_inner();
577
578 while let Some(update) = payment.message().await? {
579 match update.status() {
580 tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded => return Ok(()),
581 tonic_lnd::lnrpc::payment::PaymentStatus::InFlight => {}
582 _ => return Err(anyhow!("LND lightning payment failed")),
583 }
584 }
585
586 Err(anyhow!("LND lightning payment unknown status"))
587 }
588
589 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
590 let invoice_status = self
591 .lightning_client_lock()
592 .await?
593 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
594 r_hash: payment_hash,
595 ..Default::default()
596 })
597 .await?
598 .into_inner()
599 .state();
600 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
601
602 Ok(())
603 }
604
605 pub async fn create_hold_invoice(
606 &self,
607 amount: u64,
608 ) -> anyhow::Result<([u8; 32], String, sha256::Hash)> {
609 let preimage = rand::random::<[u8; 32]>();
610 let hash = {
611 let mut engine = bitcoin::hashes::sha256::Hash::engine();
612 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
613 bitcoin::hashes::sha256::Hash::from_engine(engine)
614 };
615 let cltv_expiry = 650;
616 let hold_request = self
617 .invoices_client_lock()
618 .await?
619 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
620 value_msat: amount as i64,
621 hash: hash.to_byte_array().to_vec(),
622 cltv_expiry,
623 ..Default::default()
624 })
625 .await?
626 .into_inner();
627 let payment_request = hold_request.payment_request;
628 Ok((preimage, payment_request, hash))
629 }
630
631 pub async fn settle_hold_invoice(
632 &self,
633 preimage: [u8; 32],
634 payment_hash: sha256::Hash,
635 ) -> anyhow::Result<()> {
636 let mut hold_invoice_subscription = self
637 .invoices_client_lock()
638 .await?
639 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
640 r_hash: payment_hash.to_byte_array().to_vec(),
641 })
642 .await?
643 .into_inner();
644 loop {
645 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
646 match timeout(
647 WAIT_FOR_INVOICE_TIMEOUT,
648 futures::StreamExt::next(&mut hold_invoice_subscription),
649 )
650 .await
651 {
652 Ok(Some(Ok(invoice))) => {
653 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
654 break;
655 }
656 debug!("hold invoice payment state: {:?}", invoice.state());
657 }
658 Ok(Some(Err(e))) => {
659 bail!("error in invoice subscription: {e:?}");
660 }
661 Ok(None) => {
662 bail!("invoice subscription ended before invoice was accepted");
663 }
664 Err(_) => {
665 bail!("timed out waiting for invoice to be accepted")
666 }
667 }
668 }
669
670 self.invoices_client_lock()
671 .await?
672 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
673 preimage: preimage.to_vec(),
674 })
675 .await?;
676
677 Ok(())
678 }
679}
680
/// A gateway paired with the name used for it in log messages.
pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
682
/// Funds the lightning node of every gateway and opens a channel between each
/// pair of neighbouring gateways in `gateways`.
///
/// Steps, in order:
/// 1. Wait until every gateway has synced to the current chain tip.
/// 2. Send 100,000,000 sats on-chain to each gateway and mine 10 blocks.
/// 3. Wait for the gateways to sync again.
/// 4. For each adjacent pair `(a, b)`, have `a` open a channel to `b` of
///    10,000,000 sats, pushing 5,000,000 sats to `b`, and wait until the
///    funding transaction can be found.
/// 5. Mine 10 blocks and wait for the gateways to sync once more.
/// 6. Wait until both sides of every pair list an active channel with their
///    counterparty.
///
/// # Errors
///
/// Returns the first error from any gateway or bitcoind call.
#[allow(clippy::similar_names)]
pub async fn open_channels_between_gateways(
    bitcoind: &Bitcoind,
    gateways: &[NamedGateway<'_>],
) -> Result<()> {
    // `get_block_count` is the RPC value plus one; subtract one for the height.
    let block_height = bitcoind.get_block_count().await? - 1;
    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
    futures::future::try_join_all(
        gateways
            .iter()
            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
    )
    .await?;

    info!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
    for (gw, _gw_name) in gateways {
        let funding_addr = gw.get_ln_onchain_address().await?;
        bitcoind.send_to(funding_addr, 100_000_000).await?;
    }

    // Mine on top of the funding transactions.
    bitcoind.mine_blocks(10).await?;

    info!(target: LOG_DEVIMINT, "Gateway lightning nodes funded.");

    let block_height = bitcoind.get_block_count().await? - 1;
    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
    futures::future::try_join_all(
        gateways
            .iter()
            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
    )
    .await?;

    // Overlapping windows of two: (g0, g1), (g1, g2), ...
    let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> =
        gateways.iter().tuple_windows::<(_, _)>().collect();

    info!(target: LOG_DEVIMINT, block_height = %block_height, "devimint current block");
    let sats_per_side = 5_000_000;
    for ((gw_a, gw_a_name), (gw_b, gw_b_name)) in &gateway_pairs {
        info!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
        let txid = gw_a
            .open_channel(gw_b, sats_per_side * 2, Some(sats_per_side))
            .await?;

        // Make sure bitcoind knows the funding transaction before moving on.
        bitcoind.poll_get_transaction(txid).await?;
    }

    // Mine on top of the channel funding transactions.
    bitcoind.mine_blocks(10).await?;

    let block_height = bitcoind.get_block_count().await? - 1;
    debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
    futures::future::try_join_all(
        gateways
            .iter()
            .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
    )
    .await?;

    // Both ends of each channel must report it as active.
    for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
        let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
        let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;

        wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
        wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
    }

    info!(target: LOG_DEVIMINT, "open_channels_between_gateways successful");

    Ok(())
}
758
759async fn wait_for_ready_channel_on_gateway_with_counterparty(
760 gw: &Gatewayd,
761 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
762) -> anyhow::Result<()> {
763 poll(
764 &format!("Wait for {} channel update", gw.gw_name),
765 || async {
766 let channels = gw
767 .list_channels()
768 .await
769 .context("list channels")
770 .map_err(ControlFlow::Break)?;
771
772 if channels
773 .iter()
774 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey && channel.is_active)
775 {
776 return Ok(());
777 }
778
779 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
780 Err(ControlFlow::Continue(anyhow!("channel not found")))
781 },
782 )
783 .await
784}
785
/// The lightning node implementation backing a gateway.
#[derive(Clone)]
pub enum LightningNode {
    /// An external `lnd` daemon, represented by its handle.
    Lnd(Lnd),
    /// An LDK node, described by its name and the ports assigned to it.
    Ldk {
        /// Name of the node.
        name: String,
        /// Port for the gateway (exact use is not visible in this file).
        gw_port: u16,
        /// Port for the LDK node (exact use is not visible in this file).
        ldk_port: u16,
        /// Port for metrics (exact use is not visible in this file).
        metrics_port: u16,
    },
}
796
797impl LightningNode {
798 pub fn ln_type(&self) -> LightningNodeType {
799 match self {
800 LightningNode::Lnd(_) => LightningNodeType::Lnd,
801 LightningNode::Ldk {
802 name: _,
803 gw_port: _,
804 ldk_port: _,
805 metrics_port: _,
806 } => LightningNodeType::Ldk,
807 }
808 }
809}
810
/// Handle to the esplora daemon started by devimint.
#[derive(Clone)]
pub struct Esplora {
    /// Handle of the spawned esplora process. Not read in this file;
    /// presumably held to keep the daemon alive (confirm against
    /// `ProcessHandle`).
    _process: ProcessHandle,
    /// The `bitcoind` handle passed to [`Esplora::new`]. Not read in this
    /// file; presumably held so the backend outlives esplora (confirm).
    _bitcoind: Bitcoind,
}
816
817impl Esplora {
818 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
819 debug!("Starting esplora");
820 let daemon_dir = process_mgr
821 .globals
822 .FM_BTC_DIR
823 .to_str()
824 .context("non utf8 path")?;
825 let esplora_dir = process_mgr
826 .globals
827 .FM_ESPLORA_DIR
828 .to_str()
829 .context("non utf8 path")?;
830
831 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
832 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
833 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
834 let cmd = cmd!(
836 crate::util::Esplora,
837 "--daemon-dir={daemon_dir}",
838 "--db-dir={esplora_dir}",
839 "--cookie=bitcoin:bitcoin",
840 "--network=regtest",
841 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
842 "--http-addr=127.0.0.1:{esplora_port}",
843 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
844 "--jsonrpc-import", );
846 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
847
848 Self::wait_for_ready(process_mgr).await?;
849 debug!(target: LOG_DEVIMINT, "Esplora ready");
850
851 Ok(Self {
852 _bitcoind: bitcoind,
853 _process: process,
854 })
855 }
856
857 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
859 let client = esplora_client::Builder::new(&format!(
860 "http://localhost:{}",
861 process_mgr.globals.FM_PORT_ESPLORA
862 ))
863 .max_retries(0)
865 .build_async()
866 .expect("esplora client build failed");
867
868 poll("esplora server ready", || async {
869 client
870 .get_fee_estimates()
871 .await
872 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
873
874 Ok(())
875 })
876 .await?;
877
878 Ok(())
879 }
880}
881
/// The daemons started by [`external_daemons`].
#[allow(unused)]
pub struct ExternalDaemons {
    /// Handle to the running `bitcoind`.
    pub bitcoind: Bitcoind,
    /// Handle to the running esplora daemon.
    pub esplora: Esplora,
}
887
888pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
889 let bitcoind = Bitcoind::new(process_mgr, false).await?;
890 let esplora = Esplora::new(process_mgr, bitcoind.clone()).await?;
891 let start_time = fedimint_core::time::now();
892 let _ = bitcoind.wallet_client().await?;
894 info!(
895 target: LOG_DEVIMINT,
896 "starting base daemons took {:?}",
897 start_time.elapsed()?
898 );
899 Ok(ExternalDaemons { bitcoind, esplora })
900}