1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21 T: Send + 'static,
22{
23 runtime::spawn("spawn_drop", async {
24 drop(t);
25 })
26 .await
27 .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32 pub bitcoind: Bitcoind,
33 pub lnd: Lnd,
34 pub fed: Federation,
35 pub gw_lnd: Gatewayd,
36 pub gw_ldk: Gatewayd,
37 pub gw_ldk_second: Gatewayd,
38 pub esplora: Esplora,
39 pub recurringd: Recurringd,
40 pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44 pub async fn fast_terminate(self) {
45 let Self {
46 bitcoind,
47 lnd,
48 fed,
49 gw_lnd,
50 gw_ldk,
51 gw_ldk_second,
52 esplora,
53 recurringd,
54 recurringdv2,
55 } = self;
56
57 join!(
58 spawn_drop(gw_lnd),
59 spawn_drop(gw_ldk),
60 spawn_drop(gw_ldk_second),
61 spawn_drop(fed),
62 spawn_drop(lnd),
63 spawn_drop(esplora),
64 spawn_drop(bitcoind),
65 spawn_drop(recurringd),
66 spawn_drop(recurringdv2),
67 );
68 }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71 DevJitFed::new(process_mgr, false, false)?
72 .to_dev_fed(process_mgr)
73 .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80 bitcoind: JitArc<Bitcoind>,
81 lnd: JitArc<Lnd>,
82 fed: JitArc<Federation>,
83 gw_lnd: JitArc<Gatewayd>,
84 gw_ldk: JitArc<Gatewayd>,
85 gw_ldk_second: JitArc<Gatewayd>,
86 esplora: JitArc<Esplora>,
87 recurringd: JitArc<Recurringd>,
88 recurringdv2: JitArc<Recurringdv2>,
89 start_time: std::time::SystemTime,
90 gw_lnd_registered: JitArc<()>,
91 gw_ldk_connected: JitArc<()>,
92 gw_ldk_second_connected: JitArc<()>,
93 fed_epoch_generated: JitArc<()>,
94 channel_opened: JitArc<()>,
95 recurringd_connected: JitArc<()>,
96
97 skip_setup: bool,
98 pre_dkg: bool,
99}
100
101impl DevJitFed {
102 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103 Self::new_with_pre_restore(process_mgr, skip_setup, pre_dkg, false)
104 }
105
106 pub fn new_with_pre_restore(
107 process_mgr: &ProcessManager,
108 skip_setup: bool,
109 pre_dkg: bool,
110 pre_restore: bool,
111 ) -> Result<DevJitFed> {
112 let fed_size = process_mgr.globals.FM_FED_SIZE;
113 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
114 anyhow::ensure!(
115 fed_size > 3 * offline_nodes,
116 "too many offline nodes ({offline_nodes}) to reach consensus"
117 );
118 let start_time = fedimint_core::time::now();
119
120 debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
121
122 let bitcoind = JitTry::new_try({
123 let process_mgr = process_mgr.to_owned();
124 move || async move {
125 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
126 let start_time = fedimint_core::time::now();
127 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
128 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
129 Ok(Arc::new(bitcoind))
130 }
131 });
132 let lnd = JitTry::new_try({
133 let process_mgr = process_mgr.to_owned();
134 let bitcoind = bitcoind.clone();
135 || async move {
136 let bitcoind = bitcoind.get_try().await?.deref().clone();
137 debug!(target: LOG_DEVIMINT, "Starting lnd...");
138 let start_time = fedimint_core::time::now();
139 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
140 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
141 Ok(Arc::new(lnd))
142 }
143 });
144 let esplora = JitTryAnyhow::new_try({
145 let process_mgr = process_mgr.to_owned();
146 let bitcoind = bitcoind.clone();
147 || async move {
148 let bitcoind = bitcoind.get_try().await?.deref().clone();
149 debug!(target: LOG_DEVIMINT, "Starting esplora...");
150 let start_time = fedimint_core::time::now();
151 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
152 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
153 Ok(Arc::new(esplora))
154 }
155 });
156
157 let fed = JitTryAnyhow::new_try({
158 let process_mgr = process_mgr.to_owned();
159 let bitcoind = bitcoind.clone();
160 move || async move {
161 let bitcoind = bitcoind.get_try().await?.deref().clone();
162 debug!(target: LOG_DEVIMINT, "Starting federation...");
163 let start_time = fedimint_core::time::now();
164 let mut fed = Federation::new(
165 &process_mgr,
166 bitcoind,
167 skip_setup,
168 pre_dkg,
169 pre_restore,
170 0,
171 "default".to_string(),
172 )
173 .await?;
174
175 fed.degrade_federation(&process_mgr).await?;
177
178 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
179
180 Ok(Arc::new(fed))
181 }
182 });
183
184 let gw_lnd = JitTryAnyhow::new_try({
185 let process_mgr = process_mgr.to_owned();
186 let lnd = lnd.clone();
187 || async move {
188 let lnd = lnd.get_try().await?.deref().clone();
189 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
190 let start_time = fedimint_core::time::now();
191 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
192 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
193 Ok(Arc::new(lnd_gw))
194 }
195 });
196 let gw_lnd_registered = JitTryAnyhow::new_try({
197 let gw_lnd = gw_lnd.clone();
198 let fed = fed.clone();
199 move || async move {
200 let gw_lnd = gw_lnd.get_try().await?.deref();
201 let fed = fed.get_try().await?.deref();
202 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
203 let start_time = fedimint_core::time::now();
204 if !skip_setup && !pre_dkg {
205 let invite = fed.invite_code()?;
206 gw_lnd.client().connect_fed(invite).await?;
207 }
208 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
209 Ok(Arc::new(()))
210 }
211 });
212
213 let gw_ldk = JitTryAnyhow::new_try({
214 let process_mgr = process_mgr.to_owned();
215 let bitcoind = bitcoind.clone();
216 move || async move {
217 bitcoind.get_try().await?;
218 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
219 let start_time = fedimint_core::time::now();
220 let ldk_gw = Gatewayd::new(
221 &process_mgr,
222 LightningNode::Ldk {
223 name: "gatewayd-ldk-0".to_string(),
224 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
225 ldk_port: process_mgr.globals.FM_PORT_LDK,
226 metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
227 },
228 1,
229 )
230 .await?;
231 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
232 Ok(Arc::new(ldk_gw))
233 }
234 });
235 let gw_ldk_second = JitTryAnyhow::new_try({
236 let process_mgr = process_mgr.to_owned();
237 let bitcoind = bitcoind.clone();
238 move || async move {
239 bitcoind.get_try().await?;
240 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
241 let start_time = fedimint_core::time::now();
242 let ldk_gw2 = Gatewayd::new(
243 &process_mgr,
244 LightningNode::Ldk {
245 name: "gatewayd-ldk-1".to_string(),
246 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
247 ldk_port: process_mgr.globals.FM_PORT_LDK2,
248 metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
249 },
250 2,
251 )
252 .await?;
253 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
254 Ok(Arc::new(ldk_gw2))
255 }
256 });
257 let gw_ldk_connected = JitTryAnyhow::new_try({
258 let gw_ldk = gw_ldk.clone();
259 let fed = fed.clone();
260 move || async move {
261 let gw_ldk = gw_ldk.get_try().await?.deref();
262 if supports_lnv2() {
263 let fed = fed.get_try().await?.deref();
264 let start_time = fedimint_core::time::now();
265 if !skip_setup && !pre_dkg {
266 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
267 let invite = fed.invite_code()?;
268 gw_ldk.client().connect_fed(invite).await?;
269 } else {
270 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
271 }
272 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
273 }
274 Ok(Arc::new(()))
275 }
276 });
277 let gw_ldk_second_connected = JitTryAnyhow::new_try({
278 let gw_ldk_second = gw_ldk_second.clone();
279 let fed = fed.clone();
280 move || async move {
281 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
282 if supports_lnv2() {
283 let fed = fed.get_try().await?.deref();
284 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
285 let start_time = fedimint_core::time::now();
286 if !skip_setup && !pre_dkg {
287 let invite = fed.invite_code()?;
288 gw_ldk2.client().connect_fed(invite).await?;
289 }
290 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
291 }
292 Ok(Arc::new(()))
293 }
294 });
295
296 let fed_epoch_generated = JitTryAnyhow::new_try({
297 let fed = fed.clone();
298 move || async move {
299 let fed = fed.get_try().await?.deref().clone();
300 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
301 let start_time = fedimint_core::time::now();
302 if !skip_setup && !pre_dkg {
303 fed.mine_then_wait_blocks_sync(10).await?;
304 }
305 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
306 Ok(Arc::new(()))
307 }
308 });
309
310 let channel_opened = JitTryAnyhow::new_try({
311 let gw_lnd = gw_lnd.clone();
312 let gw_ldk = gw_ldk.clone();
313 let gw_ldk_second = gw_ldk_second.clone();
314 let bitcoind = bitcoind.clone();
315 let fed_epoch_generated = fed_epoch_generated.clone();
316 move || async move {
317 let bitcoind = bitcoind.get_try().await?.deref().clone();
321
322 fed_epoch_generated.get_try().await?;
325
326 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
327 let gw_ldk = gw_ldk.get_try().await?.deref();
328 let gw_lnd = gw_lnd.get_try().await?.deref();
329 let gateways: &[NamedGateway<'_>] =
330 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
331 if !skip_setup && !pre_dkg {
332 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
333 let start_time = fedimint_core::time::now();
334 open_channels_between_gateways(&bitcoind, gateways).await?;
335 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
336 }
337
338 Ok(Arc::new(()))
339 }
340 });
341
342 let recurringd = JitTryAnyhow::new_try({
343 let process_mgr = process_mgr.to_owned();
344 move || async move {
345 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
346 let start_time = fedimint_core::time::now();
347 let recurringd = Recurringd::new(&process_mgr).await?;
348 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
349 Ok(Arc::new(recurringd))
350 }
351 });
352
353 let recurringd_connected = JitTryAnyhow::new_try({
354 let recurringd = recurringd.clone();
355 let fed = fed.clone();
356 move || async move {
357 let recurringd = recurringd.get_try().await?.deref();
358 let fed = fed.get_try().await?.deref();
359 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
360 let start_time = fedimint_core::time::now();
361 if !skip_setup && !pre_dkg {
362 let invite_code = fed.invite_code()?;
363 recurringd.add_federation(&invite_code).await?;
364 }
365 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
366 Ok(Arc::new(()))
367 }
368 });
369
370 let recurringdv2 = JitTryAnyhow::new_try({
371 let process_mgr = process_mgr.to_owned();
372 move || async move {
373 debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
374 let start_time = fedimint_core::time::now();
375 let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
376 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
377 Ok(Arc::new(recurringdv2))
378 }
379 });
380
381 Ok(DevJitFed {
382 bitcoind,
383 lnd,
384 fed,
385 gw_lnd,
386 gw_ldk,
387 gw_ldk_second,
388 esplora,
389 recurringd,
390 recurringdv2,
391 start_time,
392 gw_lnd_registered,
393 gw_ldk_connected,
394 gw_ldk_second_connected,
395 fed_epoch_generated,
396 channel_opened,
397 recurringd_connected,
398 skip_setup,
399 pre_dkg,
400 })
401 }
402
403 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
404 Ok(self.esplora.get_try().await?.deref())
405 }
406 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
407 Ok(self.lnd.get_try().await?.deref())
408 }
409 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
410 Ok(self.gw_lnd.get_try().await?.deref())
411 }
412 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
413 self.gw_lnd_registered.get_try().await?;
414 Ok(self.gw_lnd.get_try().await?.deref())
415 }
416 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
417 Ok(self.gw_ldk.get_try().await?.deref())
418 }
419 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
420 Ok(self.gw_ldk_second.get_try().await?.deref())
421 }
422 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
423 self.gw_ldk_connected.get_try().await?;
424 Ok(self.gw_ldk.get_try().await?.deref())
425 }
426 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
427 self.gw_ldk_second_connected.get_try().await?;
428 Ok(self.gw_ldk_second.get_try().await?.deref())
429 }
430 pub async fn fed(&self) -> anyhow::Result<&Federation> {
431 Ok(self.fed.get_try().await?.deref())
432 }
433 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
434 Ok(self.bitcoind.get_try().await?.deref())
435 }
436
437 pub async fn internal_client(&self) -> anyhow::Result<Client> {
438 Ok(self.fed().await?.internal_client().await?.clone())
439 }
440
441 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
444 self.fed().await?.await_gateways_registered().await?;
445 Ok(self.fed().await?.internal_client().await?.clone())
446 }
447
448 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
449 Ok(self.recurringd.get_try().await?.deref())
450 }
451
452 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
453 self.recurringd_connected.get_try().await?;
454 Ok(self.recurringd.get_try().await?.deref())
455 }
456
457 pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
458 Ok(self.recurringdv2.get_try().await?.deref())
459 }
460
461 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
462 let fed_size = process_mgr.globals.FM_FED_SIZE;
463 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
464 anyhow::ensure!(
465 fed_size > 3 * offline_nodes,
466 "too many offline nodes ({offline_nodes}) to reach consensus"
467 );
468
469 if !self.pre_dkg && !self.skip_setup {
470 let _ = self.internal_client_gw_registered().await?;
471 }
472 let _ = self.channel_opened.get_try().await?;
473 let _ = self.gw_lnd_registered().await?;
474 let _ = self.gw_ldk_connected().await?;
475 let _ = self.gw_ldk_second_connected().await?;
476 let _ = self.lnd().await?;
477 let _ = self.esplora().await?;
478 let _ = self.recurringd_connected().await?;
479 let _ = self.recurringdv2().await?;
480 let _ = self.fed_epoch_generated.get_try().await?;
481
482 debug!(
483 target: LOG_DEVIMINT,
484 fed_size,
485 offline_nodes,
486 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
487 "Dev federation ready",
488 );
489 Ok(())
490 }
491
492 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
493 self.finalize(process_mgr).await?;
494 Ok(DevFed {
495 bitcoind: self.bitcoind().await?.to_owned(),
496 lnd: self.lnd().await?.to_owned(),
497 fed: self.fed().await?.to_owned(),
498 gw_lnd: self.gw_lnd().await?.to_owned(),
499 gw_ldk: self.gw_ldk().await?.to_owned(),
500 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
501 esplora: self.esplora().await?.to_owned(),
502 recurringd: self.recurringd().await?.to_owned(),
503 recurringdv2: self.recurringdv2().await?.to_owned(),
504 })
505 }
506
507 pub async fn fast_terminate(self) {
508 let Self {
509 bitcoind,
510 lnd,
511 fed,
512 gw_lnd,
513 esplora,
514 gw_ldk,
515 gw_ldk_second,
516 recurringd,
517 recurringdv2,
518 ..
519 } = self;
520
521 join!(
522 spawn_drop(gw_lnd),
523 spawn_drop(gw_ldk),
524 spawn_drop(gw_ldk_second),
525 spawn_drop(fed),
526 spawn_drop(lnd),
527 spawn_drop(esplora),
528 spawn_drop(bitcoind),
529 spawn_drop(recurringd),
530 spawn_drop(recurringdv2),
531 );
532 }
533}