devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21    T: Send + 'static,
22{
23    runtime::spawn("spawn_drop", async {
24        drop(t);
25    })
26    .await
27    .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32    pub bitcoind: Bitcoind,
33    pub lnd: Lnd,
34    pub fed: Federation,
35    pub gw_lnd: Gatewayd,
36    pub gw_ldk: Gatewayd,
37    pub gw_ldk_second: Gatewayd,
38    pub esplora: Esplora,
39    pub recurringd: Recurringd,
40    pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            lnd,
48            fed,
49            gw_lnd,
50            gw_ldk,
51            gw_ldk_second,
52            esplora,
53            recurringd,
54            recurringdv2,
55        } = self;
56
57        join!(
58            spawn_drop(gw_lnd),
59            spawn_drop(gw_ldk),
60            spawn_drop(gw_ldk_second),
61            spawn_drop(fed),
62            spawn_drop(lnd),
63            spawn_drop(esplora),
64            spawn_drop(bitcoind),
65            spawn_drop(recurringd),
66            spawn_drop(recurringdv2),
67        );
68    }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71    DevJitFed::new(process_mgr, false, false)?
72        .to_dev_fed(process_mgr)
73        .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80    bitcoind: JitArc<Bitcoind>,
81    lnd: JitArc<Lnd>,
82    fed: JitArc<Federation>,
83    gw_lnd: JitArc<Gatewayd>,
84    gw_ldk: JitArc<Gatewayd>,
85    gw_ldk_second: JitArc<Gatewayd>,
86    esplora: JitArc<Esplora>,
87    recurringd: JitArc<Recurringd>,
88    recurringdv2: JitArc<Recurringdv2>,
89    start_time: std::time::SystemTime,
90    gw_lnd_registered: JitArc<()>,
91    gw_ldk_connected: JitArc<()>,
92    gw_ldk_second_connected: JitArc<()>,
93    fed_epoch_generated: JitArc<()>,
94    channel_opened: JitArc<()>,
95    recurringd_connected: JitArc<()>,
96
97    skip_setup: bool,
98    pre_dkg: bool,
99}
100
101impl DevJitFed {
102    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103        let fed_size = process_mgr.globals.FM_FED_SIZE;
104        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105        anyhow::ensure!(
106            fed_size > 3 * offline_nodes,
107            "too many offline nodes ({offline_nodes}) to reach consensus"
108        );
109        let start_time = fedimint_core::time::now();
110
111        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113        let bitcoind = JitTry::new_try({
114            let process_mgr = process_mgr.to_owned();
115            move || async move {
116                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117                let start_time = fedimint_core::time::now();
118                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120                Ok(Arc::new(bitcoind))
121            }
122        });
123        let lnd = JitTry::new_try({
124            let process_mgr = process_mgr.to_owned();
125            let bitcoind = bitcoind.clone();
126            || async move {
127                let bitcoind = bitcoind.get_try().await?.deref().clone();
128                debug!(target: LOG_DEVIMINT, "Starting lnd...");
129                let start_time = fedimint_core::time::now();
130                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132                Ok(Arc::new(lnd))
133            }
134        });
135        let esplora = JitTryAnyhow::new_try({
136            let process_mgr = process_mgr.to_owned();
137            let bitcoind = bitcoind.clone();
138            || async move {
139                let bitcoind = bitcoind.get_try().await?.deref().clone();
140                debug!(target: LOG_DEVIMINT, "Starting esplora...");
141                let start_time = fedimint_core::time::now();
142                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144                Ok(Arc::new(esplora))
145            }
146        });
147
148        let fed = JitTryAnyhow::new_try({
149            let process_mgr = process_mgr.to_owned();
150            let bitcoind = bitcoind.clone();
151            move || async move {
152                let bitcoind = bitcoind.get_try().await?.deref().clone();
153                debug!(target: LOG_DEVIMINT, "Starting federation...");
154                let start_time = fedimint_core::time::now();
155                let mut fed = Federation::new(
156                    &process_mgr,
157                    bitcoind,
158                    skip_setup,
159                    pre_dkg,
160                    0,
161                    "default".to_string(),
162                )
163                .await?;
164
165                // Create a degraded federation if there are offline nodes
166                fed.degrade_federation(&process_mgr).await?;
167
168                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170                Ok(Arc::new(fed))
171            }
172        });
173
174        let gw_lnd = JitTryAnyhow::new_try({
175            let process_mgr = process_mgr.to_owned();
176            let lnd = lnd.clone();
177            || async move {
178                let lnd = lnd.get_try().await?.deref().clone();
179                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180                let start_time = fedimint_core::time::now();
181                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183                Ok(Arc::new(lnd_gw))
184            }
185        });
186        let gw_lnd_registered = JitTryAnyhow::new_try({
187            let gw_lnd = gw_lnd.clone();
188            let fed = fed.clone();
189            move || async move {
190                let gw_lnd = gw_lnd.get_try().await?.deref();
191                let fed = fed.get_try().await?.deref();
192                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193                let start_time = fedimint_core::time::now();
194                if !skip_setup && !pre_dkg {
195                    gw_lnd.connect_fed(fed).await?;
196                }
197                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
198                Ok(Arc::new(()))
199            }
200        });
201
202        let gw_ldk = JitTryAnyhow::new_try({
203            let process_mgr = process_mgr.to_owned();
204            let bitcoind = bitcoind.clone();
205            move || async move {
206                bitcoind.get_try().await?;
207                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
208                let start_time = fedimint_core::time::now();
209                let ldk_gw = Gatewayd::new(
210                    &process_mgr,
211                    LightningNode::Ldk {
212                        name: "gatewayd-ldk-0".to_string(),
213                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
214                        ldk_port: process_mgr.globals.FM_PORT_LDK,
215                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
216                    },
217                    1,
218                )
219                .await?;
220                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
221                Ok(Arc::new(ldk_gw))
222            }
223        });
224        let gw_ldk_second = JitTryAnyhow::new_try({
225            let process_mgr = process_mgr.to_owned();
226            let bitcoind = bitcoind.clone();
227            move || async move {
228                bitcoind.get_try().await?;
229                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
230                let start_time = fedimint_core::time::now();
231                let ldk_gw2 = Gatewayd::new(
232                    &process_mgr,
233                    LightningNode::Ldk {
234                        name: "gatewayd-ldk-1".to_string(),
235                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
236                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
237                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
238                    },
239                    2,
240                )
241                .await?;
242                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
243                Ok(Arc::new(ldk_gw2))
244            }
245        });
246        let gw_ldk_connected = JitTryAnyhow::new_try({
247            let gw_ldk = gw_ldk.clone();
248            let fed = fed.clone();
249            move || async move {
250                let gw_ldk = gw_ldk.get_try().await?.deref();
251                if supports_lnv2() {
252                    let fed = fed.get_try().await?.deref();
253                    let start_time = fedimint_core::time::now();
254                    if !skip_setup && !pre_dkg {
255                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
256                        gw_ldk.connect_fed(fed).await?;
257                    } else {
258                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
259                    }
260                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
261                }
262                Ok(Arc::new(()))
263            }
264        });
265        let gw_ldk_second_connected = JitTryAnyhow::new_try({
266            let gw_ldk_second = gw_ldk_second.clone();
267            let fed = fed.clone();
268            move || async move {
269                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
270                if supports_lnv2() {
271                    let fed = fed.get_try().await?.deref();
272                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
273                    let start_time = fedimint_core::time::now();
274                    if !skip_setup && !pre_dkg {
275                        gw_ldk2.connect_fed(fed).await?;
276                    }
277                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
278                }
279                Ok(Arc::new(()))
280            }
281        });
282
283        let fed_epoch_generated = JitTryAnyhow::new_try({
284            let fed = fed.clone();
285            move || async move {
286                let fed = fed.get_try().await?.deref().clone();
287                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
288                let start_time = fedimint_core::time::now();
289                if !skip_setup && !pre_dkg {
290                    fed.mine_then_wait_blocks_sync(10).await?;
291                }
292                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
293                Ok(Arc::new(()))
294            }
295        });
296
297        let channel_opened = JitTryAnyhow::new_try({
298            let gw_lnd = gw_lnd.clone();
299            let gw_ldk = gw_ldk.clone();
300            let gw_ldk_second = gw_ldk_second.clone();
301            let bitcoind = bitcoind.clone();
302            let fed_epoch_generated = fed_epoch_generated.clone();
303            move || async move {
304                // Note: We open new channel even if starting from existing state
305                // as ports change on every start, and without this nodes will not find each
306                // other.
307                let bitcoind = bitcoind.get_try().await?.deref().clone();
308
309                // Wait for an epoch to occur since that mines blocks, which can cause opening
310                // channels to be racy
311                fed_epoch_generated.get_try().await?;
312
313                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
314                let gw_ldk = gw_ldk.get_try().await?.deref();
315                let gw_lnd = gw_lnd.get_try().await?.deref();
316                let gateways: &[NamedGateway<'_>] =
317                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
318
319                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
320                let start_time = fedimint_core::time::now();
321                open_channels_between_gateways(&bitcoind, gateways).await?;
322                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
323
324                Ok(Arc::new(()))
325            }
326        });
327
328        let recurringd = JitTryAnyhow::new_try({
329            let process_mgr = process_mgr.to_owned();
330            move || async move {
331                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
332                let start_time = fedimint_core::time::now();
333                let recurringd = Recurringd::new(&process_mgr).await?;
334                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
335                Ok(Arc::new(recurringd))
336            }
337        });
338
339        let recurringd_connected = JitTryAnyhow::new_try({
340            let recurringd = recurringd.clone();
341            let fed = fed.clone();
342            move || async move {
343                let recurringd = recurringd.get_try().await?.deref();
344                let fed = fed.get_try().await?.deref();
345                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
346                let start_time = fedimint_core::time::now();
347                if !skip_setup && !pre_dkg {
348                    let invite_code = fed.invite_code()?;
349                    recurringd.add_federation(&invite_code).await?;
350                }
351                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
352                Ok(Arc::new(()))
353            }
354        });
355
356        let recurringdv2 = JitTryAnyhow::new_try({
357            let process_mgr = process_mgr.to_owned();
358            move || async move {
359                debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
360                let start_time = fedimint_core::time::now();
361                let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
362                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
363                Ok(Arc::new(recurringdv2))
364            }
365        });
366
367        Ok(DevJitFed {
368            bitcoind,
369            lnd,
370            fed,
371            gw_lnd,
372            gw_ldk,
373            gw_ldk_second,
374            esplora,
375            recurringd,
376            recurringdv2,
377            start_time,
378            gw_lnd_registered,
379            gw_ldk_connected,
380            gw_ldk_second_connected,
381            fed_epoch_generated,
382            channel_opened,
383            recurringd_connected,
384            skip_setup,
385            pre_dkg,
386        })
387    }
388
389    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
390        Ok(self.esplora.get_try().await?.deref())
391    }
392    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
393        Ok(self.lnd.get_try().await?.deref())
394    }
395    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
396        Ok(self.gw_lnd.get_try().await?.deref())
397    }
398    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
399        self.gw_lnd_registered.get_try().await?;
400        Ok(self.gw_lnd.get_try().await?.deref())
401    }
402    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
403        Ok(self.gw_ldk.get_try().await?.deref())
404    }
405    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
406        Ok(self.gw_ldk_second.get_try().await?.deref())
407    }
408    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
409        self.gw_ldk_connected.get_try().await?;
410        Ok(self.gw_ldk.get_try().await?.deref())
411    }
412    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
413        self.gw_ldk_second_connected.get_try().await?;
414        Ok(self.gw_ldk_second.get_try().await?.deref())
415    }
416    pub async fn fed(&self) -> anyhow::Result<&Federation> {
417        Ok(self.fed.get_try().await?.deref())
418    }
419    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
420        Ok(self.bitcoind.get_try().await?.deref())
421    }
422
423    pub async fn internal_client(&self) -> anyhow::Result<Client> {
424        Ok(self.fed().await?.internal_client().await?.clone())
425    }
426
427    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
428    /// to be registered
429    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
430        self.fed().await?.await_gateways_registered().await?;
431        Ok(self.fed().await?.internal_client().await?.clone())
432    }
433
434    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
435        Ok(self.recurringd.get_try().await?.deref())
436    }
437
438    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
439        self.recurringd_connected.get_try().await?;
440        Ok(self.recurringd.get_try().await?.deref())
441    }
442
443    pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
444        Ok(self.recurringdv2.get_try().await?.deref())
445    }
446
447    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
448        let fed_size = process_mgr.globals.FM_FED_SIZE;
449        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
450        anyhow::ensure!(
451            fed_size > 3 * offline_nodes,
452            "too many offline nodes ({offline_nodes}) to reach consensus"
453        );
454
455        if !self.pre_dkg && !self.skip_setup {
456            let _ = self.internal_client_gw_registered().await?;
457        }
458        let _ = self.channel_opened.get_try().await?;
459        let _ = self.gw_lnd_registered().await?;
460        let _ = self.gw_ldk_connected().await?;
461        let _ = self.gw_ldk_second_connected().await?;
462        let _ = self.lnd().await?;
463        let _ = self.esplora().await?;
464        let _ = self.recurringd_connected().await?;
465        let _ = self.recurringdv2().await?;
466        let _ = self.fed_epoch_generated.get_try().await?;
467
468        debug!(
469            target: LOG_DEVIMINT,
470            fed_size,
471            offline_nodes,
472            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
473            "Dev federation ready",
474        );
475        Ok(())
476    }
477
478    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
479        self.finalize(process_mgr).await?;
480        Ok(DevFed {
481            bitcoind: self.bitcoind().await?.to_owned(),
482            lnd: self.lnd().await?.to_owned(),
483            fed: self.fed().await?.to_owned(),
484            gw_lnd: self.gw_lnd().await?.to_owned(),
485            gw_ldk: self.gw_ldk().await?.to_owned(),
486            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
487            esplora: self.esplora().await?.to_owned(),
488            recurringd: self.recurringd().await?.to_owned(),
489            recurringdv2: self.recurringdv2().await?.to_owned(),
490        })
491    }
492
493    pub async fn fast_terminate(self) {
494        let Self {
495            bitcoind,
496            lnd,
497            fed,
498            gw_lnd,
499            esplora,
500            gw_ldk,
501            gw_ldk_second,
502            recurringd,
503            recurringdv2,
504            ..
505        } = self;
506
507        join!(
508            spawn_drop(gw_lnd),
509            spawn_drop(gw_ldk),
510            spawn_drop(gw_ldk_second),
511            spawn_drop(fed),
512            spawn_drop(lnd),
513            spawn_drop(esplora),
514            spawn_drop(bitcoind),
515            spawn_drop(recurringd),
516            spawn_drop(recurringdv2),
517        );
518    }
519}