1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{
13 Bitcoind, Electrs, Esplora, Lnd, NamedGateway, open_channels_between_gateways,
14};
15use crate::federation::{Client, Federation};
16use crate::gatewayd::{Gatewayd, LdkChainSource};
17use crate::recurringd::Recurringd;
18use crate::util::{ProcessManager, supports_lnv2};
19
20async fn spawn_drop<T>(t: T)
21where
22 T: Send + 'static,
23{
24 runtime::spawn("spawn_drop", async {
25 drop(t);
26 })
27 .await
28 .expect("drop panic");
29}
30
/// Handles to all components of a started dev federation.
///
/// Produced by `DevJitFed::to_dev_fed` once every component has finished
/// starting. Use `fast_terminate` to drop all handles concurrently.
#[derive(Clone)]
pub struct DevFed {
    /// The bitcoind handle.
    pub bitcoind: Bitcoind,
    /// The LND lightning node handle.
    pub lnd: Lnd,
    /// The federation handle.
    pub fed: Federation,
    /// Gateway created with `LightningNode::Lnd`.
    pub gw_lnd: Gatewayd,
    /// First LDK gateway (named "gatewayd-ldk-0" when created).
    pub gw_ldk: Gatewayd,
    /// Second LDK gateway (named "gatewayd-ldk-1" when created).
    pub gw_ldk_second: Gatewayd,
    /// The electrs handle.
    pub electrs: Electrs,
    /// The esplora handle.
    pub esplora: Esplora,
    /// The recurringd handle.
    pub recurringd: Recurringd,
}
43
44impl DevFed {
45 pub async fn fast_terminate(self) {
46 let Self {
47 bitcoind,
48 lnd,
49 fed,
50 gw_lnd,
51 gw_ldk,
52 gw_ldk_second,
53 electrs,
54 esplora,
55 recurringd,
56 } = self;
57
58 join!(
59 spawn_drop(gw_lnd),
60 spawn_drop(gw_ldk),
61 spawn_drop(gw_ldk_second),
62 spawn_drop(fed),
63 spawn_drop(lnd),
64 spawn_drop(esplora),
65 spawn_drop(electrs),
66 spawn_drop(bitcoind),
67 spawn_drop(recurringd),
68 );
69 }
70}
71pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
72 DevJitFed::new(process_mgr, false, false)?
73 .to_dev_fed(process_mgr)
74 .await
75}
76
/// Shorthand for a `JitTryAnyhow` whose value is an `Arc<T>`.
///
/// NOTE(review): presumably the `Arc` is what lets the computed value be
/// shared between clones of the handle — confirm against `JitTryAnyhow`.
type JitArc<T> = JitTryAnyhow<Arc<T>>;
78
/// Dev federation whose components are each held behind a `JitArc` handle.
///
/// The first group of fields holds the components themselves; the `JitArc<()>`
/// fields are markers for follow-up steps (gateway registration, channel
/// opening, ...) that carry no value of their own.
#[derive(Clone)]
pub struct DevJitFed {
    bitcoind: JitArc<Bitcoind>,
    lnd: JitArc<Lnd>,
    fed: JitArc<Federation>,
    gw_lnd: JitArc<Gatewayd>,
    gw_ldk: JitArc<Gatewayd>,
    gw_ldk_second: JitArc<Gatewayd>,
    electrs: JitArc<Electrs>,
    esplora: JitArc<Esplora>,
    recurringd: JitArc<Recurringd>,
    // Taken in `new`; `finalize` reports the time elapsed since then.
    start_time: std::time::SystemTime,
    // Step markers: each completes once the corresponding step has run.
    gw_lnd_registered: JitArc<()>,
    gw_ldk_connected: JitArc<()>,
    gw_ldk_second_connected: JitArc<()>,
    fed_epoch_generated: JitArc<()>,
    channel_opened: JitArc<()>,
    recurringd_connected: JitArc<()>,

    // Flags passed to `new`; `finalize` consults them as well.
    skip_setup: bool,
    pre_dkg: bool,
}
101
102impl DevJitFed {
103 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
104 let fed_size = process_mgr.globals.FM_FED_SIZE;
105 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
106 anyhow::ensure!(
107 fed_size > 3 * offline_nodes,
108 "too many offline nodes ({offline_nodes}) to reach consensus"
109 );
110 let start_time = fedimint_core::time::now();
111
112 debug!("Starting dev federation");
113
114 let bitcoind = JitTry::new_try({
115 let process_mgr = process_mgr.to_owned();
116 move || async move {
117 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
118 let start_time = fedimint_core::time::now();
119 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
120 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
121 Ok(Arc::new(bitcoind))
122 }
123 });
124 let lnd = JitTry::new_try({
125 let process_mgr = process_mgr.to_owned();
126 let bitcoind = bitcoind.clone();
127 || async move {
128 let bitcoind = bitcoind.get_try().await?.deref().clone();
129 debug!(target: LOG_DEVIMINT, "Starting lnd...");
130 let start_time = fedimint_core::time::now();
131 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
132 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
133 Ok(Arc::new(lnd))
134 }
135 });
136 let electrs = JitTryAnyhow::new_try({
137 let process_mgr = process_mgr.to_owned();
138 let bitcoind = bitcoind.clone();
139 || async move {
140 let bitcoind = bitcoind.get_try().await?.deref().clone();
141 debug!(target: LOG_DEVIMINT, "Starting electrs...");
142 let start_time = fedimint_core::time::now();
143 let electrs = Electrs::new(&process_mgr, bitcoind).await?;
144 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started electrs");
145 Ok(Arc::new(electrs))
146 }
147 });
148 let esplora = JitTryAnyhow::new_try({
149 let process_mgr = process_mgr.to_owned();
150 let bitcoind = bitcoind.clone();
151 || async move {
152 let bitcoind = bitcoind.get_try().await?.deref().clone();
153 debug!(target: LOG_DEVIMINT, "Starting esplora...");
154 let start_time = fedimint_core::time::now();
155 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
156 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
157 Ok(Arc::new(esplora))
158 }
159 });
160
161 let fed = JitTryAnyhow::new_try({
162 let process_mgr = process_mgr.to_owned();
163 let bitcoind = bitcoind.clone();
164 move || async move {
165 let bitcoind = bitcoind.get_try().await?.deref().clone();
166 debug!(target: LOG_DEVIMINT, "Starting federation...");
167 let start_time = fedimint_core::time::now();
168 let mut fed = Federation::new(
169 &process_mgr,
170 bitcoind,
171 skip_setup,
172 pre_dkg,
173 0,
174 "default".to_string(),
175 )
176 .await?;
177
178 fed.degrade_federation(&process_mgr).await?;
180
181 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
182
183 Ok(Arc::new(fed))
184 }
185 });
186
187 let gw_lnd = JitTryAnyhow::new_try({
188 let process_mgr = process_mgr.to_owned();
189 let lnd = lnd.clone();
190 || async move {
191 let lnd = lnd.get_try().await?.deref().clone();
192 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
193 let start_time = fedimint_core::time::now();
194 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
195 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
196 Ok(Arc::new(lnd_gw))
197 }
198 });
199 let gw_lnd_registered = JitTryAnyhow::new_try({
200 let gw_lnd = gw_lnd.clone();
201 let fed = fed.clone();
202 move || async move {
203 let gw_lnd = gw_lnd.get_try().await?.deref();
204 let fed = fed.get_try().await?.deref();
205 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
206 let start_time = fedimint_core::time::now();
207 if !skip_setup && !pre_dkg {
208 gw_lnd.connect_fed(fed).await?;
209 }
210 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
211 Ok(Arc::new(()))
212 }
213 });
214
215 let gw_ldk = JitTryAnyhow::new_try({
216 let process_mgr = process_mgr.to_owned();
217 move || async move {
218 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
219 let start_time = fedimint_core::time::now();
220 let ldk_gw = Gatewayd::new(
221 &process_mgr,
222 LightningNode::Ldk {
223 name: "gatewayd-ldk-0".to_string(),
224 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
225 ldk_port: process_mgr.globals.FM_PORT_LDK,
226 chain_source: LdkChainSource::Bitcoind,
227 },
228 )
229 .await?;
230 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
231 Ok(Arc::new(ldk_gw))
232 }
233 });
234 let gw_ldk_second = JitTryAnyhow::new_try({
235 let process_mgr = process_mgr.to_owned();
236 move || async move {
237 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
238 let start_time = fedimint_core::time::now();
239 let ldk_gw2 = Gatewayd::new(
240 &process_mgr,
241 LightningNode::Ldk {
242 name: "gatewayd-ldk-1".to_string(),
243 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
244 ldk_port: process_mgr.globals.FM_PORT_LDK2,
245 chain_source: LdkChainSource::Bitcoind,
246 },
247 )
248 .await?;
249 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
250 Ok(Arc::new(ldk_gw2))
251 }
252 });
253 let gw_ldk_connected = JitTryAnyhow::new_try({
254 let gw_ldk = gw_ldk.clone();
255 let fed = fed.clone();
256 move || async move {
257 let gw_ldk = gw_ldk.get_try().await?.deref();
258 if supports_lnv2() {
259 let fed = fed.get_try().await?.deref();
260 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
261 let start_time = fedimint_core::time::now();
262 if !skip_setup && !pre_dkg {
263 gw_ldk.connect_fed(fed).await?;
264 }
265 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
266 }
267 Ok(Arc::new(()))
268 }
269 });
270 let gw_ldk_second_connected = JitTryAnyhow::new_try({
271 let gw_ldk_second = gw_ldk_second.clone();
272 let fed = fed.clone();
273 move || async move {
274 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
275 if supports_lnv2() {
276 let fed = fed.get_try().await?.deref();
277 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
278 let start_time = fedimint_core::time::now();
279 if !skip_setup && !pre_dkg {
280 gw_ldk2.connect_fed(fed).await?;
281 }
282 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
283 }
284 Ok(Arc::new(()))
285 }
286 });
287
288 let channel_opened = JitTryAnyhow::new_try({
289 let gw_lnd = gw_lnd.clone();
290 let gw_ldk = gw_ldk.clone();
291 let gw_ldk_second = gw_ldk_second.clone();
292 let bitcoind = bitcoind.clone();
293 move || async move {
294 let bitcoind = bitcoind.get_try().await?.deref().clone();
298
299 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
300 let gw_ldk = gw_ldk.get_try().await?.deref();
301 let gw_lnd = gw_lnd.get_try().await?.deref();
302 let gateways: &[NamedGateway<'_>] =
303 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
304
305 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
306 let start_time = fedimint_core::time::now();
307 open_channels_between_gateways(&bitcoind, gateways).await?;
308 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
309
310 Ok(Arc::new(()))
311 }
312 });
313
314 let fed_epoch_generated = JitTryAnyhow::new_try({
315 let fed = fed.clone();
316 move || async move {
317 let fed = fed.get_try().await?.deref().clone();
318 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
319 let start_time = fedimint_core::time::now();
320 if !skip_setup && !pre_dkg {
321 fed.mine_then_wait_blocks_sync(10).await?;
322 }
323 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
324 Ok(Arc::new(()))
325 }
326 });
327
328 let recurringd = JitTryAnyhow::new_try({
329 let process_mgr = process_mgr.to_owned();
330 move || async move {
331 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
332 let start_time = fedimint_core::time::now();
333 let recurringd = Recurringd::new(&process_mgr).await?;
334 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
335 Ok(Arc::new(recurringd))
336 }
337 });
338
339 let recurringd_connected = JitTryAnyhow::new_try({
340 let recurringd = recurringd.clone();
341 let fed = fed.clone();
342 move || async move {
343 let recurringd = recurringd.get_try().await?.deref();
344 let fed = fed.get_try().await?.deref();
345 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
346 let start_time = fedimint_core::time::now();
347 if !skip_setup && !pre_dkg {
348 let invite_code = fed.invite_code()?;
349 recurringd.add_federation(&invite_code).await?;
350 }
351 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
352 Ok(Arc::new(()))
353 }
354 });
355
356 Ok(DevJitFed {
357 bitcoind,
358 lnd,
359 fed,
360 gw_lnd,
361 gw_ldk,
362 gw_ldk_second,
363 electrs,
364 esplora,
365 recurringd,
366 start_time,
367 gw_lnd_registered,
368 gw_ldk_connected,
369 gw_ldk_second_connected,
370 fed_epoch_generated,
371 channel_opened,
372 recurringd_connected,
373 skip_setup,
374 pre_dkg,
375 })
376 }
377
378 pub async fn electrs(&self) -> anyhow::Result<&Electrs> {
379 Ok(self.electrs.get_try().await?.deref())
380 }
381 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
382 Ok(self.esplora.get_try().await?.deref())
383 }
384 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
385 Ok(self.lnd.get_try().await?.deref())
386 }
387 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
388 Ok(self.gw_lnd.get_try().await?.deref())
389 }
390 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
391 self.gw_lnd_registered.get_try().await?;
392 Ok(self.gw_lnd.get_try().await?.deref())
393 }
394 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
395 Ok(self.gw_ldk.get_try().await?.deref())
396 }
397 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
398 Ok(self.gw_ldk_second.get_try().await?.deref())
399 }
400 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
401 self.gw_ldk_connected.get_try().await?;
402 Ok(self.gw_ldk.get_try().await?.deref())
403 }
404 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
405 self.gw_ldk_second_connected.get_try().await?;
406 Ok(self.gw_ldk_second.get_try().await?.deref())
407 }
408 pub async fn fed(&self) -> anyhow::Result<&Federation> {
409 Ok(self.fed.get_try().await?.deref())
410 }
411 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
412 Ok(self.bitcoind.get_try().await?.deref())
413 }
414
415 pub async fn internal_client(&self) -> anyhow::Result<Client> {
416 Ok(self.fed().await?.internal_client().await?.clone())
417 }
418
419 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
422 self.fed().await?.await_gateways_registered().await?;
423 Ok(self.fed().await?.internal_client().await?.clone())
424 }
425
426 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
427 Ok(self.recurringd.get_try().await?.deref())
428 }
429
430 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
431 self.recurringd_connected.get_try().await?;
432 Ok(self.recurringd.get_try().await?.deref())
433 }
434
435 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
436 let fed_size = process_mgr.globals.FM_FED_SIZE;
437 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
438 anyhow::ensure!(
439 fed_size > 3 * offline_nodes,
440 "too many offline nodes ({offline_nodes}) to reach consensus"
441 );
442
443 if !self.pre_dkg && !self.skip_setup {
444 let _ = self.internal_client_gw_registered().await?;
445 }
446 let _ = self.channel_opened.get_try().await?;
447 let _ = self.gw_lnd_registered().await?;
448 let _ = self.gw_ldk_connected().await?;
449 let _ = self.gw_ldk_second_connected().await?;
450 let _ = self.lnd().await?;
451 let _ = self.electrs().await?;
452 let _ = self.esplora().await?;
453 let _ = self.recurringd_connected().await?;
454 let _ = self.fed_epoch_generated.get_try().await?;
455
456 debug!(
457 target: LOG_DEVIMINT,
458 fed_size,
459 offline_nodes,
460 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
461 "Dev federation ready",
462 );
463 Ok(())
464 }
465
466 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
467 self.finalize(process_mgr).await?;
468 Ok(DevFed {
469 bitcoind: self.bitcoind().await?.to_owned(),
470 lnd: self.lnd().await?.to_owned(),
471 fed: self.fed().await?.to_owned(),
472 gw_lnd: self.gw_lnd().await?.to_owned(),
473 gw_ldk: self.gw_ldk().await?.to_owned(),
474 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
475 esplora: self.esplora().await?.to_owned(),
476 electrs: self.electrs().await?.to_owned(),
477 recurringd: self.recurringd().await?.to_owned(),
478 })
479 }
480
481 pub async fn fast_terminate(self) {
482 let Self {
483 bitcoind,
484 lnd,
485 fed,
486 gw_lnd,
487 electrs,
488 esplora,
489 gw_ldk,
490 gw_ldk_second,
491 recurringd,
492 ..
493 } = self;
494
495 join!(
496 spawn_drop(gw_lnd),
497 spawn_drop(gw_ldk),
498 spawn_drop(gw_ldk_second),
499 spawn_drop(fed),
500 spawn_drop(lnd),
501 spawn_drop(esplora),
502 spawn_drop(electrs),
503 spawn_drop(bitcoind),
504 spawn_drop(recurringd),
505 );
506 }
507}