1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21 T: Send + 'static,
22{
23 runtime::spawn("spawn_drop", async {
24 drop(t);
25 })
26 .await
27 .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32 pub bitcoind: Bitcoind,
33 pub lnd: Lnd,
34 pub fed: Federation,
35 pub gw_lnd: Gatewayd,
36 pub gw_ldk: Gatewayd,
37 pub gw_ldk_second: Gatewayd,
38 pub esplora: Esplora,
39 pub recurringd: Recurringd,
40 pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44 pub async fn fast_terminate(self) {
45 let Self {
46 bitcoind,
47 lnd,
48 fed,
49 gw_lnd,
50 gw_ldk,
51 gw_ldk_second,
52 esplora,
53 recurringd,
54 recurringdv2,
55 } = self;
56
57 join!(
58 spawn_drop(gw_lnd),
59 spawn_drop(gw_ldk),
60 spawn_drop(gw_ldk_second),
61 spawn_drop(fed),
62 spawn_drop(lnd),
63 spawn_drop(esplora),
64 spawn_drop(bitcoind),
65 spawn_drop(recurringd),
66 spawn_drop(recurringdv2),
67 );
68 }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71 DevJitFed::new(process_mgr, false, false)?
72 .to_dev_fed(process_mgr)
73 .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80 bitcoind: JitArc<Bitcoind>,
81 lnd: JitArc<Lnd>,
82 fed: JitArc<Federation>,
83 gw_lnd: JitArc<Gatewayd>,
84 gw_ldk: JitArc<Gatewayd>,
85 gw_ldk_second: JitArc<Gatewayd>,
86 esplora: JitArc<Esplora>,
87 recurringd: JitArc<Recurringd>,
88 recurringdv2: JitArc<Recurringdv2>,
89 start_time: std::time::SystemTime,
90 gw_lnd_registered: JitArc<()>,
91 gw_ldk_connected: JitArc<()>,
92 gw_ldk_second_connected: JitArc<()>,
93 fed_epoch_generated: JitArc<()>,
94 channel_opened: JitArc<()>,
95 recurringd_connected: JitArc<()>,
96
97 skip_setup: bool,
98 pre_dkg: bool,
99}
100
101impl DevJitFed {
102 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103 let fed_size = process_mgr.globals.FM_FED_SIZE;
104 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105 anyhow::ensure!(
106 fed_size > 3 * offline_nodes,
107 "too many offline nodes ({offline_nodes}) to reach consensus"
108 );
109 let start_time = fedimint_core::time::now();
110
111 debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113 let bitcoind = JitTry::new_try({
114 let process_mgr = process_mgr.to_owned();
115 move || async move {
116 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117 let start_time = fedimint_core::time::now();
118 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120 Ok(Arc::new(bitcoind))
121 }
122 });
123 let lnd = JitTry::new_try({
124 let process_mgr = process_mgr.to_owned();
125 let bitcoind = bitcoind.clone();
126 || async move {
127 let bitcoind = bitcoind.get_try().await?.deref().clone();
128 debug!(target: LOG_DEVIMINT, "Starting lnd...");
129 let start_time = fedimint_core::time::now();
130 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132 Ok(Arc::new(lnd))
133 }
134 });
135 let esplora = JitTryAnyhow::new_try({
136 let process_mgr = process_mgr.to_owned();
137 let bitcoind = bitcoind.clone();
138 || async move {
139 let bitcoind = bitcoind.get_try().await?.deref().clone();
140 debug!(target: LOG_DEVIMINT, "Starting esplora...");
141 let start_time = fedimint_core::time::now();
142 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144 Ok(Arc::new(esplora))
145 }
146 });
147
148 let fed = JitTryAnyhow::new_try({
149 let process_mgr = process_mgr.to_owned();
150 let bitcoind = bitcoind.clone();
151 move || async move {
152 let bitcoind = bitcoind.get_try().await?.deref().clone();
153 debug!(target: LOG_DEVIMINT, "Starting federation...");
154 let start_time = fedimint_core::time::now();
155 let mut fed = Federation::new(
156 &process_mgr,
157 bitcoind,
158 skip_setup,
159 pre_dkg,
160 0,
161 "default".to_string(),
162 )
163 .await?;
164
165 fed.degrade_federation(&process_mgr).await?;
167
168 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170 Ok(Arc::new(fed))
171 }
172 });
173
174 let gw_lnd = JitTryAnyhow::new_try({
175 let process_mgr = process_mgr.to_owned();
176 let lnd = lnd.clone();
177 || async move {
178 let lnd = lnd.get_try().await?.deref().clone();
179 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180 let start_time = fedimint_core::time::now();
181 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183 Ok(Arc::new(lnd_gw))
184 }
185 });
186 let gw_lnd_registered = JitTryAnyhow::new_try({
187 let gw_lnd = gw_lnd.clone();
188 let fed = fed.clone();
189 move || async move {
190 let gw_lnd = gw_lnd.get_try().await?.deref();
191 let fed = fed.get_try().await?.deref();
192 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193 let start_time = fedimint_core::time::now();
194 if !skip_setup && !pre_dkg {
195 let invite = fed.invite_code()?;
196 gw_lnd.client().connect_fed(invite).await?;
197 }
198 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
199 Ok(Arc::new(()))
200 }
201 });
202
203 let gw_ldk = JitTryAnyhow::new_try({
204 let process_mgr = process_mgr.to_owned();
205 let bitcoind = bitcoind.clone();
206 move || async move {
207 bitcoind.get_try().await?;
208 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
209 let start_time = fedimint_core::time::now();
210 let ldk_gw = Gatewayd::new(
211 &process_mgr,
212 LightningNode::Ldk {
213 name: "gatewayd-ldk-0".to_string(),
214 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
215 ldk_port: process_mgr.globals.FM_PORT_LDK,
216 metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
217 },
218 1,
219 )
220 .await?;
221 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
222 Ok(Arc::new(ldk_gw))
223 }
224 });
225 let gw_ldk_second = JitTryAnyhow::new_try({
226 let process_mgr = process_mgr.to_owned();
227 let bitcoind = bitcoind.clone();
228 move || async move {
229 bitcoind.get_try().await?;
230 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
231 let start_time = fedimint_core::time::now();
232 let ldk_gw2 = Gatewayd::new(
233 &process_mgr,
234 LightningNode::Ldk {
235 name: "gatewayd-ldk-1".to_string(),
236 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
237 ldk_port: process_mgr.globals.FM_PORT_LDK2,
238 metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
239 },
240 2,
241 )
242 .await?;
243 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
244 Ok(Arc::new(ldk_gw2))
245 }
246 });
247 let gw_ldk_connected = JitTryAnyhow::new_try({
248 let gw_ldk = gw_ldk.clone();
249 let fed = fed.clone();
250 move || async move {
251 let gw_ldk = gw_ldk.get_try().await?.deref();
252 if supports_lnv2() {
253 let fed = fed.get_try().await?.deref();
254 let start_time = fedimint_core::time::now();
255 if !skip_setup && !pre_dkg {
256 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
257 let invite = fed.invite_code()?;
258 gw_ldk.client().connect_fed(invite).await?;
259 } else {
260 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
261 }
262 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
263 }
264 Ok(Arc::new(()))
265 }
266 });
267 let gw_ldk_second_connected = JitTryAnyhow::new_try({
268 let gw_ldk_second = gw_ldk_second.clone();
269 let fed = fed.clone();
270 move || async move {
271 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
272 if supports_lnv2() {
273 let fed = fed.get_try().await?.deref();
274 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
275 let start_time = fedimint_core::time::now();
276 if !skip_setup && !pre_dkg {
277 let invite = fed.invite_code()?;
278 gw_ldk2.client().connect_fed(invite).await?;
279 }
280 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
281 }
282 Ok(Arc::new(()))
283 }
284 });
285
286 let fed_epoch_generated = JitTryAnyhow::new_try({
287 let fed = fed.clone();
288 move || async move {
289 let fed = fed.get_try().await?.deref().clone();
290 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
291 let start_time = fedimint_core::time::now();
292 if !skip_setup && !pre_dkg {
293 fed.mine_then_wait_blocks_sync(10).await?;
294 }
295 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
296 Ok(Arc::new(()))
297 }
298 });
299
300 let channel_opened = JitTryAnyhow::new_try({
301 let gw_lnd = gw_lnd.clone();
302 let gw_ldk = gw_ldk.clone();
303 let gw_ldk_second = gw_ldk_second.clone();
304 let bitcoind = bitcoind.clone();
305 let fed_epoch_generated = fed_epoch_generated.clone();
306 move || async move {
307 let bitcoind = bitcoind.get_try().await?.deref().clone();
311
312 fed_epoch_generated.get_try().await?;
315
316 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
317 let gw_ldk = gw_ldk.get_try().await?.deref();
318 let gw_lnd = gw_lnd.get_try().await?.deref();
319 let gateways: &[NamedGateway<'_>] =
320 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
321
322 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
323 let start_time = fedimint_core::time::now();
324 open_channels_between_gateways(&bitcoind, gateways).await?;
325 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
326
327 Ok(Arc::new(()))
328 }
329 });
330
331 let recurringd = JitTryAnyhow::new_try({
332 let process_mgr = process_mgr.to_owned();
333 move || async move {
334 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
335 let start_time = fedimint_core::time::now();
336 let recurringd = Recurringd::new(&process_mgr).await?;
337 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
338 Ok(Arc::new(recurringd))
339 }
340 });
341
342 let recurringd_connected = JitTryAnyhow::new_try({
343 let recurringd = recurringd.clone();
344 let fed = fed.clone();
345 move || async move {
346 let recurringd = recurringd.get_try().await?.deref();
347 let fed = fed.get_try().await?.deref();
348 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
349 let start_time = fedimint_core::time::now();
350 if !skip_setup && !pre_dkg {
351 let invite_code = fed.invite_code()?;
352 recurringd.add_federation(&invite_code).await?;
353 }
354 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
355 Ok(Arc::new(()))
356 }
357 });
358
359 let recurringdv2 = JitTryAnyhow::new_try({
360 let process_mgr = process_mgr.to_owned();
361 move || async move {
362 debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
363 let start_time = fedimint_core::time::now();
364 let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
365 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
366 Ok(Arc::new(recurringdv2))
367 }
368 });
369
370 Ok(DevJitFed {
371 bitcoind,
372 lnd,
373 fed,
374 gw_lnd,
375 gw_ldk,
376 gw_ldk_second,
377 esplora,
378 recurringd,
379 recurringdv2,
380 start_time,
381 gw_lnd_registered,
382 gw_ldk_connected,
383 gw_ldk_second_connected,
384 fed_epoch_generated,
385 channel_opened,
386 recurringd_connected,
387 skip_setup,
388 pre_dkg,
389 })
390 }
391
392 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
393 Ok(self.esplora.get_try().await?.deref())
394 }
395 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
396 Ok(self.lnd.get_try().await?.deref())
397 }
398 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
399 Ok(self.gw_lnd.get_try().await?.deref())
400 }
401 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
402 self.gw_lnd_registered.get_try().await?;
403 Ok(self.gw_lnd.get_try().await?.deref())
404 }
405 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
406 Ok(self.gw_ldk.get_try().await?.deref())
407 }
408 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
409 Ok(self.gw_ldk_second.get_try().await?.deref())
410 }
411 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
412 self.gw_ldk_connected.get_try().await?;
413 Ok(self.gw_ldk.get_try().await?.deref())
414 }
415 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
416 self.gw_ldk_second_connected.get_try().await?;
417 Ok(self.gw_ldk_second.get_try().await?.deref())
418 }
419 pub async fn fed(&self) -> anyhow::Result<&Federation> {
420 Ok(self.fed.get_try().await?.deref())
421 }
422 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
423 Ok(self.bitcoind.get_try().await?.deref())
424 }
425
426 pub async fn internal_client(&self) -> anyhow::Result<Client> {
427 Ok(self.fed().await?.internal_client().await?.clone())
428 }
429
430 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
433 self.fed().await?.await_gateways_registered().await?;
434 Ok(self.fed().await?.internal_client().await?.clone())
435 }
436
437 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
438 Ok(self.recurringd.get_try().await?.deref())
439 }
440
441 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
442 self.recurringd_connected.get_try().await?;
443 Ok(self.recurringd.get_try().await?.deref())
444 }
445
446 pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
447 Ok(self.recurringdv2.get_try().await?.deref())
448 }
449
450 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
451 let fed_size = process_mgr.globals.FM_FED_SIZE;
452 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
453 anyhow::ensure!(
454 fed_size > 3 * offline_nodes,
455 "too many offline nodes ({offline_nodes}) to reach consensus"
456 );
457
458 if !self.pre_dkg && !self.skip_setup {
459 let _ = self.internal_client_gw_registered().await?;
460 }
461 let _ = self.channel_opened.get_try().await?;
462 let _ = self.gw_lnd_registered().await?;
463 let _ = self.gw_ldk_connected().await?;
464 let _ = self.gw_ldk_second_connected().await?;
465 let _ = self.lnd().await?;
466 let _ = self.esplora().await?;
467 let _ = self.recurringd_connected().await?;
468 let _ = self.recurringdv2().await?;
469 let _ = self.fed_epoch_generated.get_try().await?;
470
471 debug!(
472 target: LOG_DEVIMINT,
473 fed_size,
474 offline_nodes,
475 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
476 "Dev federation ready",
477 );
478 Ok(())
479 }
480
481 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
482 self.finalize(process_mgr).await?;
483 Ok(DevFed {
484 bitcoind: self.bitcoind().await?.to_owned(),
485 lnd: self.lnd().await?.to_owned(),
486 fed: self.fed().await?.to_owned(),
487 gw_lnd: self.gw_lnd().await?.to_owned(),
488 gw_ldk: self.gw_ldk().await?.to_owned(),
489 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
490 esplora: self.esplora().await?.to_owned(),
491 recurringd: self.recurringd().await?.to_owned(),
492 recurringdv2: self.recurringdv2().await?.to_owned(),
493 })
494 }
495
496 pub async fn fast_terminate(self) {
497 let Self {
498 bitcoind,
499 lnd,
500 fed,
501 gw_lnd,
502 esplora,
503 gw_ldk,
504 gw_ldk_second,
505 recurringd,
506 recurringdv2,
507 ..
508 } = self;
509
510 join!(
511 spawn_drop(gw_lnd),
512 spawn_drop(gw_ldk),
513 spawn_drop(gw_ldk_second),
514 spawn_drop(fed),
515 spawn_drop(lnd),
516 spawn_drop(esplora),
517 spawn_drop(bitcoind),
518 spawn_drop(recurringd),
519 spawn_drop(recurringdv2),
520 );
521 }
522}