1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21 T: Send + 'static,
22{
23 runtime::spawn("spawn_drop", async {
24 drop(t);
25 })
26 .await
27 .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32 pub bitcoind: Bitcoind,
33 pub lnd: Lnd,
34 pub fed: Federation,
35 pub gw_lnd: Gatewayd,
36 pub gw_ldk: Gatewayd,
37 pub gw_ldk_second: Gatewayd,
38 pub esplora: Esplora,
39 pub recurringd: Recurringd,
40 pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44 pub async fn fast_terminate(self) {
45 let Self {
46 bitcoind,
47 lnd,
48 fed,
49 gw_lnd,
50 gw_ldk,
51 gw_ldk_second,
52 esplora,
53 recurringd,
54 recurringdv2,
55 } = self;
56
57 join!(
58 spawn_drop(gw_lnd),
59 spawn_drop(gw_ldk),
60 spawn_drop(gw_ldk_second),
61 spawn_drop(fed),
62 spawn_drop(lnd),
63 spawn_drop(esplora),
64 spawn_drop(bitcoind),
65 spawn_drop(recurringd),
66 spawn_drop(recurringdv2),
67 );
68 }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71 DevJitFed::new(process_mgr, false, false)?
72 .to_dev_fed(process_mgr)
73 .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80 bitcoind: JitArc<Bitcoind>,
81 lnd: JitArc<Lnd>,
82 fed: JitArc<Federation>,
83 gw_lnd: JitArc<Gatewayd>,
84 gw_ldk: JitArc<Gatewayd>,
85 gw_ldk_second: JitArc<Gatewayd>,
86 esplora: JitArc<Esplora>,
87 recurringd: JitArc<Recurringd>,
88 recurringdv2: JitArc<Recurringdv2>,
89 start_time: std::time::SystemTime,
90 gw_lnd_registered: JitArc<()>,
91 gw_ldk_connected: JitArc<()>,
92 gw_ldk_second_connected: JitArc<()>,
93 fed_epoch_generated: JitArc<()>,
94 channel_opened: JitArc<()>,
95 recurringd_connected: JitArc<()>,
96
97 skip_setup: bool,
98 pre_dkg: bool,
99}
100
101impl DevJitFed {
102 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103 let fed_size = process_mgr.globals.FM_FED_SIZE;
104 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105 anyhow::ensure!(
106 fed_size > 3 * offline_nodes,
107 "too many offline nodes ({offline_nodes}) to reach consensus"
108 );
109 let start_time = fedimint_core::time::now();
110
111 debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113 let bitcoind = JitTry::new_try({
114 let process_mgr = process_mgr.to_owned();
115 move || async move {
116 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117 let start_time = fedimint_core::time::now();
118 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120 Ok(Arc::new(bitcoind))
121 }
122 });
123 let lnd = JitTry::new_try({
124 let process_mgr = process_mgr.to_owned();
125 let bitcoind = bitcoind.clone();
126 || async move {
127 let bitcoind = bitcoind.get_try().await?.deref().clone();
128 debug!(target: LOG_DEVIMINT, "Starting lnd...");
129 let start_time = fedimint_core::time::now();
130 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132 Ok(Arc::new(lnd))
133 }
134 });
135 let esplora = JitTryAnyhow::new_try({
136 let process_mgr = process_mgr.to_owned();
137 let bitcoind = bitcoind.clone();
138 || async move {
139 let bitcoind = bitcoind.get_try().await?.deref().clone();
140 debug!(target: LOG_DEVIMINT, "Starting esplora...");
141 let start_time = fedimint_core::time::now();
142 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144 Ok(Arc::new(esplora))
145 }
146 });
147
148 let fed = JitTryAnyhow::new_try({
149 let process_mgr = process_mgr.to_owned();
150 let bitcoind = bitcoind.clone();
151 move || async move {
152 let bitcoind = bitcoind.get_try().await?.deref().clone();
153 debug!(target: LOG_DEVIMINT, "Starting federation...");
154 let start_time = fedimint_core::time::now();
155 let mut fed = Federation::new(
156 &process_mgr,
157 bitcoind,
158 skip_setup,
159 pre_dkg,
160 0,
161 "default".to_string(),
162 )
163 .await?;
164
165 fed.degrade_federation(&process_mgr).await?;
167
168 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170 Ok(Arc::new(fed))
171 }
172 });
173
174 let gw_lnd = JitTryAnyhow::new_try({
175 let process_mgr = process_mgr.to_owned();
176 let lnd = lnd.clone();
177 || async move {
178 let lnd = lnd.get_try().await?.deref().clone();
179 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180 let start_time = fedimint_core::time::now();
181 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183 Ok(Arc::new(lnd_gw))
184 }
185 });
186 let gw_lnd_registered = JitTryAnyhow::new_try({
187 let gw_lnd = gw_lnd.clone();
188 let fed = fed.clone();
189 move || async move {
190 let gw_lnd = gw_lnd.get_try().await?.deref();
191 let fed = fed.get_try().await?.deref();
192 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193 let start_time = fedimint_core::time::now();
194 if !skip_setup && !pre_dkg {
195 gw_lnd.connect_fed(fed).await?;
196 }
197 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
198 Ok(Arc::new(()))
199 }
200 });
201
202 let gw_ldk = JitTryAnyhow::new_try({
203 let process_mgr = process_mgr.to_owned();
204 let bitcoind = bitcoind.clone();
205 move || async move {
206 bitcoind.get_try().await?;
207 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
208 let start_time = fedimint_core::time::now();
209 let ldk_gw = Gatewayd::new(
210 &process_mgr,
211 LightningNode::Ldk {
212 name: "gatewayd-ldk-0".to_string(),
213 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
214 ldk_port: process_mgr.globals.FM_PORT_LDK,
215 },
216 1,
217 )
218 .await?;
219 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
220 Ok(Arc::new(ldk_gw))
221 }
222 });
223 let gw_ldk_second = JitTryAnyhow::new_try({
224 let process_mgr = process_mgr.to_owned();
225 let bitcoind = bitcoind.clone();
226 move || async move {
227 bitcoind.get_try().await?;
228 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
229 let start_time = fedimint_core::time::now();
230 let ldk_gw2 = Gatewayd::new(
231 &process_mgr,
232 LightningNode::Ldk {
233 name: "gatewayd-ldk-1".to_string(),
234 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
235 ldk_port: process_mgr.globals.FM_PORT_LDK2,
236 },
237 2,
238 )
239 .await?;
240 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
241 Ok(Arc::new(ldk_gw2))
242 }
243 });
244 let gw_ldk_connected = JitTryAnyhow::new_try({
245 let gw_ldk = gw_ldk.clone();
246 let fed = fed.clone();
247 move || async move {
248 let gw_ldk = gw_ldk.get_try().await?.deref();
249 if supports_lnv2() {
250 let fed = fed.get_try().await?.deref();
251 let start_time = fedimint_core::time::now();
252 if !skip_setup && !pre_dkg {
253 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
254 gw_ldk.connect_fed(fed).await?;
255 } else {
256 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
257 }
258 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
259 }
260 Ok(Arc::new(()))
261 }
262 });
263 let gw_ldk_second_connected = JitTryAnyhow::new_try({
264 let gw_ldk_second = gw_ldk_second.clone();
265 let fed = fed.clone();
266 move || async move {
267 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
268 if supports_lnv2() {
269 let fed = fed.get_try().await?.deref();
270 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
271 let start_time = fedimint_core::time::now();
272 if !skip_setup && !pre_dkg {
273 gw_ldk2.connect_fed(fed).await?;
274 }
275 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
276 }
277 Ok(Arc::new(()))
278 }
279 });
280
281 let fed_epoch_generated = JitTryAnyhow::new_try({
282 let fed = fed.clone();
283 move || async move {
284 let fed = fed.get_try().await?.deref().clone();
285 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
286 let start_time = fedimint_core::time::now();
287 if !skip_setup && !pre_dkg {
288 fed.mine_then_wait_blocks_sync(10).await?;
289 }
290 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
291 Ok(Arc::new(()))
292 }
293 });
294
295 let channel_opened = JitTryAnyhow::new_try({
296 let gw_lnd = gw_lnd.clone();
297 let gw_ldk = gw_ldk.clone();
298 let gw_ldk_second = gw_ldk_second.clone();
299 let bitcoind = bitcoind.clone();
300 let fed_epoch_generated = fed_epoch_generated.clone();
301 move || async move {
302 let bitcoind = bitcoind.get_try().await?.deref().clone();
306
307 fed_epoch_generated.get_try().await?;
310
311 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
312 let gw_ldk = gw_ldk.get_try().await?.deref();
313 let gw_lnd = gw_lnd.get_try().await?.deref();
314 let gateways: &[NamedGateway<'_>] =
315 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
316
317 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
318 let start_time = fedimint_core::time::now();
319 open_channels_between_gateways(&bitcoind, gateways).await?;
320 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
321
322 Ok(Arc::new(()))
323 }
324 });
325
326 let recurringd = JitTryAnyhow::new_try({
327 let process_mgr = process_mgr.to_owned();
328 move || async move {
329 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
330 let start_time = fedimint_core::time::now();
331 let recurringd = Recurringd::new(&process_mgr).await?;
332 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
333 Ok(Arc::new(recurringd))
334 }
335 });
336
337 let recurringd_connected = JitTryAnyhow::new_try({
338 let recurringd = recurringd.clone();
339 let fed = fed.clone();
340 move || async move {
341 let recurringd = recurringd.get_try().await?.deref();
342 let fed = fed.get_try().await?.deref();
343 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
344 let start_time = fedimint_core::time::now();
345 if !skip_setup && !pre_dkg {
346 let invite_code = fed.invite_code()?;
347 recurringd.add_federation(&invite_code).await?;
348 }
349 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
350 Ok(Arc::new(()))
351 }
352 });
353
354 let recurringdv2 = JitTryAnyhow::new_try({
355 let process_mgr = process_mgr.to_owned();
356 move || async move {
357 debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
358 let start_time = fedimint_core::time::now();
359 let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
360 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
361 Ok(Arc::new(recurringdv2))
362 }
363 });
364
365 Ok(DevJitFed {
366 bitcoind,
367 lnd,
368 fed,
369 gw_lnd,
370 gw_ldk,
371 gw_ldk_second,
372 esplora,
373 recurringd,
374 recurringdv2,
375 start_time,
376 gw_lnd_registered,
377 gw_ldk_connected,
378 gw_ldk_second_connected,
379 fed_epoch_generated,
380 channel_opened,
381 recurringd_connected,
382 skip_setup,
383 pre_dkg,
384 })
385 }
386
387 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
388 Ok(self.esplora.get_try().await?.deref())
389 }
390 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
391 Ok(self.lnd.get_try().await?.deref())
392 }
393 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
394 Ok(self.gw_lnd.get_try().await?.deref())
395 }
396 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
397 self.gw_lnd_registered.get_try().await?;
398 Ok(self.gw_lnd.get_try().await?.deref())
399 }
400 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
401 Ok(self.gw_ldk.get_try().await?.deref())
402 }
403 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
404 Ok(self.gw_ldk_second.get_try().await?.deref())
405 }
406 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
407 self.gw_ldk_connected.get_try().await?;
408 Ok(self.gw_ldk.get_try().await?.deref())
409 }
410 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
411 self.gw_ldk_second_connected.get_try().await?;
412 Ok(self.gw_ldk_second.get_try().await?.deref())
413 }
414 pub async fn fed(&self) -> anyhow::Result<&Federation> {
415 Ok(self.fed.get_try().await?.deref())
416 }
417 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
418 Ok(self.bitcoind.get_try().await?.deref())
419 }
420
421 pub async fn internal_client(&self) -> anyhow::Result<Client> {
422 Ok(self.fed().await?.internal_client().await?.clone())
423 }
424
425 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
428 self.fed().await?.await_gateways_registered().await?;
429 Ok(self.fed().await?.internal_client().await?.clone())
430 }
431
432 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
433 Ok(self.recurringd.get_try().await?.deref())
434 }
435
436 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
437 self.recurringd_connected.get_try().await?;
438 Ok(self.recurringd.get_try().await?.deref())
439 }
440
441 pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
442 Ok(self.recurringdv2.get_try().await?.deref())
443 }
444
445 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
446 let fed_size = process_mgr.globals.FM_FED_SIZE;
447 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
448 anyhow::ensure!(
449 fed_size > 3 * offline_nodes,
450 "too many offline nodes ({offline_nodes}) to reach consensus"
451 );
452
453 if !self.pre_dkg && !self.skip_setup {
454 let _ = self.internal_client_gw_registered().await?;
455 }
456 let _ = self.channel_opened.get_try().await?;
457 let _ = self.gw_lnd_registered().await?;
458 let _ = self.gw_ldk_connected().await?;
459 let _ = self.gw_ldk_second_connected().await?;
460 let _ = self.lnd().await?;
461 let _ = self.esplora().await?;
462 let _ = self.recurringd_connected().await?;
463 let _ = self.recurringdv2().await?;
464 let _ = self.fed_epoch_generated.get_try().await?;
465
466 debug!(
467 target: LOG_DEVIMINT,
468 fed_size,
469 offline_nodes,
470 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
471 "Dev federation ready",
472 );
473 Ok(())
474 }
475
476 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
477 self.finalize(process_mgr).await?;
478 Ok(DevFed {
479 bitcoind: self.bitcoind().await?.to_owned(),
480 lnd: self.lnd().await?.to_owned(),
481 fed: self.fed().await?.to_owned(),
482 gw_lnd: self.gw_lnd().await?.to_owned(),
483 gw_ldk: self.gw_ldk().await?.to_owned(),
484 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
485 esplora: self.esplora().await?.to_owned(),
486 recurringd: self.recurringd().await?.to_owned(),
487 recurringdv2: self.recurringdv2().await?.to_owned(),
488 })
489 }
490
491 pub async fn fast_terminate(self) {
492 let Self {
493 bitcoind,
494 lnd,
495 fed,
496 gw_lnd,
497 esplora,
498 gw_ldk,
499 gw_ldk_second,
500 recurringd,
501 recurringdv2,
502 ..
503 } = self;
504
505 join!(
506 spawn_drop(gw_lnd),
507 spawn_drop(gw_ldk),
508 spawn_drop(gw_ldk_second),
509 spawn_drop(fed),
510 spawn_drop(lnd),
511 spawn_drop(esplora),
512 spawn_drop(bitcoind),
513 spawn_drop(recurringd),
514 spawn_drop(recurringdv2),
515 );
516 }
517}