1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21 T: Send + 'static,
22{
23 runtime::spawn("spawn_drop", async {
24 drop(t);
25 })
26 .await
27 .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32 pub bitcoind: Bitcoind,
33 pub lnd: Lnd,
34 pub fed: Federation,
35 pub gw_lnd: Gatewayd,
36 pub gw_ldk: Gatewayd,
37 pub gw_ldk_second: Gatewayd,
38 pub esplora: Esplora,
39 pub recurringd: Recurringd,
40 pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44 pub async fn fast_terminate(self) {
45 let Self {
46 bitcoind,
47 lnd,
48 fed,
49 gw_lnd,
50 gw_ldk,
51 gw_ldk_second,
52 esplora,
53 recurringd,
54 recurringdv2,
55 } = self;
56
57 join!(
58 spawn_drop(gw_lnd),
59 spawn_drop(gw_ldk),
60 spawn_drop(gw_ldk_second),
61 spawn_drop(fed),
62 spawn_drop(lnd),
63 spawn_drop(esplora),
64 spawn_drop(bitcoind),
65 spawn_drop(recurringd),
66 spawn_drop(recurringdv2),
67 );
68 }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71 DevJitFed::new(process_mgr, false, false)?
72 .to_dev_fed(process_mgr)
73 .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80 bitcoind: JitArc<Bitcoind>,
81 lnd: JitArc<Lnd>,
82 fed: JitArc<Federation>,
83 gw_lnd: JitArc<Gatewayd>,
84 gw_ldk: JitArc<Gatewayd>,
85 gw_ldk_second: JitArc<Gatewayd>,
86 esplora: JitArc<Esplora>,
87 recurringd: JitArc<Recurringd>,
88 recurringdv2: JitArc<Recurringdv2>,
89 start_time: std::time::SystemTime,
90 gw_lnd_registered: JitArc<()>,
91 gw_ldk_connected: JitArc<()>,
92 gw_ldk_second_connected: JitArc<()>,
93 fed_epoch_generated: JitArc<()>,
94 channel_opened: JitArc<()>,
95 recurringd_connected: JitArc<()>,
96
97 skip_setup: bool,
98 pre_dkg: bool,
99}
100
101impl DevJitFed {
102 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103 let fed_size = process_mgr.globals.FM_FED_SIZE;
104 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105 anyhow::ensure!(
106 fed_size > 3 * offline_nodes,
107 "too many offline nodes ({offline_nodes}) to reach consensus"
108 );
109 let start_time = fedimint_core::time::now();
110
111 debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113 let bitcoind = JitTry::new_try({
114 let process_mgr = process_mgr.to_owned();
115 move || async move {
116 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117 let start_time = fedimint_core::time::now();
118 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120 Ok(Arc::new(bitcoind))
121 }
122 });
123 let lnd = JitTry::new_try({
124 let process_mgr = process_mgr.to_owned();
125 let bitcoind = bitcoind.clone();
126 || async move {
127 let bitcoind = bitcoind.get_try().await?.deref().clone();
128 debug!(target: LOG_DEVIMINT, "Starting lnd...");
129 let start_time = fedimint_core::time::now();
130 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132 Ok(Arc::new(lnd))
133 }
134 });
135 let esplora = JitTryAnyhow::new_try({
136 let process_mgr = process_mgr.to_owned();
137 let bitcoind = bitcoind.clone();
138 || async move {
139 let bitcoind = bitcoind.get_try().await?.deref().clone();
140 debug!(target: LOG_DEVIMINT, "Starting esplora...");
141 let start_time = fedimint_core::time::now();
142 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144 Ok(Arc::new(esplora))
145 }
146 });
147
148 let fed = JitTryAnyhow::new_try({
149 let process_mgr = process_mgr.to_owned();
150 let bitcoind = bitcoind.clone();
151 move || async move {
152 let bitcoind = bitcoind.get_try().await?.deref().clone();
153 debug!(target: LOG_DEVIMINT, "Starting federation...");
154 let start_time = fedimint_core::time::now();
155 let mut fed = Federation::new(
156 &process_mgr,
157 bitcoind,
158 skip_setup,
159 pre_dkg,
160 0,
161 "default".to_string(),
162 )
163 .await?;
164
165 fed.degrade_federation(&process_mgr).await?;
167
168 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170 Ok(Arc::new(fed))
171 }
172 });
173
174 let gw_lnd = JitTryAnyhow::new_try({
175 let process_mgr = process_mgr.to_owned();
176 let lnd = lnd.clone();
177 || async move {
178 let lnd = lnd.get_try().await?.deref().clone();
179 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180 let start_time = fedimint_core::time::now();
181 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183 Ok(Arc::new(lnd_gw))
184 }
185 });
186 let gw_lnd_registered = JitTryAnyhow::new_try({
187 let gw_lnd = gw_lnd.clone();
188 let fed = fed.clone();
189 move || async move {
190 let gw_lnd = gw_lnd.get_try().await?.deref();
191 let fed = fed.get_try().await?.deref();
192 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193 let start_time = fedimint_core::time::now();
194 if !skip_setup && !pre_dkg {
195 gw_lnd.connect_fed(fed).await?;
196 }
197 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
198 Ok(Arc::new(()))
199 }
200 });
201
202 let gw_ldk = JitTryAnyhow::new_try({
203 let process_mgr = process_mgr.to_owned();
204 let bitcoind = bitcoind.clone();
205 move || async move {
206 bitcoind.get_try().await?;
207 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
208 let start_time = fedimint_core::time::now();
209 let ldk_gw = Gatewayd::new(
210 &process_mgr,
211 LightningNode::Ldk {
212 name: "gatewayd-ldk-0".to_string(),
213 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
214 ldk_port: process_mgr.globals.FM_PORT_LDK,
215 metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
216 },
217 1,
218 )
219 .await?;
220 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
221 Ok(Arc::new(ldk_gw))
222 }
223 });
224 let gw_ldk_second = JitTryAnyhow::new_try({
225 let process_mgr = process_mgr.to_owned();
226 let bitcoind = bitcoind.clone();
227 move || async move {
228 bitcoind.get_try().await?;
229 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
230 let start_time = fedimint_core::time::now();
231 let ldk_gw2 = Gatewayd::new(
232 &process_mgr,
233 LightningNode::Ldk {
234 name: "gatewayd-ldk-1".to_string(),
235 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
236 ldk_port: process_mgr.globals.FM_PORT_LDK2,
237 metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
238 },
239 2,
240 )
241 .await?;
242 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
243 Ok(Arc::new(ldk_gw2))
244 }
245 });
246 let gw_ldk_connected = JitTryAnyhow::new_try({
247 let gw_ldk = gw_ldk.clone();
248 let fed = fed.clone();
249 move || async move {
250 let gw_ldk = gw_ldk.get_try().await?.deref();
251 if supports_lnv2() {
252 let fed = fed.get_try().await?.deref();
253 let start_time = fedimint_core::time::now();
254 if !skip_setup && !pre_dkg {
255 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
256 gw_ldk.connect_fed(fed).await?;
257 } else {
258 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
259 }
260 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
261 }
262 Ok(Arc::new(()))
263 }
264 });
265 let gw_ldk_second_connected = JitTryAnyhow::new_try({
266 let gw_ldk_second = gw_ldk_second.clone();
267 let fed = fed.clone();
268 move || async move {
269 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
270 if supports_lnv2() {
271 let fed = fed.get_try().await?.deref();
272 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
273 let start_time = fedimint_core::time::now();
274 if !skip_setup && !pre_dkg {
275 gw_ldk2.connect_fed(fed).await?;
276 }
277 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
278 }
279 Ok(Arc::new(()))
280 }
281 });
282
283 let fed_epoch_generated = JitTryAnyhow::new_try({
284 let fed = fed.clone();
285 move || async move {
286 let fed = fed.get_try().await?.deref().clone();
287 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
288 let start_time = fedimint_core::time::now();
289 if !skip_setup && !pre_dkg {
290 fed.mine_then_wait_blocks_sync(10).await?;
291 }
292 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
293 Ok(Arc::new(()))
294 }
295 });
296
297 let channel_opened = JitTryAnyhow::new_try({
298 let gw_lnd = gw_lnd.clone();
299 let gw_ldk = gw_ldk.clone();
300 let gw_ldk_second = gw_ldk_second.clone();
301 let bitcoind = bitcoind.clone();
302 let fed_epoch_generated = fed_epoch_generated.clone();
303 move || async move {
304 let bitcoind = bitcoind.get_try().await?.deref().clone();
308
309 fed_epoch_generated.get_try().await?;
312
313 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
314 let gw_ldk = gw_ldk.get_try().await?.deref();
315 let gw_lnd = gw_lnd.get_try().await?.deref();
316 let gateways: &[NamedGateway<'_>] =
317 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
318
319 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
320 let start_time = fedimint_core::time::now();
321 open_channels_between_gateways(&bitcoind, gateways).await?;
322 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
323
324 Ok(Arc::new(()))
325 }
326 });
327
328 let recurringd = JitTryAnyhow::new_try({
329 let process_mgr = process_mgr.to_owned();
330 move || async move {
331 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
332 let start_time = fedimint_core::time::now();
333 let recurringd = Recurringd::new(&process_mgr).await?;
334 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
335 Ok(Arc::new(recurringd))
336 }
337 });
338
339 let recurringd_connected = JitTryAnyhow::new_try({
340 let recurringd = recurringd.clone();
341 let fed = fed.clone();
342 move || async move {
343 let recurringd = recurringd.get_try().await?.deref();
344 let fed = fed.get_try().await?.deref();
345 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
346 let start_time = fedimint_core::time::now();
347 if !skip_setup && !pre_dkg {
348 let invite_code = fed.invite_code()?;
349 recurringd.add_federation(&invite_code).await?;
350 }
351 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
352 Ok(Arc::new(()))
353 }
354 });
355
356 let recurringdv2 = JitTryAnyhow::new_try({
357 let process_mgr = process_mgr.to_owned();
358 move || async move {
359 debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
360 let start_time = fedimint_core::time::now();
361 let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
362 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
363 Ok(Arc::new(recurringdv2))
364 }
365 });
366
367 Ok(DevJitFed {
368 bitcoind,
369 lnd,
370 fed,
371 gw_lnd,
372 gw_ldk,
373 gw_ldk_second,
374 esplora,
375 recurringd,
376 recurringdv2,
377 start_time,
378 gw_lnd_registered,
379 gw_ldk_connected,
380 gw_ldk_second_connected,
381 fed_epoch_generated,
382 channel_opened,
383 recurringd_connected,
384 skip_setup,
385 pre_dkg,
386 })
387 }
388
389 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
390 Ok(self.esplora.get_try().await?.deref())
391 }
392 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
393 Ok(self.lnd.get_try().await?.deref())
394 }
395 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
396 Ok(self.gw_lnd.get_try().await?.deref())
397 }
398 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
399 self.gw_lnd_registered.get_try().await?;
400 Ok(self.gw_lnd.get_try().await?.deref())
401 }
402 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
403 Ok(self.gw_ldk.get_try().await?.deref())
404 }
405 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
406 Ok(self.gw_ldk_second.get_try().await?.deref())
407 }
408 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
409 self.gw_ldk_connected.get_try().await?;
410 Ok(self.gw_ldk.get_try().await?.deref())
411 }
412 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
413 self.gw_ldk_second_connected.get_try().await?;
414 Ok(self.gw_ldk_second.get_try().await?.deref())
415 }
416 pub async fn fed(&self) -> anyhow::Result<&Federation> {
417 Ok(self.fed.get_try().await?.deref())
418 }
419 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
420 Ok(self.bitcoind.get_try().await?.deref())
421 }
422
423 pub async fn internal_client(&self) -> anyhow::Result<Client> {
424 Ok(self.fed().await?.internal_client().await?.clone())
425 }
426
427 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
430 self.fed().await?.await_gateways_registered().await?;
431 Ok(self.fed().await?.internal_client().await?.clone())
432 }
433
434 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
435 Ok(self.recurringd.get_try().await?.deref())
436 }
437
438 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
439 self.recurringd_connected.get_try().await?;
440 Ok(self.recurringd.get_try().await?.deref())
441 }
442
443 pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
444 Ok(self.recurringdv2.get_try().await?.deref())
445 }
446
447 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
448 let fed_size = process_mgr.globals.FM_FED_SIZE;
449 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
450 anyhow::ensure!(
451 fed_size > 3 * offline_nodes,
452 "too many offline nodes ({offline_nodes}) to reach consensus"
453 );
454
455 if !self.pre_dkg && !self.skip_setup {
456 let _ = self.internal_client_gw_registered().await?;
457 }
458 let _ = self.channel_opened.get_try().await?;
459 let _ = self.gw_lnd_registered().await?;
460 let _ = self.gw_ldk_connected().await?;
461 let _ = self.gw_ldk_second_connected().await?;
462 let _ = self.lnd().await?;
463 let _ = self.esplora().await?;
464 let _ = self.recurringd_connected().await?;
465 let _ = self.recurringdv2().await?;
466 let _ = self.fed_epoch_generated.get_try().await?;
467
468 debug!(
469 target: LOG_DEVIMINT,
470 fed_size,
471 offline_nodes,
472 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
473 "Dev federation ready",
474 );
475 Ok(())
476 }
477
478 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
479 self.finalize(process_mgr).await?;
480 Ok(DevFed {
481 bitcoind: self.bitcoind().await?.to_owned(),
482 lnd: self.lnd().await?.to_owned(),
483 fed: self.fed().await?.to_owned(),
484 gw_lnd: self.gw_lnd().await?.to_owned(),
485 gw_ldk: self.gw_ldk().await?.to_owned(),
486 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
487 esplora: self.esplora().await?.to_owned(),
488 recurringd: self.recurringd().await?.to_owned(),
489 recurringdv2: self.recurringdv2().await?.to_owned(),
490 })
491 }
492
493 pub async fn fast_terminate(self) {
494 let Self {
495 bitcoind,
496 lnd,
497 fed,
498 gw_lnd,
499 esplora,
500 gw_ldk,
501 gw_ldk_second,
502 recurringd,
503 recurringdv2,
504 ..
505 } = self;
506
507 join!(
508 spawn_drop(gw_lnd),
509 spawn_drop(gw_ldk),
510 spawn_drop(gw_ldk_second),
511 spawn_drop(fed),
512 spawn_drop(lnd),
513 spawn_drop(esplora),
514 spawn_drop(bitcoind),
515 spawn_drop(recurringd),
516 spawn_drop(recurringdv2),
517 );
518 }
519}