1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20 T: Send + 'static,
21{
22 runtime::spawn("spawn_drop", async {
23 drop(t);
24 })
25 .await
26 .expect("drop panic");
27}
28
29#[derive(Clone)]
30pub struct DevFed {
31 pub bitcoind: Bitcoind,
32 pub lnd: Lnd,
33 pub fed: Federation,
34 pub gw_lnd: Gatewayd,
35 pub gw_ldk: Gatewayd,
36 pub gw_ldk_second: Gatewayd,
37 pub esplora: Esplora,
38 pub recurringd: Recurringd,
39}
40
41impl DevFed {
42 pub async fn fast_terminate(self) {
43 let Self {
44 bitcoind,
45 lnd,
46 fed,
47 gw_lnd,
48 gw_ldk,
49 gw_ldk_second,
50 esplora,
51 recurringd,
52 } = self;
53
54 join!(
55 spawn_drop(gw_lnd),
56 spawn_drop(gw_ldk),
57 spawn_drop(gw_ldk_second),
58 spawn_drop(fed),
59 spawn_drop(lnd),
60 spawn_drop(esplora),
61 spawn_drop(bitcoind),
62 spawn_drop(recurringd),
63 );
64 }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67 DevJitFed::new(process_mgr, false, false)?
68 .to_dev_fed(process_mgr)
69 .await
70}
71
72type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
74#[derive(Clone)]
75pub struct DevJitFed {
76 bitcoind: JitArc<Bitcoind>,
77 lnd: JitArc<Lnd>,
78 fed: JitArc<Federation>,
79 gw_lnd: JitArc<Gatewayd>,
80 gw_ldk: JitArc<Gatewayd>,
81 gw_ldk_second: JitArc<Gatewayd>,
82 esplora: JitArc<Esplora>,
83 recurringd: JitArc<Recurringd>,
84 start_time: std::time::SystemTime,
85 gw_lnd_registered: JitArc<()>,
86 gw_ldk_connected: JitArc<()>,
87 gw_ldk_second_connected: JitArc<()>,
88 fed_epoch_generated: JitArc<()>,
89 channel_opened: JitArc<()>,
90 recurringd_connected: JitArc<()>,
91
92 skip_setup: bool,
93 pre_dkg: bool,
94}
95
96impl DevJitFed {
97 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98 let fed_size = process_mgr.globals.FM_FED_SIZE;
99 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100 anyhow::ensure!(
101 fed_size > 3 * offline_nodes,
102 "too many offline nodes ({offline_nodes}) to reach consensus"
103 );
104 let start_time = fedimint_core::time::now();
105
106 debug!("Starting dev federation");
107
108 let bitcoind = JitTry::new_try({
109 let process_mgr = process_mgr.to_owned();
110 move || async move {
111 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112 let start_time = fedimint_core::time::now();
113 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115 Ok(Arc::new(bitcoind))
116 }
117 });
118 let lnd = JitTry::new_try({
119 let process_mgr = process_mgr.to_owned();
120 let bitcoind = bitcoind.clone();
121 || async move {
122 let bitcoind = bitcoind.get_try().await?.deref().clone();
123 debug!(target: LOG_DEVIMINT, "Starting lnd...");
124 let start_time = fedimint_core::time::now();
125 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127 Ok(Arc::new(lnd))
128 }
129 });
130 let esplora = JitTryAnyhow::new_try({
131 let process_mgr = process_mgr.to_owned();
132 let bitcoind = bitcoind.clone();
133 || async move {
134 let bitcoind = bitcoind.get_try().await?.deref().clone();
135 debug!(target: LOG_DEVIMINT, "Starting esplora...");
136 let start_time = fedimint_core::time::now();
137 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139 Ok(Arc::new(esplora))
140 }
141 });
142
143 let fed = JitTryAnyhow::new_try({
144 let process_mgr = process_mgr.to_owned();
145 let bitcoind = bitcoind.clone();
146 move || async move {
147 let bitcoind = bitcoind.get_try().await?.deref().clone();
148 debug!(target: LOG_DEVIMINT, "Starting federation...");
149 let start_time = fedimint_core::time::now();
150 let mut fed = Federation::new(
151 &process_mgr,
152 bitcoind,
153 skip_setup,
154 pre_dkg,
155 0,
156 "default".to_string(),
157 )
158 .await?;
159
160 fed.degrade_federation(&process_mgr).await?;
162
163 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165 Ok(Arc::new(fed))
166 }
167 });
168
169 let gw_lnd = JitTryAnyhow::new_try({
170 let process_mgr = process_mgr.to_owned();
171 let lnd = lnd.clone();
172 || async move {
173 let lnd = lnd.get_try().await?.deref().clone();
174 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175 let start_time = fedimint_core::time::now();
176 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178 Ok(Arc::new(lnd_gw))
179 }
180 });
181 let gw_lnd_registered = JitTryAnyhow::new_try({
182 let gw_lnd = gw_lnd.clone();
183 let fed = fed.clone();
184 move || async move {
185 let gw_lnd = gw_lnd.get_try().await?.deref();
186 let fed = fed.get_try().await?.deref();
187 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188 let start_time = fedimint_core::time::now();
189 if !skip_setup && !pre_dkg {
190 gw_lnd.connect_fed(fed).await?;
191 }
192 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193 Ok(Arc::new(()))
194 }
195 });
196
197 let gw_ldk = JitTryAnyhow::new_try({
198 let process_mgr = process_mgr.to_owned();
199 let bitcoind = bitcoind.clone();
200 move || async move {
201 bitcoind.get_try().await?;
202 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
203 let start_time = fedimint_core::time::now();
204 let ldk_gw = Gatewayd::new(
205 &process_mgr,
206 LightningNode::Ldk {
207 name: "gatewayd-ldk-0".to_string(),
208 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
209 ldk_port: process_mgr.globals.FM_PORT_LDK,
210 },
211 )
212 .await?;
213 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
214 Ok(Arc::new(ldk_gw))
215 }
216 });
217 let gw_ldk_second = JitTryAnyhow::new_try({
218 let process_mgr = process_mgr.to_owned();
219 let bitcoind = bitcoind.clone();
220 move || async move {
221 bitcoind.get_try().await?;
222 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
223 let start_time = fedimint_core::time::now();
224 let ldk_gw2 = Gatewayd::new(
225 &process_mgr,
226 LightningNode::Ldk {
227 name: "gatewayd-ldk-1".to_string(),
228 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
229 ldk_port: process_mgr.globals.FM_PORT_LDK2,
230 },
231 )
232 .await?;
233 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
234 Ok(Arc::new(ldk_gw2))
235 }
236 });
237 let gw_ldk_connected = JitTryAnyhow::new_try({
238 let gw_ldk = gw_ldk.clone();
239 let fed = fed.clone();
240 move || async move {
241 let gw_ldk = gw_ldk.get_try().await?.deref();
242 if supports_lnv2() {
243 let fed = fed.get_try().await?.deref();
244 let start_time = fedimint_core::time::now();
245 if !skip_setup && !pre_dkg {
246 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
247 gw_ldk.connect_fed(fed).await?;
248 } else {
249 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
250 }
251 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
252 }
253 Ok(Arc::new(()))
254 }
255 });
256 let gw_ldk_second_connected = JitTryAnyhow::new_try({
257 let gw_ldk_second = gw_ldk_second.clone();
258 let fed = fed.clone();
259 move || async move {
260 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
261 if supports_lnv2() {
262 let fed = fed.get_try().await?.deref();
263 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
264 let start_time = fedimint_core::time::now();
265 if !skip_setup && !pre_dkg {
266 gw_ldk2.connect_fed(fed).await?;
267 }
268 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
269 }
270 Ok(Arc::new(()))
271 }
272 });
273
274 let fed_epoch_generated = JitTryAnyhow::new_try({
275 let fed = fed.clone();
276 move || async move {
277 let fed = fed.get_try().await?.deref().clone();
278 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
279 let start_time = fedimint_core::time::now();
280 if !skip_setup && !pre_dkg {
281 fed.mine_then_wait_blocks_sync(10).await?;
282 }
283 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
284 Ok(Arc::new(()))
285 }
286 });
287
288 let channel_opened = JitTryAnyhow::new_try({
289 let gw_lnd = gw_lnd.clone();
290 let gw_ldk = gw_ldk.clone();
291 let gw_ldk_second = gw_ldk_second.clone();
292 let bitcoind = bitcoind.clone();
293 let fed_epoch_generated = fed_epoch_generated.clone();
294 move || async move {
295 let bitcoind = bitcoind.get_try().await?.deref().clone();
299
300 fed_epoch_generated.get_try().await?;
303
304 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
305 let gw_ldk = gw_ldk.get_try().await?.deref();
306 let gw_lnd = gw_lnd.get_try().await?.deref();
307 let gateways: &[NamedGateway<'_>] =
308 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
309
310 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
311 let start_time = fedimint_core::time::now();
312 open_channels_between_gateways(&bitcoind, gateways).await?;
313 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
314
315 Ok(Arc::new(()))
316 }
317 });
318
319 let recurringd = JitTryAnyhow::new_try({
320 let process_mgr = process_mgr.to_owned();
321 move || async move {
322 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
323 let start_time = fedimint_core::time::now();
324 let recurringd = Recurringd::new(&process_mgr).await?;
325 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
326 Ok(Arc::new(recurringd))
327 }
328 });
329
330 let recurringd_connected = JitTryAnyhow::new_try({
331 let recurringd = recurringd.clone();
332 let fed = fed.clone();
333 move || async move {
334 let recurringd = recurringd.get_try().await?.deref();
335 let fed = fed.get_try().await?.deref();
336 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
337 let start_time = fedimint_core::time::now();
338 if !skip_setup && !pre_dkg {
339 let invite_code = fed.invite_code()?;
340 recurringd.add_federation(&invite_code).await?;
341 }
342 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
343 Ok(Arc::new(()))
344 }
345 });
346
347 Ok(DevJitFed {
348 bitcoind,
349 lnd,
350 fed,
351 gw_lnd,
352 gw_ldk,
353 gw_ldk_second,
354 esplora,
355 recurringd,
356 start_time,
357 gw_lnd_registered,
358 gw_ldk_connected,
359 gw_ldk_second_connected,
360 fed_epoch_generated,
361 channel_opened,
362 recurringd_connected,
363 skip_setup,
364 pre_dkg,
365 })
366 }
367
368 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
369 Ok(self.esplora.get_try().await?.deref())
370 }
371 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
372 Ok(self.lnd.get_try().await?.deref())
373 }
374 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
375 Ok(self.gw_lnd.get_try().await?.deref())
376 }
377 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
378 self.gw_lnd_registered.get_try().await?;
379 Ok(self.gw_lnd.get_try().await?.deref())
380 }
381 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
382 Ok(self.gw_ldk.get_try().await?.deref())
383 }
384 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
385 Ok(self.gw_ldk_second.get_try().await?.deref())
386 }
387 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
388 self.gw_ldk_connected.get_try().await?;
389 Ok(self.gw_ldk.get_try().await?.deref())
390 }
391 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
392 self.gw_ldk_second_connected.get_try().await?;
393 Ok(self.gw_ldk_second.get_try().await?.deref())
394 }
395 pub async fn fed(&self) -> anyhow::Result<&Federation> {
396 Ok(self.fed.get_try().await?.deref())
397 }
398 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
399 Ok(self.bitcoind.get_try().await?.deref())
400 }
401
402 pub async fn internal_client(&self) -> anyhow::Result<Client> {
403 Ok(self.fed().await?.internal_client().await?.clone())
404 }
405
406 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
409 self.fed().await?.await_gateways_registered().await?;
410 Ok(self.fed().await?.internal_client().await?.clone())
411 }
412
413 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
414 Ok(self.recurringd.get_try().await?.deref())
415 }
416
417 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
418 self.recurringd_connected.get_try().await?;
419 Ok(self.recurringd.get_try().await?.deref())
420 }
421
422 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
423 let fed_size = process_mgr.globals.FM_FED_SIZE;
424 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
425 anyhow::ensure!(
426 fed_size > 3 * offline_nodes,
427 "too many offline nodes ({offline_nodes}) to reach consensus"
428 );
429
430 if !self.pre_dkg && !self.skip_setup {
431 let _ = self.internal_client_gw_registered().await?;
432 }
433 let _ = self.channel_opened.get_try().await?;
434 let _ = self.gw_lnd_registered().await?;
435 let _ = self.gw_ldk_connected().await?;
436 let _ = self.gw_ldk_second_connected().await?;
437 let _ = self.lnd().await?;
438 let _ = self.esplora().await?;
439 let _ = self.recurringd_connected().await?;
440 let _ = self.fed_epoch_generated.get_try().await?;
441
442 debug!(
443 target: LOG_DEVIMINT,
444 fed_size,
445 offline_nodes,
446 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
447 "Dev federation ready",
448 );
449 Ok(())
450 }
451
452 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
453 self.finalize(process_mgr).await?;
454 Ok(DevFed {
455 bitcoind: self.bitcoind().await?.to_owned(),
456 lnd: self.lnd().await?.to_owned(),
457 fed: self.fed().await?.to_owned(),
458 gw_lnd: self.gw_lnd().await?.to_owned(),
459 gw_ldk: self.gw_ldk().await?.to_owned(),
460 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
461 esplora: self.esplora().await?.to_owned(),
462 recurringd: self.recurringd().await?.to_owned(),
463 })
464 }
465
466 pub async fn fast_terminate(self) {
467 let Self {
468 bitcoind,
469 lnd,
470 fed,
471 gw_lnd,
472 esplora,
473 gw_ldk,
474 gw_ldk_second,
475 recurringd,
476 ..
477 } = self;
478
479 join!(
480 spawn_drop(gw_lnd),
481 spawn_drop(gw_ldk),
482 spawn_drop(gw_ldk_second),
483 spawn_drop(fed),
484 spawn_drop(lnd),
485 spawn_drop(esplora),
486 spawn_drop(bitcoind),
487 spawn_drop(recurringd),
488 );
489 }
490}