1use std::ops::ControlFlow;
2use std::path::Path;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context, Result, anyhow, bail};
8use bitcoin::hashes::Hash;
9use bitcoincore_rpc::RpcApi;
10use bitcoincore_rpc::bitcoin::{Address, BlockHash};
11use bitcoincore_rpc::bitcoincore_rpc_json::{GetBalancesResult, GetBlockchainInfoResult};
12use bitcoincore_rpc::jsonrpc::error::RpcError;
13use cln_rpc::ClnRpc;
14use cln_rpc::primitives::{Amount as ClnRpcAmount, AmountOrAny};
15use fedimint_core::encoding::Encodable;
16use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
17use fedimint_core::task::{block_in_place, block_on, sleep, timeout};
18use fedimint_core::util::{FmtCompact as _, write_overwrite_async};
19use fedimint_logging::LOG_DEVIMINT;
20use fedimint_testing::ln::LightningNodeType;
21use futures::StreamExt;
22use hex::ToHex;
23use itertools::Itertools;
24use tokio::fs;
25use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};
26use tokio::task::spawn_blocking;
27use tokio::time::Instant;
28use tonic_lnd::Client as LndClient;
29use tonic_lnd::lnrpc::{ChanInfoRequest, GetInfoRequest, ListChannelsRequest};
30use tracing::{debug, error, info, trace, warn};
31
32use crate::util::{ProcessHandle, ProcessManager, poll, poll_with_timeout};
33use crate::vars::utf8;
34use crate::version_constants::VERSION_0_5_0_ALPHA;
35use crate::{Gatewayd, cmd, poll_eq};
36
37#[derive(Clone)]
38pub struct Bitcoind {
39 pub client: Arc<bitcoincore_rpc::Client>,
40 pub(crate) wallet_client: Arc<JitTryAnyhow<Arc<bitcoincore_rpc::Client>>>,
41 pub(crate) _process: ProcessHandle,
42}
43
44impl Bitcoind {
45 pub async fn new(processmgr: &ProcessManager, skip_setup: bool) -> Result<Self> {
46 let btc_dir = utf8(&processmgr.globals.FM_BTC_DIR);
47
48 let conf = format!(
49 include_str!("cfg/bitcoin.conf"),
50 rpc_port = processmgr.globals.FM_PORT_BTC_RPC,
51 p2p_port = processmgr.globals.FM_PORT_BTC_P2P,
52 zmq_pub_raw_block = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
53 zmq_pub_raw_tx = processmgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
54 tx_index = "0",
55 );
56 write_overwrite_async(processmgr.globals.FM_BTC_DIR.join("bitcoin.conf"), conf).await?;
57 let process = processmgr
58 .spawn_daemon(
59 "bitcoind",
60 cmd!(crate::util::Bitcoind, "-datadir={btc_dir}"),
61 )
62 .await?;
63
64 let url = processmgr.globals.FM_BITCOIN_RPC_URL.parse()?;
65 debug!("Parsed FM_BITCOIN_RPC_URL: {:?}", &url);
66 let (host, auth) = fedimint_bitcoind::bitcoincore::from_url_to_url_auth(&url)?;
67 debug!("bitcoind host: {:?}, auth: {:?}", &host, auth);
68 let client =
69 Self::new_bitcoin_rpc(&host, auth.clone()).context("Failed to connect to bitcoind")?;
70 let wallet_client = JitTry::new_try(move || async move {
71 let client =
72 Self::new_bitcoin_rpc(&host, auth).context("Failed to connect to bitcoind")?;
73 Self::init(&client, skip_setup).await?;
74 Ok(Arc::new(client))
75 });
76
77 let bitcoind = Self {
78 _process: process,
79 client: Arc::new(client),
80 wallet_client: Arc::new(wallet_client),
81 };
82
83 bitcoind.poll_ready().await?;
84
85 Ok(bitcoind)
86 }
87
88 fn new_bitcoin_rpc(
89 url: &str,
90 auth: bitcoincore_rpc::Auth,
91 ) -> anyhow::Result<bitcoincore_rpc::Client> {
92 const RPC_TIMEOUT: Duration = Duration::from_secs(45);
94 let mut builder = bitcoincore_rpc::jsonrpc::simple_http::Builder::new()
95 .url(url)?
96 .timeout(RPC_TIMEOUT);
97 let (user, pass) = auth.get_user_pass()?;
98 if let Some(user) = user {
99 builder = builder.auth(user, pass);
100 }
101 let client = bitcoincore_rpc::jsonrpc::Client::with_transport(builder.build());
102 Ok(bitcoincore_rpc::Client::from_jsonrpc(client))
103 }
104
105 pub(crate) async fn init(client: &bitcoincore_rpc::Client, skip_setup: bool) -> Result<()> {
106 debug!("Setting up bitcoind");
107 for attempt in 0.. {
109 match block_in_place(|| client.create_wallet("", None, None, None, None)) {
110 Ok(_) => {
111 break;
112 }
113 Err(err) => {
114 if err.to_string().contains("Database already exists") {
115 break;
116 }
117 if attempt % 20 == 19 {
118 debug!(target: LOG_DEVIMINT, %attempt, err = %err.fmt_compact(), "Waiting for initial bitcoind wallet initialization");
119 }
120 sleep(Duration::from_millis(100)).await;
121 }
122 }
123 }
124
125 if !skip_setup {
126 let blocks = 101;
128 let address = block_in_place(|| client.get_new_address(None, None))?
129 .require_network(bitcoin::Network::Regtest)
130 .expect("Devimint always runs in regtest");
131 debug!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address");
132 block_in_place(|| {
133 client
134 .generate_to_address(blocks, &address)
135 .context("Failed to generate blocks")
136 })?;
137 trace!(target: LOG_DEVIMINT, blocks_num=blocks, %address, "Mining blocks to address complete");
138 }
139
140 poll("bitcoind", || async {
142 let info = block_in_place(|| client.get_blockchain_info())
143 .context("bitcoind getblockchaininfo")
144 .map_err(ControlFlow::Continue)?;
145 if info.blocks > 100 {
146 Ok(())
147 } else {
148 Err(ControlFlow::Continue(anyhow!(
149 "not enough blocks: {}",
150 info.blocks
151 )))
152 }
153 })
154 .await?;
155 debug!("Bitcoind ready");
156 Ok(())
157 }
158
159 async fn poll_ready(&self) -> anyhow::Result<()> {
161 poll("bitcoind rpc ready", || async {
162 self.get_block_count()
163 .await
164 .map_err(ControlFlow::Continue::<anyhow::Error, _>)?;
165 Ok(())
166 })
167 .await
168 }
169
170 pub async fn wallet_client(&self) -> anyhow::Result<&Self> {
173 self.wallet_client.get_try().await?;
174 Ok(self)
175 }
176
177 pub async fn get_block_count(&self) -> Result<u64> {
183 let client = self.client.clone();
184 Ok(spawn_blocking(move || client.get_block_count()).await?? + 1)
185 }
186
187 pub async fn mine_blocks_no_wait(&self, block_num: u64) -> Result<u64> {
188 let start_time = Instant::now();
189 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
190 let addr = self.get_new_address().await?;
191 let initial_block_count = self.get_block_count().await?;
192 self.generate_to_address(block_num, addr).await?;
193 debug!(target: LOG_DEVIMINT,
194 elapsed_ms = %start_time.elapsed().as_millis(),
195 ?block_num, "Mined blocks (no wait)");
196
197 Ok(initial_block_count)
198 }
199
200 pub async fn mine_blocks(&self, block_num: u64) -> Result<()> {
201 let start_time = Instant::now();
202 debug!(target: LOG_DEVIMINT, ?block_num, "Mining bitcoin blocks");
203 let addr = self.get_new_address().await?;
204 let initial_block_count = self.get_block_count().await?;
205 self.generate_to_address(block_num, addr).await?;
206 while self.get_block_count().await? < initial_block_count + block_num {
207 trace!(target: LOG_DEVIMINT, ?block_num, "Waiting for blocks to be mined");
208 sleep(Duration::from_millis(100)).await;
209 }
210
211 debug!(target: LOG_DEVIMINT,
212 elapsed_ms = %start_time.elapsed().as_millis(),
213 ?block_num, "Mined blocks");
214
215 Ok(())
216 }
217
218 pub async fn send_to(&self, addr: String, amount: u64) -> Result<bitcoin::Txid> {
219 debug!(target: LOG_DEVIMINT, amount, addr, "Sending funds from bitcoind");
220 let amount = bitcoin::Amount::from_sat(amount);
221 let tx = self
222 .wallet_client()
223 .await?
224 .send_to_address(
225 bitcoin::Address::from_str(&addr)?
226 .require_network(bitcoin::Network::Regtest)
227 .expect("Devimint always runs in regtest"),
228 amount,
229 )
230 .await?;
231 Ok(tx)
232 }
233
234 pub async fn get_txout_proof(&self, txid: bitcoin::Txid) -> Result<String> {
235 let client = self.wallet_client().await?.clone();
236 let proof = spawn_blocking(move || client.client.get_tx_out_proof(&[txid], None)).await??;
237 Ok(proof.encode_hex())
238 }
239
240 pub async fn poll_get_transaction(&self, txid: bitcoin::Txid) -> anyhow::Result<String> {
243 poll("Waiting for transaction in mempool", || async {
244 match self
245 .get_transaction(txid)
246 .await
247 .context("getrawtransaction")
248 {
249 Ok(Some(tx)) => Ok(tx),
250 Ok(None) => Err(ControlFlow::Continue(anyhow::anyhow!(
251 "Transaction not found yet"
252 ))),
253 Err(err) => Err(ControlFlow::Break(err)),
254 }
255 })
256 .await
257 }
258
259 async fn get_transaction(&self, txid: bitcoin::Txid) -> Result<Option<String>> {
261 match self.get_raw_transaction(txid, None).await {
263 Ok(None) => {}
266 other => return other,
268 };
269
270 let block_height = self.get_block_count().await? - 1;
271
272 let mut buffered_tx_stream = futures::stream::iter((0..block_height).rev())
278 .map(|height| async move {
279 let block_hash = self.get_block_hash(height).await?;
280 self.get_raw_transaction(txid, Some(block_hash)).await
281 })
282 .buffered(32);
283
284 while let Some(tx_or) = buffered_tx_stream.next().await {
285 match tx_or {
286 Ok(None) => continue,
289 other => return other,
291 };
292 }
293
294 Ok(None)
296 }
297
298 async fn get_raw_transaction(
299 &self,
300 txid: bitcoin::Txid,
301 block_hash: Option<BlockHash>,
302 ) -> Result<Option<String>> {
303 let client = self.client.clone();
304 let tx_or =
305 spawn_blocking(move || client.get_raw_transaction(&txid, block_hash.as_ref())).await?;
306
307 let tx = match tx_or {
308 Ok(tx) => tx,
309 Err(bitcoincore_rpc::Error::JsonRpc(bitcoincore_rpc::jsonrpc::Error::Rpc(
314 RpcError { code: -5, .. },
315 ))) => return Ok(None),
316 Err(err) => return Err(err.into()),
317 };
318 let bytes = tx.consensus_encode_to_vec();
319 Ok(Some(bytes.encode_hex()))
320 }
321
322 async fn get_block_hash(&self, height: u64) -> Result<BlockHash> {
323 let client = self.client.clone();
324 Ok(spawn_blocking(move || client.get_block_hash(height)).await??)
325 }
326
327 pub async fn get_new_address(&self) -> Result<Address> {
328 let client = self.wallet_client().await?.clone();
329 let addr = spawn_blocking(move || client.client.get_new_address(None, None))
330 .await??
331 .require_network(bitcoin::Network::Regtest)
332 .expect("Devimint always runs in regtest");
333 Ok(addr)
334 }
335
336 pub async fn generate_to_address(
337 &self,
338 block_num: u64,
339 address: Address,
340 ) -> Result<Vec<BlockHash>> {
341 let client = self.wallet_client().await?.clone();
342 Ok(
343 spawn_blocking(move || client.client.generate_to_address(block_num, &address))
344 .await??,
345 )
346 }
347
348 pub async fn get_blockchain_info(&self) -> anyhow::Result<GetBlockchainInfoResult> {
349 let client = self.client.clone();
350 Ok(spawn_blocking(move || client.get_blockchain_info()).await??)
351 }
352
353 pub async fn send_to_address(
354 &self,
355 addr: Address,
356 amount: bitcoin::Amount,
357 ) -> anyhow::Result<bitcoin::Txid> {
358 let client = self.wallet_client().await?.clone();
359 Ok(spawn_blocking(move || {
360 client
361 .client
362 .send_to_address(&addr, amount, None, None, None, None, None, None)
363 })
364 .await??)
365 }
366
367 pub(crate) async fn get_balances(&self) -> anyhow::Result<GetBalancesResult> {
368 let client = self.wallet_client().await?.clone();
369 Ok(spawn_blocking(move || client.client.get_balances()).await??)
370 }
371
372 pub(crate) fn get_jsonrpc_client(&self) -> &bitcoincore_rpc::jsonrpc::Client {
373 self.client.get_jsonrpc_client()
374 }
375}
376
377pub struct LightningdProcessHandle(ProcessHandle);
378
379impl LightningdProcessHandle {
380 async fn terminate(&self) -> Result<()> {
381 if self.0.is_running().await {
382 self.0.terminate().await
383 } else {
384 Ok(())
385 }
386 }
387}
388
389impl Drop for LightningdProcessHandle {
390 fn drop(&mut self) {
391 block_in_place(|| {
393 if let Err(e) = block_on(self.terminate()) {
394 warn!(target: LOG_DEVIMINT, "failed to terminate lightningd: {e:?}");
395 }
396 });
397 }
398}
399
400#[derive(Clone)]
401pub struct Lightningd {
402 pub(crate) rpc: Arc<Mutex<ClnRpc>>,
403 pub(crate) process: Arc<LightningdProcessHandle>,
404 pub(crate) bitcoind: Bitcoind,
405}
406
407impl Lightningd {
408 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
409 let cln_dir = &process_mgr.globals.FM_CLN_DIR;
410 let conf = format!(
411 include_str!("cfg/lightningd.conf"),
412 port = process_mgr.globals.FM_PORT_CLN,
413 bitcoin_rpcport = process_mgr.globals.FM_PORT_BTC_RPC,
414 log_path = process_mgr.globals.FM_CLN_DIR.join("cln.log").display(),
415 );
416 write_overwrite_async(process_mgr.globals.FM_CLN_DIR.join("config"), conf).await?;
417 let process = Lightningd::start(process_mgr, cln_dir).await?;
418
419 let socket_cln = cln_dir.join("regtest/lightning-rpc");
420 poll("lightningd", || async {
421 ClnRpc::new(socket_cln.clone())
422 .await
423 .context("connect to lightningd")
424 .map_err(ControlFlow::Continue)
425 })
426 .await?;
427 let rpc = ClnRpc::new(socket_cln).await?;
428 Ok(Self {
429 bitcoind,
430 rpc: Arc::new(Mutex::new(rpc)),
431 process: Arc::new(LightningdProcessHandle(process)),
432 })
433 }
434
435 pub async fn start(process_mgr: &ProcessManager, cln_dir: &Path) -> Result<ProcessHandle> {
436 let btc_dir = utf8(&process_mgr.globals.FM_BTC_DIR);
437 let cmd = cmd!(
438 crate::util::Lightningd,
439 "--dev-fast-gossip",
440 "--dev-bitcoind-poll=1",
441 format!("--lightning-dir={}", utf8(cln_dir)),
442 format!("--bitcoin-datadir={btc_dir}"),
443 );
444
445 process_mgr.spawn_daemon("lightningd", cmd).await
446 }
447
448 pub async fn request<R>(&self, request: R) -> Result<R::Response>
449 where
450 R: cln_rpc::model::TypedRequest + serde::Serialize + std::fmt::Debug,
451 R::Response: serde::de::DeserializeOwned + std::fmt::Debug,
452 {
453 let mut rpc = self.rpc.lock().await;
454 Ok(rpc.call_typed(&request).await?)
455 }
456
457 async fn await_block_processing(&self) -> Result<()> {
460 poll("lightningd block processing", || async {
461 let btc_height = self
462 .bitcoind
463 .get_blockchain_info()
464 .await
465 .context("bitcoind getblockchaininfo")
466 .map_err(ControlFlow::Continue)?
467 .blocks;
468 let lnd_height = self
469 .request(cln_rpc::model::requests::GetinfoRequest {})
470 .await
471 .map_err(ControlFlow::Continue)?
472 .blockheight;
473 poll_eq!(u64::from(lnd_height), btc_height)
474 })
475 .await?;
476 Ok(())
477 }
478
479 pub async fn pub_key(&self) -> Result<String> {
480 Ok(self
481 .request(cln_rpc::model::requests::GetinfoRequest {})
482 .await?
483 .id
484 .to_string())
485 }
486
487 pub async fn terminate(self) -> Result<()> {
488 self.process.terminate().await
489 }
490
491 pub async fn invoice(
492 &self,
493 amount: u64,
494 description: String,
495 label: String,
496 ) -> anyhow::Result<String> {
497 let invoice = self
498 .request(cln_rpc::model::requests::InvoiceRequest {
499 amount_msat: AmountOrAny::Amount(ClnRpcAmount::from_msat(amount)),
500 description,
501 label,
502 expiry: Some(60),
503 fallbacks: None,
504 preimage: None,
505 cltv: None,
506 deschashonly: None,
507 exposeprivatechannels: None,
508 })
509 .await?
510 .bolt11;
511 Ok(invoice)
512 }
513
514 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
515 let invoice_status = self
516 .request(cln_rpc::model::requests::PayRequest {
517 bolt11: invoice,
518 amount_msat: None,
519 label: None,
520 riskfactor: None,
521 maxfeepercent: None,
522 retry_for: None,
523 maxdelay: None,
524 exemptfee: None,
525 localinvreqid: None,
526 exclude: None,
527 maxfee: None,
528 description: None,
529 partial_msat: None,
530 })
531 .await?
532 .status;
533
534 anyhow::ensure!(matches!(
535 invoice_status,
536 cln_rpc::model::responses::PayStatus::COMPLETE
537 ));
538
539 Ok(())
540 }
541
542 pub async fn wait_any_bolt11_invoice(&self) -> anyhow::Result<()> {
543 let invoice_status = self
544 .request(cln_rpc::model::requests::WaitanyinvoiceRequest {
545 lastpay_index: None,
546 timeout: None,
547 })
548 .await?
549 .status;
550 anyhow::ensure!(matches!(
551 invoice_status,
552 cln_rpc::model::responses::WaitanyinvoiceStatus::PAID
553 ));
554
555 Ok(())
556 }
557}
558
559#[derive(Clone)]
560pub struct Lnd {
561 pub(crate) client: Arc<Mutex<LndClient>>,
562 pub(crate) process: ProcessHandle,
563 pub(crate) _bitcoind: Bitcoind,
564}
565
566impl Lnd {
567 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
568 let (process, client) = Lnd::start(process_mgr).await?;
569 let this = Self {
570 _bitcoind: bitcoind,
571 client: Arc::new(Mutex::new(client)),
572 process,
573 };
574 poll("lnd_startup", || async {
576 this.pub_key().await.map_err(ControlFlow::Continue)
577 })
578 .await?;
579 Ok(this)
580 }
581
582 pub async fn start(process_mgr: &ProcessManager) -> Result<(ProcessHandle, LndClient)> {
583 let conf = format!(
584 include_str!("cfg/lnd.conf"),
585 listen_port = process_mgr.globals.FM_PORT_LND_LISTEN,
586 rpc_port = process_mgr.globals.FM_PORT_LND_RPC,
587 rest_port = process_mgr.globals.FM_PORT_LND_REST,
588 btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
589 zmq_pub_raw_block = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_BLOCK,
590 zmq_pub_raw_tx = process_mgr.globals.FM_PORT_BTC_ZMQ_PUB_RAW_TX,
591 );
592 write_overwrite_async(process_mgr.globals.FM_LND_DIR.join("lnd.conf"), conf).await?;
593 let cmd = cmd!(
594 crate::util::Lnd,
595 format!("--lnddir={}", utf8(&process_mgr.globals.FM_LND_DIR))
596 );
597
598 let process = process_mgr.spawn_daemon("lnd", cmd).await?;
599 let lnd_rpc_addr = &process_mgr.globals.FM_LND_RPC_ADDR;
600 let lnd_macaroon = &process_mgr.globals.FM_LND_MACAROON;
601 let lnd_tls_cert = &process_mgr.globals.FM_LND_TLS_CERT;
602 poll("wait for lnd files", || async {
603 if fs::try_exists(lnd_tls_cert)
604 .await
605 .context("lnd tls cert")
606 .map_err(ControlFlow::Continue)?
607 && fs::try_exists(lnd_macaroon)
608 .await
609 .context("lnd macaroon")
610 .map_err(ControlFlow::Continue)?
611 {
612 Ok(())
613 } else {
614 Err(ControlFlow::Continue(anyhow!(
615 "lnd tls cert or lnd macaroon not found"
616 )))
617 }
618 })
619 .await?;
620
621 let client = poll("lnd_connect", || async {
622 tonic_lnd::connect(
623 lnd_rpc_addr.clone(),
624 lnd_tls_cert.clone(),
625 lnd_macaroon.clone(),
626 )
627 .await
628 .context("lnd connect")
629 .map_err(ControlFlow::Continue)
630 })
631 .await?;
632
633 Ok((process, client))
634 }
635
636 pub async fn lightning_client_lock(
637 &self,
638 ) -> Result<MappedMutexGuard<'_, tonic_lnd::LightningClient>> {
639 let guard = self.client.lock().await;
640 Ok(MutexGuard::map(guard, |client| client.lightning()))
641 }
642
643 pub async fn invoices_client_lock(
644 &self,
645 ) -> Result<MappedMutexGuard<'_, tonic_lnd::InvoicesClient>> {
646 let guard = self.client.lock().await;
647 Ok(MutexGuard::map(guard, |client| client.invoices()))
648 }
649
650 pub async fn pub_key(&self) -> Result<String> {
651 Ok(self
652 .lightning_client_lock()
653 .await?
654 .get_info(GetInfoRequest {})
655 .await?
656 .into_inner()
657 .identity_pubkey)
658 }
659
660 async fn await_block_processing(&self) -> Result<()> {
663 poll("lnd block processing", || async {
664 let synced = self
665 .lightning_client_lock()
666 .await
667 .map_err(ControlFlow::Break)?
668 .get_info(GetInfoRequest {})
669 .await
670 .context("lnd get_info")
671 .map_err(ControlFlow::Continue)?
672 .into_inner()
673 .synced_to_chain;
674 if synced {
675 Ok(())
676 } else {
677 Err(ControlFlow::Continue(anyhow!("lnd not synced_to_chain")))
678 }
679 })
680 .await?;
681 Ok(())
682 }
683
684 pub async fn terminate(self) -> Result<()> {
685 self.process.terminate().await
686 }
687
688 pub async fn invoice(&self, amount: u64) -> anyhow::Result<(String, Vec<u8>)> {
689 let add_invoice = self
690 .lightning_client_lock()
691 .await?
692 .add_invoice(tonic_lnd::lnrpc::Invoice {
693 value_msat: amount as i64,
694 ..Default::default()
695 })
696 .await?
697 .into_inner();
698 let invoice = add_invoice.payment_request;
699 let payment_hash = add_invoice.r_hash;
700 Ok((invoice, payment_hash))
701 }
702
703 pub async fn pay_bolt11_invoice(&self, invoice: String) -> anyhow::Result<()> {
704 let payment = self
705 .lightning_client_lock()
706 .await?
707 .send_payment_sync(tonic_lnd::lnrpc::SendRequest {
708 payment_request: invoice.clone(),
709 ..Default::default()
710 })
711 .await?
712 .into_inner();
713 let payment_status = self
714 .lightning_client_lock()
715 .await?
716 .list_payments(tonic_lnd::lnrpc::ListPaymentsRequest {
717 include_incomplete: true,
718 ..Default::default()
719 })
720 .await?
721 .into_inner()
722 .payments
723 .into_iter()
724 .find(|p| p.payment_hash == payment.payment_hash.encode_hex::<String>())
725 .context("payment not in list")?
726 .status();
727 anyhow::ensure!(payment_status == tonic_lnd::lnrpc::payment::PaymentStatus::Succeeded);
728
729 Ok(())
730 }
731
732 pub async fn wait_bolt11_invoice(&self, payment_hash: Vec<u8>) -> anyhow::Result<()> {
733 let invoice_status = self
734 .lightning_client_lock()
735 .await?
736 .lookup_invoice(tonic_lnd::lnrpc::PaymentHash {
737 r_hash: payment_hash,
738 ..Default::default()
739 })
740 .await?
741 .into_inner()
742 .state();
743 anyhow::ensure!(invoice_status == tonic_lnd::lnrpc::invoice::InvoiceState::Settled);
744
745 Ok(())
746 }
747
748 pub async fn create_hold_invoice(
749 &self,
750 amount: u64,
751 ) -> anyhow::Result<([u8; 32], String, cln_rpc::primitives::Sha256)> {
752 let preimage = rand::random::<[u8; 32]>();
753 let hash = {
754 let mut engine = bitcoin::hashes::sha256::Hash::engine();
755 bitcoin::hashes::HashEngine::input(&mut engine, &preimage);
756 bitcoin::hashes::sha256::Hash::from_engine(engine)
757 };
758 let fedimint_cli_version = crate::util::FedimintCli::version_or_default().await;
761 let cltv_expiry = if fedimint_cli_version >= *VERSION_0_5_0_ALPHA {
762 650
763 } else {
764 100
765 };
766 let hold_request = self
767 .invoices_client_lock()
768 .await?
769 .add_hold_invoice(tonic_lnd::invoicesrpc::AddHoldInvoiceRequest {
770 value_msat: amount as i64,
771 hash: hash.to_byte_array().to_vec(),
772 cltv_expiry,
773 ..Default::default()
774 })
775 .await?
776 .into_inner();
777 let payment_request = hold_request.payment_request;
778 Ok((preimage, payment_request, hash))
779 }
780
781 pub async fn settle_hold_invoice(
782 &self,
783 preimage: [u8; 32],
784 payment_hash: cln_rpc::primitives::Sha256,
785 ) -> anyhow::Result<()> {
786 let mut hold_invoice_subscription = self
787 .invoices_client_lock()
788 .await?
789 .subscribe_single_invoice(tonic_lnd::invoicesrpc::SubscribeSingleInvoiceRequest {
790 r_hash: payment_hash.to_byte_array().to_vec(),
791 })
792 .await?
793 .into_inner();
794 loop {
795 const WAIT_FOR_INVOICE_TIMEOUT: Duration = Duration::from_secs(60);
796 match timeout(
797 WAIT_FOR_INVOICE_TIMEOUT,
798 futures::StreamExt::next(&mut hold_invoice_subscription),
799 )
800 .await
801 {
802 Ok(Some(Ok(invoice))) => {
803 if invoice.state() == tonic_lnd::lnrpc::invoice::InvoiceState::Accepted {
804 break;
805 }
806 debug!("hold invoice payment state: {:?}", invoice.state());
807 }
808 Ok(Some(Err(e))) => {
809 bail!("error in invoice subscription: {e:?}");
810 }
811 Ok(None) => {
812 bail!("invoice subscription ended before invoice was accepted");
813 }
814 Err(_) => {
815 bail!("timed out waiting for invoice to be accepted")
816 }
817 }
818 }
819
820 self.invoices_client_lock()
821 .await?
822 .settle_invoice(tonic_lnd::invoicesrpc::SettleInvoiceMsg {
823 preimage: preimage.to_vec(),
824 })
825 .await?;
826
827 Ok(())
828 }
829}
830
831pub async fn open_channel(
834 process_mgr: &ProcessManager,
835 bitcoind: &Bitcoind,
836 cln: &Lightningd,
837 lnd: &Lnd,
838) -> Result<()> {
839 debug!(target: LOG_DEVIMINT, "Await block ln nodes block processing");
840 tokio::try_join!(cln.await_block_processing(), lnd.await_block_processing())?;
841
842 let cln_addr = cln
843 .request(cln_rpc::model::requests::NewaddrRequest { addresstype: None })
844 .await?
845 .bech32
846 .context("bech32 should be present")?;
847
848 bitcoind.send_to(cln_addr, 100_000_000).await?;
849 bitcoind.mine_blocks(10).await?;
850
851 let lnd_pubkey = lnd.pub_key().await?;
852 let cln_pubkey = cln.pub_key().await?;
853
854 cln.request(cln_rpc::model::requests::ConnectRequest {
855 id: format!(
856 "{}@127.0.0.1:{}",
857 lnd_pubkey, process_mgr.globals.FM_PORT_LND_LISTEN
858 ),
859 host: None,
860 port: None,
861 })
862 .await
863 .context("connect request")?;
864
865 poll("fund channel", || async {
866 cln.request(cln_rpc::model::requests::FundchannelRequest {
867 id: lnd_pubkey
868 .parse()
869 .context("failed to parse lnd pubkey")
870 .map_err(ControlFlow::Break)?,
871 amount: cln_rpc::primitives::AmountOrAll::Amount(
872 cln_rpc::primitives::Amount::from_sat(10_000_000),
873 ),
874 push_msat: Some(cln_rpc::primitives::Amount::from_sat(5_000_000)),
875 feerate: None,
876 announce: None,
877 minconf: None,
878 close_to: None,
879 request_amt: None,
880 compact_lease: None,
881 utxos: None,
882 mindepth: None,
883 reserve: None,
884 channel_type: None,
885 })
886 .await
887 .map_err(ControlFlow::Continue)
888 })
889 .await?;
890
891 bitcoind.mine_blocks(10).await?;
892
893 poll("Wait LND for channel update", || async {
894 let mut lnd_client = lnd.client.lock().await;
895 let channels = lnd_client
896 .lightning()
897 .list_channels(ListChannelsRequest {
898 active_only: true,
899 ..Default::default()
900 })
901 .await
902 .context("lnd list channels")
903 .map_err(ControlFlow::Break)?
904 .into_inner();
905
906 if let Some(channel) = channels
907 .channels
908 .iter()
909 .find(|channel| channel.remote_pubkey == cln_pubkey)
910 {
911 let chan_info = lnd_client
912 .lightning()
913 .get_chan_info(ChanInfoRequest {
914 chan_id: channel.chan_id,
915 })
916 .await;
917
918 match chan_info {
919 Ok(_) => {
920 return Ok(());
921 }
922 Err(err) => {
923 debug!(target: LOG_DEVIMINT, err = %err.fmt_compact(), "Getting chan info failed");
924 }
925 }
926 }
927
928 Err(ControlFlow::Continue(anyhow!("channel not found")))
929 })
930 .await?;
931
932 Ok(())
933}
934
935pub type NamedGateway<'a> = (&'a Gatewayd, &'a str);
936
937#[allow(clippy::similar_names)]
938pub async fn open_channels_between_gateways(
939 bitcoind: &Bitcoind,
940 gateways: &[NamedGateway<'_>],
941) -> Result<()> {
942 let block_height = bitcoind.get_block_count().await? - 1;
943 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
944 futures::future::try_join_all(
945 gateways
946 .iter()
947 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
948 )
949 .await?;
950
951 debug!(target: LOG_DEVIMINT, "Funding all gateway lightning nodes...");
952 for (gw, _gw_name) in gateways {
953 let funding_addr = gw.get_ln_onchain_address().await?;
954 bitcoind.send_to(funding_addr, 100_000_000).await?;
955 }
956
957 bitcoind.mine_blocks(10).await?;
958
959 let block_height = bitcoind.get_block_count().await? - 1;
960 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
961 futures::future::try_join_all(
962 gateways
963 .iter()
964 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
965 )
966 .await?;
967
968 let gateway_pairs: Vec<(&NamedGateway, &NamedGateway)> = if gateways.len() == 2 {
974 gateways.iter().tuple_windows::<(_, _)>().collect()
975 } else {
976 gateways.iter().circular_tuple_windows::<(_, _)>().collect()
977 };
978
979 let open_channel_tasks = gateway_pairs.iter()
980 .map(|((gw_a, gw_a_name), (gw_b, gw_b_name))| {
981 let gw_a = (*gw_a).clone();
982 let gw_a_name = (*gw_a_name).to_string();
983 let gw_b = (*gw_b).clone();
984 let gw_b_name = (*gw_b_name).to_string();
985
986 let sats_per_side = 5_000_000;
987 debug!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, "Opening channel with {sats_per_side} sats on each side...");
988 tokio::task::spawn(async move {
989 let res = poll_with_timeout(&format!("Open channel from {gw_a_name} to {gw_b_name}"), Duration::from_secs(30), || async {
991 gw_a.open_channel(&gw_b, sats_per_side * 2, Some(sats_per_side)).await.map_err(ControlFlow::Continue)
992 })
993 .await;
994
995 if res.is_err() {
996 error!(target: LOG_DEVIMINT, from=%gw_a_name, to=%gw_b_name, ?res, "Failed to open channel");
997 gw_a.dump_logs()?;
998 gw_b.dump_logs()?;
999 }
1000
1001 res
1002 })
1003 })
1004 .collect::<Vec<_>>();
1005 let open_channel_task_results: Vec<Result<Result<_, _>, _>> =
1006 futures::future::join_all(open_channel_tasks).await;
1007
1008 let mut channel_funding_txids = Vec::new();
1009 for open_channel_task_result in open_channel_task_results {
1010 match open_channel_task_result {
1011 Ok(Ok(txid)) => {
1012 channel_funding_txids.push(txid);
1013 }
1014 Ok(Err(e)) => {
1015 return Err(anyhow::anyhow!(e));
1016 }
1017 Err(e) => {
1018 return Err(anyhow::anyhow!(e));
1019 }
1020 }
1021 }
1022
1023 let mut is_missing_any_txids = false;
1025 for txid_or in &channel_funding_txids {
1026 if let Some(txid) = txid_or {
1027 bitcoind.poll_get_transaction(*txid).await?;
1028 } else {
1029 is_missing_any_txids = true;
1030 }
1031 }
1032
1033 if is_missing_any_txids {
1037 fedimint_core::runtime::sleep(Duration::from_secs(2)).await;
1038 }
1039
1040 bitcoind.mine_blocks(10).await?;
1041
1042 let block_height = bitcoind.get_block_count().await? - 1;
1043 debug!(target: LOG_DEVIMINT, ?block_height, "Syncing gateway lightning nodes to block height...");
1044 futures::future::try_join_all(
1045 gateways
1046 .iter()
1047 .map(|(gw, _gw_name)| gw.wait_for_block_height(block_height)),
1048 )
1049 .await?;
1050
1051 for ((gw_a, _gw_a_name), (gw_b, _gw_b_name)) in &gateway_pairs {
1052 let gw_a_node_pubkey = gw_a.lightning_pubkey().await?;
1053 let gw_b_node_pubkey = gw_b.lightning_pubkey().await?;
1054
1055 wait_for_ready_channel_on_gateway_with_counterparty(gw_b, gw_a_node_pubkey).await?;
1056 wait_for_ready_channel_on_gateway_with_counterparty(gw_a, gw_b_node_pubkey).await?;
1057 }
1058
1059 Ok(())
1060}
1061
1062async fn wait_for_ready_channel_on_gateway_with_counterparty(
1063 gw: &Gatewayd,
1064 counterparty_lightning_node_pubkey: bitcoin::secp256k1::PublicKey,
1065) -> anyhow::Result<()> {
1066 poll(
1067 &format!("Wait for {} channel update", gw.gw_name),
1068 || async {
1069 let channels = gw
1070 .list_active_channels()
1071 .await
1072 .context("list channels")
1073 .map_err(ControlFlow::Break)?;
1074
1075 if channels
1076 .iter()
1077 .any(|channel| channel.remote_pubkey == counterparty_lightning_node_pubkey)
1078 {
1079 return Ok(());
1080 }
1081
1082 debug!(target: LOG_DEVIMINT, ?channels, gw = gw.gw_name, "Counterparty channels not found open");
1083
1084 Err(ControlFlow::Continue(anyhow!("channel not found")))
1085 },
1086 )
1087 .await
1088}
1089
1090#[derive(Clone)]
1091pub enum LightningNode {
1092 Lnd(Lnd),
1093 Ldk { name: String },
1094}
1095
1096impl LightningNode {
1097 pub fn ln_type(&self) -> LightningNodeType {
1098 match self {
1099 LightningNode::Lnd(_) => LightningNodeType::Lnd,
1100 LightningNode::Ldk { name: _ } => LightningNodeType::Ldk,
1101 }
1102 }
1103}
1104
1105#[derive(Clone)]
1106pub struct Electrs {
1107 _process: ProcessHandle,
1108 _bitcoind: Bitcoind,
1109}
1110
1111impl Electrs {
1112 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
1113 debug!(target: LOG_DEVIMINT, "Starting electrs");
1114 let electrs_dir = process_mgr
1115 .globals
1116 .FM_ELECTRS_DIR
1117 .to_str()
1118 .context("non utf8 path")?;
1119
1120 let daemon_dir = &process_mgr.globals.FM_BTC_DIR.display();
1121
1122 let conf = format!(
1123 include_str!("cfg/electrs.toml"),
1124 rpc_port = process_mgr.globals.FM_PORT_BTC_RPC,
1125 p2p_port = process_mgr.globals.FM_PORT_BTC_P2P,
1126 electrs_port = process_mgr.globals.FM_PORT_ELECTRS,
1127 monitoring_port = process_mgr.globals.FM_PORT_ELECTRS_MONITORING,
1128 );
1129 debug!("electrs conf: {:?}", conf);
1130 write_overwrite_async(
1131 process_mgr.globals.FM_ELECTRS_DIR.join("electrs.toml"),
1132 conf,
1133 )
1134 .await?;
1135 let cmd = cmd!(
1136 crate::util::Electrs,
1137 "--conf-dir={electrs_dir}",
1138 "--db-dir={electrs_dir}",
1139 "--daemon-dir={daemon_dir}"
1140 );
1141 let process = process_mgr.spawn_daemon("electrs", cmd).await?;
1142 debug!(target: LOG_DEVIMINT, "Electrs ready");
1143
1144 Ok(Self {
1145 _bitcoind: bitcoind,
1146 _process: process,
1147 })
1148 }
1149}
1150
1151#[derive(Clone)]
1152pub struct Esplora {
1153 _process: ProcessHandle,
1154 _bitcoind: Bitcoind,
1155}
1156
1157impl Esplora {
1158 pub async fn new(process_mgr: &ProcessManager, bitcoind: Bitcoind) -> Result<Self> {
1159 debug!("Starting esplora");
1160 let daemon_dir = process_mgr
1161 .globals
1162 .FM_BTC_DIR
1163 .to_str()
1164 .context("non utf8 path")?;
1165 let esplora_dir = process_mgr
1166 .globals
1167 .FM_ESPLORA_DIR
1168 .to_str()
1169 .context("non utf8 path")?;
1170
1171 let btc_rpc_port = process_mgr.globals.FM_PORT_BTC_RPC;
1172 let esplora_port = process_mgr.globals.FM_PORT_ESPLORA;
1173 let esplora_monitoring_port = process_mgr.globals.FM_PORT_ESPLORA_MONITORING;
1174 let cmd = cmd!(
1176 crate::util::Esplora,
1177 "--daemon-dir={daemon_dir}",
1178 "--db-dir={esplora_dir}",
1179 "--cookie=bitcoin:bitcoin",
1180 "--network=regtest",
1181 "--daemon-rpc-addr=127.0.0.1:{btc_rpc_port}",
1182 "--http-addr=127.0.0.1:{esplora_port}",
1183 "--monitoring-addr=127.0.0.1:{esplora_monitoring_port}",
1184 "--jsonrpc-import", );
1186 let process = process_mgr.spawn_daemon("esplora", cmd).await?;
1187
1188 Self::wait_for_ready(process_mgr).await?;
1189 debug!(target: LOG_DEVIMINT, "Esplora ready");
1190
1191 Ok(Self {
1192 _bitcoind: bitcoind,
1193 _process: process,
1194 })
1195 }
1196
1197 async fn wait_for_ready(process_mgr: &ProcessManager) -> Result<()> {
1199 let client = esplora_client::Builder::new(&format!(
1200 "http://localhost:{}",
1201 process_mgr.globals.FM_PORT_ESPLORA
1202 ))
1203 .max_retries(0)
1205 .build_async()
1206 .expect("esplora client build failed");
1207
1208 poll("esplora server ready", || async {
1209 client
1210 .get_fee_estimates()
1211 .await
1212 .map_err(|e| ControlFlow::Continue(anyhow::anyhow!(e)))?;
1213
1214 Ok(())
1215 })
1216 .await?;
1217
1218 Ok(())
1219 }
1220}
1221
1222#[allow(unused)]
1223pub struct ExternalDaemons {
1224 pub bitcoind: Bitcoind,
1225 pub cln: Lightningd,
1226 pub lnd: Lnd,
1227 pub electrs: Electrs,
1228 pub esplora: Esplora,
1229}
1230
1231pub async fn external_daemons(process_mgr: &ProcessManager) -> Result<ExternalDaemons> {
1232 let start_time = fedimint_core::time::now();
1233 let bitcoind = Bitcoind::new(process_mgr, false).await?;
1234 let (cln, lnd, electrs, esplora) = tokio::try_join!(
1235 Lightningd::new(process_mgr, bitcoind.clone()),
1236 Lnd::new(process_mgr, bitcoind.clone()),
1237 Electrs::new(process_mgr, bitcoind.clone()),
1238 Esplora::new(process_mgr, bitcoind.clone()),
1239 )?;
1240 open_channel(process_mgr, &bitcoind, &cln, &lnd).await?;
1241 let _ = bitcoind.wallet_client().await?;
1243 info!(
1244 target: LOG_DEVIMINT,
1245 "starting base daemons took {:?}",
1246 start_time.elapsed()?
1247 );
1248 Ok(ExternalDaemons {
1249 bitcoind,
1250 cln,
1251 lnd,
1252 electrs,
1253 esplora,
1254 })
1255}