1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::{Gatewayd, LdkChainSource};
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20 T: Send + 'static,
21{
22 runtime::spawn("spawn_drop", async {
23 drop(t);
24 })
25 .await
26 .expect("drop panic");
27}
28
29#[derive(Clone)]
30pub struct DevFed {
31 pub bitcoind: Bitcoind,
32 pub lnd: Lnd,
33 pub fed: Federation,
34 pub gw_lnd: Gatewayd,
35 pub gw_ldk: Gatewayd,
36 pub gw_ldk_second: Gatewayd,
37 pub esplora: Esplora,
38 pub recurringd: Recurringd,
39}
40
41impl DevFed {
42 pub async fn fast_terminate(self) {
43 let Self {
44 bitcoind,
45 lnd,
46 fed,
47 gw_lnd,
48 gw_ldk,
49 gw_ldk_second,
50 esplora,
51 recurringd,
52 } = self;
53
54 join!(
55 spawn_drop(gw_lnd),
56 spawn_drop(gw_ldk),
57 spawn_drop(gw_ldk_second),
58 spawn_drop(fed),
59 spawn_drop(lnd),
60 spawn_drop(esplora),
61 spawn_drop(bitcoind),
62 spawn_drop(recurringd),
63 );
64 }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67 DevJitFed::new(process_mgr, false, false)?
68 .to_dev_fed(process_mgr)
69 .await
70}
71
72type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
74#[derive(Clone)]
75pub struct DevJitFed {
76 bitcoind: JitArc<Bitcoind>,
77 lnd: JitArc<Lnd>,
78 fed: JitArc<Federation>,
79 gw_lnd: JitArc<Gatewayd>,
80 gw_ldk: JitArc<Gatewayd>,
81 gw_ldk_second: JitArc<Gatewayd>,
82 esplora: JitArc<Esplora>,
83 recurringd: JitArc<Recurringd>,
84 start_time: std::time::SystemTime,
85 gw_lnd_registered: JitArc<()>,
86 gw_ldk_connected: JitArc<()>,
87 gw_ldk_second_connected: JitArc<()>,
88 fed_epoch_generated: JitArc<()>,
89 channel_opened: JitArc<()>,
90 recurringd_connected: JitArc<()>,
91
92 skip_setup: bool,
93 pre_dkg: bool,
94}
95
96impl DevJitFed {
97 pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98 let fed_size = process_mgr.globals.FM_FED_SIZE;
99 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100 anyhow::ensure!(
101 fed_size > 3 * offline_nodes,
102 "too many offline nodes ({offline_nodes}) to reach consensus"
103 );
104 let start_time = fedimint_core::time::now();
105
106 debug!("Starting dev federation");
107
108 let bitcoind = JitTry::new_try({
109 let process_mgr = process_mgr.to_owned();
110 move || async move {
111 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112 let start_time = fedimint_core::time::now();
113 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115 Ok(Arc::new(bitcoind))
116 }
117 });
118 let lnd = JitTry::new_try({
119 let process_mgr = process_mgr.to_owned();
120 let bitcoind = bitcoind.clone();
121 || async move {
122 let bitcoind = bitcoind.get_try().await?.deref().clone();
123 debug!(target: LOG_DEVIMINT, "Starting lnd...");
124 let start_time = fedimint_core::time::now();
125 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127 Ok(Arc::new(lnd))
128 }
129 });
130 let esplora = JitTryAnyhow::new_try({
131 let process_mgr = process_mgr.to_owned();
132 let bitcoind = bitcoind.clone();
133 || async move {
134 let bitcoind = bitcoind.get_try().await?.deref().clone();
135 debug!(target: LOG_DEVIMINT, "Starting esplora...");
136 let start_time = fedimint_core::time::now();
137 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139 Ok(Arc::new(esplora))
140 }
141 });
142
143 let fed = JitTryAnyhow::new_try({
144 let process_mgr = process_mgr.to_owned();
145 let bitcoind = bitcoind.clone();
146 move || async move {
147 let bitcoind = bitcoind.get_try().await?.deref().clone();
148 debug!(target: LOG_DEVIMINT, "Starting federation...");
149 let start_time = fedimint_core::time::now();
150 let mut fed = Federation::new(
151 &process_mgr,
152 bitcoind,
153 skip_setup,
154 pre_dkg,
155 0,
156 "default".to_string(),
157 )
158 .await?;
159
160 fed.degrade_federation(&process_mgr).await?;
162
163 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165 Ok(Arc::new(fed))
166 }
167 });
168
169 let gw_lnd = JitTryAnyhow::new_try({
170 let process_mgr = process_mgr.to_owned();
171 let lnd = lnd.clone();
172 || async move {
173 let lnd = lnd.get_try().await?.deref().clone();
174 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175 let start_time = fedimint_core::time::now();
176 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178 Ok(Arc::new(lnd_gw))
179 }
180 });
181 let gw_lnd_registered = JitTryAnyhow::new_try({
182 let gw_lnd = gw_lnd.clone();
183 let fed = fed.clone();
184 move || async move {
185 let gw_lnd = gw_lnd.get_try().await?.deref();
186 let fed = fed.get_try().await?.deref();
187 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188 let start_time = fedimint_core::time::now();
189 if !skip_setup && !pre_dkg {
190 gw_lnd.connect_fed(fed).await?;
191 }
192 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193 Ok(Arc::new(()))
194 }
195 });
196
197 let gw_ldk = JitTryAnyhow::new_try({
198 let process_mgr = process_mgr.to_owned();
199 move || async move {
200 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
201 let start_time = fedimint_core::time::now();
202 let ldk_gw = Gatewayd::new(
203 &process_mgr,
204 LightningNode::Ldk {
205 name: "gatewayd-ldk-0".to_string(),
206 gw_port: process_mgr.globals.FM_PORT_GW_LDK,
207 ldk_port: process_mgr.globals.FM_PORT_LDK,
208 chain_source: LdkChainSource::Bitcoind,
209 },
210 )
211 .await?;
212 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
213 Ok(Arc::new(ldk_gw))
214 }
215 });
216 let gw_ldk_second = JitTryAnyhow::new_try({
217 let process_mgr = process_mgr.to_owned();
218 move || async move {
219 debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
220 let start_time = fedimint_core::time::now();
221 let ldk_gw2 = Gatewayd::new(
222 &process_mgr,
223 LightningNode::Ldk {
224 name: "gatewayd-ldk-1".to_string(),
225 gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
226 ldk_port: process_mgr.globals.FM_PORT_LDK2,
227 chain_source: LdkChainSource::Bitcoind,
228 },
229 )
230 .await?;
231 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
232 Ok(Arc::new(ldk_gw2))
233 }
234 });
235 let gw_ldk_connected = JitTryAnyhow::new_try({
236 let gw_ldk = gw_ldk.clone();
237 let fed = fed.clone();
238 move || async move {
239 let gw_ldk = gw_ldk.get_try().await?.deref();
240 if supports_lnv2() {
241 let fed = fed.get_try().await?.deref();
242 let start_time = fedimint_core::time::now();
243 if !skip_setup && !pre_dkg {
244 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
245 gw_ldk.connect_fed(fed).await?;
246 } else {
247 debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
248 }
249 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
250 }
251 Ok(Arc::new(()))
252 }
253 });
254 let gw_ldk_second_connected = JitTryAnyhow::new_try({
255 let gw_ldk_second = gw_ldk_second.clone();
256 let fed = fed.clone();
257 move || async move {
258 let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
259 if supports_lnv2() {
260 let fed = fed.get_try().await?.deref();
261 debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
262 let start_time = fedimint_core::time::now();
263 if !skip_setup && !pre_dkg {
264 gw_ldk2.connect_fed(fed).await?;
265 }
266 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
267 }
268 Ok(Arc::new(()))
269 }
270 });
271
272 let fed_epoch_generated = JitTryAnyhow::new_try({
273 let fed = fed.clone();
274 move || async move {
275 let fed = fed.get_try().await?.deref().clone();
276 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
277 let start_time = fedimint_core::time::now();
278 if !skip_setup && !pre_dkg {
279 fed.mine_then_wait_blocks_sync(10).await?;
280 }
281 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
282 Ok(Arc::new(()))
283 }
284 });
285
286 let channel_opened = JitTryAnyhow::new_try({
287 let gw_lnd = gw_lnd.clone();
288 let gw_ldk = gw_ldk.clone();
289 let gw_ldk_second = gw_ldk_second.clone();
290 let bitcoind = bitcoind.clone();
291 let fed_epoch_generated = fed_epoch_generated.clone();
292 move || async move {
293 let bitcoind = bitcoind.get_try().await?.deref().clone();
297
298 fed_epoch_generated.get_try().await?;
301
302 let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
303 let gw_ldk = gw_ldk.get_try().await?.deref();
304 let gw_lnd = gw_lnd.get_try().await?.deref();
305 let gateways: &[NamedGateway<'_>] =
306 &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
307
308 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
309 let start_time = fedimint_core::time::now();
310 open_channels_between_gateways(&bitcoind, gateways).await?;
311 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
312
313 Ok(Arc::new(()))
314 }
315 });
316
317 let recurringd = JitTryAnyhow::new_try({
318 let process_mgr = process_mgr.to_owned();
319 move || async move {
320 debug!(target: LOG_DEVIMINT, "Starting recurringd...");
321 let start_time = fedimint_core::time::now();
322 let recurringd = Recurringd::new(&process_mgr).await?;
323 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
324 Ok(Arc::new(recurringd))
325 }
326 });
327
328 let recurringd_connected = JitTryAnyhow::new_try({
329 let recurringd = recurringd.clone();
330 let fed = fed.clone();
331 move || async move {
332 let recurringd = recurringd.get_try().await?.deref();
333 let fed = fed.get_try().await?.deref();
334 debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
335 let start_time = fedimint_core::time::now();
336 if !skip_setup && !pre_dkg {
337 let invite_code = fed.invite_code()?;
338 recurringd.add_federation(&invite_code).await?;
339 }
340 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
341 Ok(Arc::new(()))
342 }
343 });
344
345 Ok(DevJitFed {
346 bitcoind,
347 lnd,
348 fed,
349 gw_lnd,
350 gw_ldk,
351 gw_ldk_second,
352 esplora,
353 recurringd,
354 start_time,
355 gw_lnd_registered,
356 gw_ldk_connected,
357 gw_ldk_second_connected,
358 fed_epoch_generated,
359 channel_opened,
360 recurringd_connected,
361 skip_setup,
362 pre_dkg,
363 })
364 }
365
366 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
367 Ok(self.esplora.get_try().await?.deref())
368 }
369 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
370 Ok(self.lnd.get_try().await?.deref())
371 }
372 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
373 Ok(self.gw_lnd.get_try().await?.deref())
374 }
375 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
376 self.gw_lnd_registered.get_try().await?;
377 Ok(self.gw_lnd.get_try().await?.deref())
378 }
379 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
380 Ok(self.gw_ldk.get_try().await?.deref())
381 }
382 pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
383 Ok(self.gw_ldk_second.get_try().await?.deref())
384 }
385 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
386 self.gw_ldk_connected.get_try().await?;
387 Ok(self.gw_ldk.get_try().await?.deref())
388 }
389 pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
390 self.gw_ldk_second_connected.get_try().await?;
391 Ok(self.gw_ldk_second.get_try().await?.deref())
392 }
393 pub async fn fed(&self) -> anyhow::Result<&Federation> {
394 Ok(self.fed.get_try().await?.deref())
395 }
396 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
397 Ok(self.bitcoind.get_try().await?.deref())
398 }
399
400 pub async fn internal_client(&self) -> anyhow::Result<Client> {
401 Ok(self.fed().await?.internal_client().await?.clone())
402 }
403
404 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
407 self.fed().await?.await_gateways_registered().await?;
408 Ok(self.fed().await?.internal_client().await?.clone())
409 }
410
411 pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
412 Ok(self.recurringd.get_try().await?.deref())
413 }
414
415 pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
416 self.recurringd_connected.get_try().await?;
417 Ok(self.recurringd.get_try().await?.deref())
418 }
419
420 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
421 let fed_size = process_mgr.globals.FM_FED_SIZE;
422 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
423 anyhow::ensure!(
424 fed_size > 3 * offline_nodes,
425 "too many offline nodes ({offline_nodes}) to reach consensus"
426 );
427
428 if !self.pre_dkg && !self.skip_setup {
429 let _ = self.internal_client_gw_registered().await?;
430 }
431 let _ = self.channel_opened.get_try().await?;
432 let _ = self.gw_lnd_registered().await?;
433 let _ = self.gw_ldk_connected().await?;
434 let _ = self.gw_ldk_second_connected().await?;
435 let _ = self.lnd().await?;
436 let _ = self.esplora().await?;
437 let _ = self.recurringd_connected().await?;
438 let _ = self.fed_epoch_generated.get_try().await?;
439
440 debug!(
441 target: LOG_DEVIMINT,
442 fed_size,
443 offline_nodes,
444 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
445 "Dev federation ready",
446 );
447 Ok(())
448 }
449
450 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
451 self.finalize(process_mgr).await?;
452 Ok(DevFed {
453 bitcoind: self.bitcoind().await?.to_owned(),
454 lnd: self.lnd().await?.to_owned(),
455 fed: self.fed().await?.to_owned(),
456 gw_lnd: self.gw_lnd().await?.to_owned(),
457 gw_ldk: self.gw_ldk().await?.to_owned(),
458 gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
459 esplora: self.esplora().await?.to_owned(),
460 recurringd: self.recurringd().await?.to_owned(),
461 })
462 }
463
464 pub async fn fast_terminate(self) {
465 let Self {
466 bitcoind,
467 lnd,
468 fed,
469 gw_lnd,
470 esplora,
471 gw_ldk,
472 gw_ldk_second,
473 recurringd,
474 ..
475 } = self;
476
477 join!(
478 spawn_drop(gw_lnd),
479 spawn_drop(gw_ldk),
480 spawn_drop(gw_ldk_second),
481 spawn_drop(fed),
482 spawn_drop(lnd),
483 spawn_drop(esplora),
484 spawn_drop(bitcoind),
485 spawn_drop(recurringd),
486 );
487 }
488}