1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20 T: Send + 'static,
21{
22 runtime::spawn("spawn_drop", async {
23 drop(t);
24 })
25 .await
26 .expect("drop panic");
27}
28
29#[derive(Clone)]
30pub struct DevFed {
31 pub bitcoind: Bitcoind,
32 pub lnd: Lnd,
33 pub fed: Federation,
34 pub gw_lnd: Gatewayd,
35 pub gw_ldk: Gatewayd,
36 pub gw_ldk_second: Gatewayd,
37 pub esplora: Esplora,
38 pub recurringd: Recurringd,
39}
40
41impl DevFed {
42 pub async fn fast_terminate(self) {
43 let Self {
44 bitcoind,
45 lnd,
46 fed,
47 gw_lnd,
48 gw_ldk,
49 gw_ldk_second,
50 esplora,
51 recurringd,
52 } = self;
53
54 join!(
55 spawn_drop(gw_lnd),
56 spawn_drop(gw_ldk),
57 spawn_drop(gw_ldk_second),
58 spawn_drop(fed),
59 spawn_drop(lnd),
60 spawn_drop(esplora),
61 spawn_drop(bitcoind),
62 spawn_drop(recurringd),
63 );
64 }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67 DevJitFed::new(process_mgr, false, false)?
68 .to_dev_fed(process_mgr)
69 .await
70}
71
72type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
74#[derive(Clone)]
75pub struct DevJitFed {
76 bitcoind: JitArc<Bitcoind>,
77 lnd: JitArc<Lnd>,
78 fed: JitArc<Federation>,
79 gw_lnd: JitArc<Gatewayd>,
80 gw_ldk: JitArc<Gatewayd>,
81 gw_ldk_second: JitArc<Gatewayd>,
82 esplora: JitArc<Esplora>,
83 recurringd: JitArc<Recurringd>,
84 start_time: std::time::SystemTime,
85 gw_lnd_registered: JitArc<()>,
86 gw_ldk_connected: JitArc<()>,
87 gw_ldk_second_connected: JitArc<()>,
88 fed_epoch_generated: JitArc<()>,
89 channel_opened: JitArc<()>,
90 recurringd_connected: JitArc<()>,
91
92 skip_setup: bool,
93 pre_dkg: bool,
94}
95
96impl DevJitFed {
97 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98 let fed_size = process_mgr.globals.FM_FED_SIZE;
99 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100 anyhow::ensure!(
101 fed_size > 3 * offline_nodes,
102 "too many offline nodes ({offline_nodes}) to reach consensus"
103 );
104 let start_time = fedimint_core::time::now();
105
106 debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
107
108 let bitcoind = JitTry::new_try({
109 let process_mgr = process_mgr.to_owned();
110 move || async move {
111 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112 let start_time = fedimint_core::time::now();
113 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115 Ok(Arc::new(bitcoind))
116 }
117 });
118 let lnd = JitTry::new_try({
119 let process_mgr = process_mgr.to_owned();
120 let bitcoind = bitcoind.clone();
121 || async move {
122 let bitcoind = bitcoind.get_try().await?.deref().clone();
123 debug!(target: LOG_DEVIMINT, "Starting lnd...");
124 let start_time = fedimint_core::time::now();
125 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127 Ok(Arc::new(lnd))
128 }
129 });
130 let esplora = JitTryAnyhow::new_try({
131 let process_mgr = process_mgr.to_owned();
132 let bitcoind = bitcoind.clone();
133 || async move {
134 let bitcoind = bitcoind.get_try().await?.deref().clone();
135 debug!(target: LOG_DEVIMINT, "Starting esplora...");
136 let start_time = fedimint_core::time::now();
137 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139 Ok(Arc::new(esplora))
140 }
141 });
142
143 let fed = JitTryAnyhow::new_try({
144 let process_mgr = process_mgr.to_owned();
145 let bitcoind = bitcoind.clone();
146 move || async move {
147 let bitcoind = bitcoind.get_try().await?.deref().clone();
148 debug!(target: LOG_DEVIMINT, "Starting federation...");
149 let start_time = fedimint_core::time::now();
150 let mut fed = Federation::new(
151 &process_mgr,
152 bitcoind,
153 skip_setup,
154 pre_dkg,
155 0,
156 "default".to_string(),
157 )
158 .await?;
159
160 fed.degrade_federation(&process_mgr).await?;
162
163 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165 Ok(Arc::new(fed))
166 }
167 });
168
169 let gw_lnd = JitTryAnyhow::new_try({
170 let process_mgr = process_mgr.to_owned();
171 let lnd = lnd.clone();
172 || async move {
173 let lnd = lnd.get_try().await?.deref().clone();
174 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175 let start_time = fedimint_core::time::now();
176 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178 Ok(Arc::new(lnd_gw))
179 }
180 });
181 let gw_lnd_registered = JitTryAnyhow::new_try({
182 let gw_lnd = gw_lnd.clone();
183 let fed = fed.clone();
184 move || async move {
185 let gw_lnd = gw_lnd.get_try().await?.deref();
186 let fed = fed.get_try().await?.deref();
187 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188 let start_time = fedimint_core::time::now();
189 if !skip_setup && !pre_dkg {
190 gw_lnd.connect_fed(fed).await?;
191 }
192 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193 Ok(Arc::new(()))
194 }
195 });
196
197 let gw_ldk = JitTryAnyhow::new_try({
198 let process_mgr = process_mgr.to_owned();
199 let bitcoind = bitcoind.clone();
200 move || async move {
201 bitcoind.get_try().await?;
202 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
203 let start_time = fedimint_core::time::now();
204 let ldk_gw = Gatewayd::new(
205 &process_mgr,
206 LightningNode::Ldk {
207 name: "gatewayd-ldk-0".to_string(),
208 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
209 ldk_port: process_mgr.globals.FM_PORT_LDK,
210 iroh_port: process_mgr.globals.FM_PORT_GW_LDK_IROH,
211 },
212 )
213 .await?;
214 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
215 Ok(Arc::new(ldk_gw))
216 }
217 });
218 let gw_ldk_second = JitTryAnyhow::new_try({
219 let process_mgr = process_mgr.to_owned();
220 let bitcoind = bitcoind.clone();
221 move || async move {
222 bitcoind.get_try().await?;
223 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
224 let start_time = fedimint_core::time::now();
225 let ldk_gw2 = Gatewayd::new(
226 &process_mgr,
227 LightningNode::Ldk {
228 name: "gatewayd-ldk-1".to_string(),
229 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
230 ldk_port: process_mgr.globals.FM_PORT_LDK2,
231 iroh_port: process_mgr.globals.FM_PORT_GW_LDK2_IROH,
232 },
233 )
234 .await?;
235 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
236 Ok(Arc::new(ldk_gw2))
237 }
238 });
239 let gw_ldk_connected = JitTryAnyhow::new_try({
240 let gw_ldk = gw_ldk.clone();
241 let fed = fed.clone();
242 move || async move {
243 let gw_ldk = gw_ldk.get_try().await?.deref();
244 if supports_lnv2() {
245 let fed = fed.get_try().await?.deref();
246 let start_time = fedimint_core::time::now();
247 if !skip_setup && !pre_dkg {
248 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
249 gw_ldk.connect_fed(fed).await?;
250 } else {
251 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
252 }
253 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
254 }
255 Ok(Arc::new(()))
256 }
257 });
258 let gw_ldk_second_connected = JitTryAnyhow::new_try({
259 let gw_ldk_second = gw_ldk_second.clone();
260 let fed = fed.clone();
261 move || async move {
262 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
263 if supports_lnv2() {
264 let fed = fed.get_try().await?.deref();
265 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
266 let start_time = fedimint_core::time::now();
267 if !skip_setup && !pre_dkg {
268 gw_ldk2.connect_fed(fed).await?;
269 }
270 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
271 }
272 Ok(Arc::new(()))
273 }
274 });
275
276 let fed_epoch_generated = JitTryAnyhow::new_try({
277 let fed = fed.clone();
278 move || async move {
279 let fed = fed.get_try().await?.deref().clone();
280 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
281 let start_time = fedimint_core::time::now();
282 if !skip_setup && !pre_dkg {
283 fed.mine_then_wait_blocks_sync(10).await?;
284 }
285 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
286 Ok(Arc::new(()))
287 }
288 });
289
290 let channel_opened = JitTryAnyhow::new_try({
291 let gw_lnd = gw_lnd.clone();
292 let gw_ldk = gw_ldk.clone();
293 let gw_ldk_second = gw_ldk_second.clone();
294 let bitcoind = bitcoind.clone();
295 let fed_epoch_generated = fed_epoch_generated.clone();
296 move || async move {
297 let bitcoind = bitcoind.get_try().await?.deref().clone();
301
302 fed_epoch_generated.get_try().await?;
305
306 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
307 let gw_ldk = gw_ldk.get_try().await?.deref();
308 let gw_lnd = gw_lnd.get_try().await?.deref();
309 let gateways: &[NamedGateway<'_>] =
310 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
311
312 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
313 let start_time = fedimint_core::time::now();
314 open_channels_between_gateways(&bitcoind, gateways).await?;
315 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
316
317 Ok(Arc::new(()))
318 }
319 });
320
321 let recurringd = JitTryAnyhow::new_try({
322 let process_mgr = process_mgr.to_owned();
323 move || async move {
324 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
325 let start_time = fedimint_core::time::now();
326 let recurringd = Recurringd::new(&process_mgr).await?;
327 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
328 Ok(Arc::new(recurringd))
329 }
330 });
331
332 let recurringd_connected = JitTryAnyhow::new_try({
333 let recurringd = recurringd.clone();
334 let fed = fed.clone();
335 move || async move {
336 let recurringd = recurringd.get_try().await?.deref();
337 let fed = fed.get_try().await?.deref();
338 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
339 let start_time = fedimint_core::time::now();
340 if !skip_setup && !pre_dkg {
341 let invite_code = fed.invite_code()?;
342 recurringd.add_federation(&invite_code).await?;
343 }
344 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
345 Ok(Arc::new(()))
346 }
347 });
348
349 Ok(DevJitFed {
350 bitcoind,
351 lnd,
352 fed,
353 gw_lnd,
354 gw_ldk,
355 gw_ldk_second,
356 esplora,
357 recurringd,
358 start_time,
359 gw_lnd_registered,
360 gw_ldk_connected,
361 gw_ldk_second_connected,
362 fed_epoch_generated,
363 channel_opened,
364 recurringd_connected,
365 skip_setup,
366 pre_dkg,
367 })
368 }
369
370 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
371 Ok(self.esplora.get_try().await?.deref())
372 }
373 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
374 Ok(self.lnd.get_try().await?.deref())
375 }
376 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
377 Ok(self.gw_lnd.get_try().await?.deref())
378 }
379 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
380 self.gw_lnd_registered.get_try().await?;
381 Ok(self.gw_lnd.get_try().await?.deref())
382 }
383 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
384 Ok(self.gw_ldk.get_try().await?.deref())
385 }
386 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
387 Ok(self.gw_ldk_second.get_try().await?.deref())
388 }
389 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
390 self.gw_ldk_connected.get_try().await?;
391 Ok(self.gw_ldk.get_try().await?.deref())
392 }
393 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
394 self.gw_ldk_second_connected.get_try().await?;
395 Ok(self.gw_ldk_second.get_try().await?.deref())
396 }
397 pub async fn fed(&self) -> anyhow::Result<&Federation> {
398 Ok(self.fed.get_try().await?.deref())
399 }
400 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
401 Ok(self.bitcoind.get_try().await?.deref())
402 }
403
404 pub async fn internal_client(&self) -> anyhow::Result<Client> {
405 Ok(self.fed().await?.internal_client().await?.clone())
406 }
407
408 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
411 self.fed().await?.await_gateways_registered().await?;
412 Ok(self.fed().await?.internal_client().await?.clone())
413 }
414
415 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
416 Ok(self.recurringd.get_try().await?.deref())
417 }
418
419 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
420 self.recurringd_connected.get_try().await?;
421 Ok(self.recurringd.get_try().await?.deref())
422 }
423
424 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
425 let fed_size = process_mgr.globals.FM_FED_SIZE;
426 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
427 anyhow::ensure!(
428 fed_size > 3 * offline_nodes,
429 "too many offline nodes ({offline_nodes}) to reach consensus"
430 );
431
432 if !self.pre_dkg && !self.skip_setup {
433 let _ = self.internal_client_gw_registered().await?;
434 }
435 let _ = self.channel_opened.get_try().await?;
436 let _ = self.gw_lnd_registered().await?;
437 let _ = self.gw_ldk_connected().await?;
438 let _ = self.gw_ldk_second_connected().await?;
439 let _ = self.lnd().await?;
440 let _ = self.esplora().await?;
441 let _ = self.recurringd_connected().await?;
442 let _ = self.fed_epoch_generated.get_try().await?;
443
444 debug!(
445 target: LOG_DEVIMINT,
446 fed_size,
447 offline_nodes,
448 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
449 "Dev federation ready",
450 );
451 Ok(())
452 }
453
454 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
455 self.finalize(process_mgr).await?;
456 Ok(DevFed {
457 bitcoind: self.bitcoind().await?.to_owned(),
458 lnd: self.lnd().await?.to_owned(),
459 fed: self.fed().await?.to_owned(),
460 gw_lnd: self.gw_lnd().await?.to_owned(),
461 gw_ldk: self.gw_ldk().await?.to_owned(),
462 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
463 esplora: self.esplora().await?.to_owned(),
464 recurringd: self.recurringd().await?.to_owned(),
465 })
466 }
467
468 pub async fn fast_terminate(self) {
469 let Self {
470 bitcoind,
471 lnd,
472 fed,
473 gw_lnd,
474 esplora,
475 gw_ldk,
476 gw_ldk_second,
477 recurringd,
478 ..
479 } = self;
480
481 join!(
482 spawn_drop(gw_lnd),
483 spawn_drop(gw_ldk),
484 spawn_drop(gw_ldk_second),
485 spawn_drop(fed),
486 spawn_drop(lnd),
487 spawn_drop(esplora),
488 spawn_drop(bitcoind),
489 spawn_drop(recurringd),
490 );
491 }
492}