Skip to main content

devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21    T: Send + 'static,
22{
23    runtime::spawn("spawn_drop", async {
24        drop(t);
25    })
26    .await
27    .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32    pub bitcoind: Bitcoind,
33    pub lnd: Lnd,
34    pub fed: Federation,
35    pub gw_lnd: Gatewayd,
36    pub gw_ldk: Gatewayd,
37    pub gw_ldk_second: Gatewayd,
38    pub esplora: Esplora,
39    pub recurringd: Recurringd,
40    pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            lnd,
48            fed,
49            gw_lnd,
50            gw_ldk,
51            gw_ldk_second,
52            esplora,
53            recurringd,
54            recurringdv2,
55        } = self;
56
57        join!(
58            spawn_drop(gw_lnd),
59            spawn_drop(gw_ldk),
60            spawn_drop(gw_ldk_second),
61            spawn_drop(fed),
62            spawn_drop(lnd),
63            spawn_drop(esplora),
64            spawn_drop(bitcoind),
65            spawn_drop(recurringd),
66            spawn_drop(recurringdv2),
67        );
68    }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71    DevJitFed::new(process_mgr, false, false)?
72        .to_dev_fed(process_mgr)
73        .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80    bitcoind: JitArc<Bitcoind>,
81    lnd: JitArc<Lnd>,
82    fed: JitArc<Federation>,
83    gw_lnd: JitArc<Gatewayd>,
84    gw_ldk: JitArc<Gatewayd>,
85    gw_ldk_second: JitArc<Gatewayd>,
86    esplora: JitArc<Esplora>,
87    recurringd: JitArc<Recurringd>,
88    recurringdv2: JitArc<Recurringdv2>,
89    start_time: std::time::SystemTime,
90    gw_lnd_registered: JitArc<()>,
91    gw_ldk_connected: JitArc<()>,
92    gw_ldk_second_connected: JitArc<()>,
93    fed_epoch_generated: JitArc<()>,
94    channel_opened: JitArc<()>,
95    recurringd_connected: JitArc<()>,
96
97    skip_setup: bool,
98    pre_dkg: bool,
99}
100
101impl DevJitFed {
102    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103        let fed_size = process_mgr.globals.FM_FED_SIZE;
104        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105        anyhow::ensure!(
106            fed_size > 3 * offline_nodes,
107            "too many offline nodes ({offline_nodes}) to reach consensus"
108        );
109        let start_time = fedimint_core::time::now();
110
111        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113        let bitcoind = JitTry::new_try({
114            let process_mgr = process_mgr.to_owned();
115            move || async move {
116                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117                let start_time = fedimint_core::time::now();
118                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120                Ok(Arc::new(bitcoind))
121            }
122        });
123        let lnd = JitTry::new_try({
124            let process_mgr = process_mgr.to_owned();
125            let bitcoind = bitcoind.clone();
126            || async move {
127                let bitcoind = bitcoind.get_try().await?.deref().clone();
128                debug!(target: LOG_DEVIMINT, "Starting lnd...");
129                let start_time = fedimint_core::time::now();
130                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132                Ok(Arc::new(lnd))
133            }
134        });
135        let esplora = JitTryAnyhow::new_try({
136            let process_mgr = process_mgr.to_owned();
137            let bitcoind = bitcoind.clone();
138            || async move {
139                let bitcoind = bitcoind.get_try().await?.deref().clone();
140                debug!(target: LOG_DEVIMINT, "Starting esplora...");
141                let start_time = fedimint_core::time::now();
142                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144                Ok(Arc::new(esplora))
145            }
146        });
147
148        let fed = JitTryAnyhow::new_try({
149            let process_mgr = process_mgr.to_owned();
150            let bitcoind = bitcoind.clone();
151            move || async move {
152                let bitcoind = bitcoind.get_try().await?.deref().clone();
153                debug!(target: LOG_DEVIMINT, "Starting federation...");
154                let start_time = fedimint_core::time::now();
155                let mut fed = Federation::new(
156                    &process_mgr,
157                    bitcoind,
158                    skip_setup,
159                    pre_dkg,
160                    0,
161                    "default".to_string(),
162                )
163                .await?;
164
165                // Create a degraded federation if there are offline nodes
166                fed.degrade_federation(&process_mgr).await?;
167
168                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170                Ok(Arc::new(fed))
171            }
172        });
173
174        let gw_lnd = JitTryAnyhow::new_try({
175            let process_mgr = process_mgr.to_owned();
176            let lnd = lnd.clone();
177            || async move {
178                let lnd = lnd.get_try().await?.deref().clone();
179                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180                let start_time = fedimint_core::time::now();
181                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183                Ok(Arc::new(lnd_gw))
184            }
185        });
186        let gw_lnd_registered = JitTryAnyhow::new_try({
187            let gw_lnd = gw_lnd.clone();
188            let fed = fed.clone();
189            move || async move {
190                let gw_lnd = gw_lnd.get_try().await?.deref();
191                let fed = fed.get_try().await?.deref();
192                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193                let start_time = fedimint_core::time::now();
194                if !skip_setup && !pre_dkg {
195                    let invite = fed.invite_code()?;
196                    gw_lnd.client().connect_fed(invite).await?;
197                }
198                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
199                Ok(Arc::new(()))
200            }
201        });
202
203        let gw_ldk = JitTryAnyhow::new_try({
204            let process_mgr = process_mgr.to_owned();
205            let bitcoind = bitcoind.clone();
206            move || async move {
207                bitcoind.get_try().await?;
208                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
209                let start_time = fedimint_core::time::now();
210                let ldk_gw = Gatewayd::new(
211                    &process_mgr,
212                    LightningNode::Ldk {
213                        name: "gatewayd-ldk-0".to_string(),
214                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
215                        ldk_port: process_mgr.globals.FM_PORT_LDK,
216                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
217                    },
218                    1,
219                )
220                .await?;
221                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
222                Ok(Arc::new(ldk_gw))
223            }
224        });
225        let gw_ldk_second = JitTryAnyhow::new_try({
226            let process_mgr = process_mgr.to_owned();
227            let bitcoind = bitcoind.clone();
228            move || async move {
229                bitcoind.get_try().await?;
230                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
231                let start_time = fedimint_core::time::now();
232                let ldk_gw2 = Gatewayd::new(
233                    &process_mgr,
234                    LightningNode::Ldk {
235                        name: "gatewayd-ldk-1".to_string(),
236                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
237                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
238                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
239                    },
240                    2,
241                )
242                .await?;
243                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
244                Ok(Arc::new(ldk_gw2))
245            }
246        });
247        let gw_ldk_connected = JitTryAnyhow::new_try({
248            let gw_ldk = gw_ldk.clone();
249            let fed = fed.clone();
250            move || async move {
251                let gw_ldk = gw_ldk.get_try().await?.deref();
252                if supports_lnv2() {
253                    let fed = fed.get_try().await?.deref();
254                    let start_time = fedimint_core::time::now();
255                    if !skip_setup && !pre_dkg {
256                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
257                        let invite = fed.invite_code()?;
258                        gw_ldk.client().connect_fed(invite).await?;
259                    } else {
260                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
261                    }
262                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
263                }
264                Ok(Arc::new(()))
265            }
266        });
267        let gw_ldk_second_connected = JitTryAnyhow::new_try({
268            let gw_ldk_second = gw_ldk_second.clone();
269            let fed = fed.clone();
270            move || async move {
271                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
272                if supports_lnv2() {
273                    let fed = fed.get_try().await?.deref();
274                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
275                    let start_time = fedimint_core::time::now();
276                    if !skip_setup && !pre_dkg {
277                        let invite = fed.invite_code()?;
278                        gw_ldk2.client().connect_fed(invite).await?;
279                    }
280                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
281                }
282                Ok(Arc::new(()))
283            }
284        });
285
286        let fed_epoch_generated = JitTryAnyhow::new_try({
287            let fed = fed.clone();
288            move || async move {
289                let fed = fed.get_try().await?.deref().clone();
290                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
291                let start_time = fedimint_core::time::now();
292                if !skip_setup && !pre_dkg {
293                    fed.mine_then_wait_blocks_sync(10).await?;
294                }
295                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
296                Ok(Arc::new(()))
297            }
298        });
299
300        let channel_opened = JitTryAnyhow::new_try({
301            let gw_lnd = gw_lnd.clone();
302            let gw_ldk = gw_ldk.clone();
303            let gw_ldk_second = gw_ldk_second.clone();
304            let bitcoind = bitcoind.clone();
305            let fed_epoch_generated = fed_epoch_generated.clone();
306            move || async move {
307                // Note: We open new channel even if starting from existing state
308                // as ports change on every start, and without this nodes will not find each
309                // other.
310                let bitcoind = bitcoind.get_try().await?.deref().clone();
311
312                // Wait for an epoch to occur since that mines blocks, which can cause opening
313                // channels to be racy
314                fed_epoch_generated.get_try().await?;
315
316                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
317                let gw_ldk = gw_ldk.get_try().await?.deref();
318                let gw_lnd = gw_lnd.get_try().await?.deref();
319                let gateways: &[NamedGateway<'_>] =
320                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
321
322                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
323                let start_time = fedimint_core::time::now();
324                open_channels_between_gateways(&bitcoind, gateways).await?;
325                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
326
327                Ok(Arc::new(()))
328            }
329        });
330
331        let recurringd = JitTryAnyhow::new_try({
332            let process_mgr = process_mgr.to_owned();
333            move || async move {
334                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
335                let start_time = fedimint_core::time::now();
336                let recurringd = Recurringd::new(&process_mgr).await?;
337                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
338                Ok(Arc::new(recurringd))
339            }
340        });
341
342        let recurringd_connected = JitTryAnyhow::new_try({
343            let recurringd = recurringd.clone();
344            let fed = fed.clone();
345            move || async move {
346                let recurringd = recurringd.get_try().await?.deref();
347                let fed = fed.get_try().await?.deref();
348                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
349                let start_time = fedimint_core::time::now();
350                if !skip_setup && !pre_dkg {
351                    let invite_code = fed.invite_code()?;
352                    recurringd.add_federation(&invite_code).await?;
353                }
354                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
355                Ok(Arc::new(()))
356            }
357        });
358
359        let recurringdv2 = JitTryAnyhow::new_try({
360            let process_mgr = process_mgr.to_owned();
361            move || async move {
362                debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
363                let start_time = fedimint_core::time::now();
364                let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
365                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
366                Ok(Arc::new(recurringdv2))
367            }
368        });
369
370        Ok(DevJitFed {
371            bitcoind,
372            lnd,
373            fed,
374            gw_lnd,
375            gw_ldk,
376            gw_ldk_second,
377            esplora,
378            recurringd,
379            recurringdv2,
380            start_time,
381            gw_lnd_registered,
382            gw_ldk_connected,
383            gw_ldk_second_connected,
384            fed_epoch_generated,
385            channel_opened,
386            recurringd_connected,
387            skip_setup,
388            pre_dkg,
389        })
390    }
391
392    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
393        Ok(self.esplora.get_try().await?.deref())
394    }
395    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
396        Ok(self.lnd.get_try().await?.deref())
397    }
398    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
399        Ok(self.gw_lnd.get_try().await?.deref())
400    }
401    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
402        self.gw_lnd_registered.get_try().await?;
403        Ok(self.gw_lnd.get_try().await?.deref())
404    }
405    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
406        Ok(self.gw_ldk.get_try().await?.deref())
407    }
408    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
409        Ok(self.gw_ldk_second.get_try().await?.deref())
410    }
411    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
412        self.gw_ldk_connected.get_try().await?;
413        Ok(self.gw_ldk.get_try().await?.deref())
414    }
415    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
416        self.gw_ldk_second_connected.get_try().await?;
417        Ok(self.gw_ldk_second.get_try().await?.deref())
418    }
419    pub async fn fed(&self) -> anyhow::Result<&Federation> {
420        Ok(self.fed.get_try().await?.deref())
421    }
422    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
423        Ok(self.bitcoind.get_try().await?.deref())
424    }
425
426    pub async fn internal_client(&self) -> anyhow::Result<Client> {
427        Ok(self.fed().await?.internal_client().await?.clone())
428    }
429
430    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
431    /// to be registered
432    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
433        self.fed().await?.await_gateways_registered().await?;
434        Ok(self.fed().await?.internal_client().await?.clone())
435    }
436
437    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
438        Ok(self.recurringd.get_try().await?.deref())
439    }
440
441    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
442        self.recurringd_connected.get_try().await?;
443        Ok(self.recurringd.get_try().await?.deref())
444    }
445
446    pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
447        Ok(self.recurringdv2.get_try().await?.deref())
448    }
449
450    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
451        let fed_size = process_mgr.globals.FM_FED_SIZE;
452        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
453        anyhow::ensure!(
454            fed_size > 3 * offline_nodes,
455            "too many offline nodes ({offline_nodes}) to reach consensus"
456        );
457
458        if !self.pre_dkg && !self.skip_setup {
459            let _ = self.internal_client_gw_registered().await?;
460        }
461        let _ = self.channel_opened.get_try().await?;
462        let _ = self.gw_lnd_registered().await?;
463        let _ = self.gw_ldk_connected().await?;
464        let _ = self.gw_ldk_second_connected().await?;
465        let _ = self.lnd().await?;
466        let _ = self.esplora().await?;
467        let _ = self.recurringd_connected().await?;
468        let _ = self.recurringdv2().await?;
469        let _ = self.fed_epoch_generated.get_try().await?;
470
471        debug!(
472            target: LOG_DEVIMINT,
473            fed_size,
474            offline_nodes,
475            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
476            "Dev federation ready",
477        );
478        Ok(())
479    }
480
481    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
482        self.finalize(process_mgr).await?;
483        Ok(DevFed {
484            bitcoind: self.bitcoind().await?.to_owned(),
485            lnd: self.lnd().await?.to_owned(),
486            fed: self.fed().await?.to_owned(),
487            gw_lnd: self.gw_lnd().await?.to_owned(),
488            gw_ldk: self.gw_ldk().await?.to_owned(),
489            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
490            esplora: self.esplora().await?.to_owned(),
491            recurringd: self.recurringd().await?.to_owned(),
492            recurringdv2: self.recurringdv2().await?.to_owned(),
493        })
494    }
495
496    pub async fn fast_terminate(self) {
497        let Self {
498            bitcoind,
499            lnd,
500            fed,
501            gw_lnd,
502            esplora,
503            gw_ldk,
504            gw_ldk_second,
505            recurringd,
506            recurringdv2,
507            ..
508        } = self;
509
510        join!(
511            spawn_drop(gw_lnd),
512            spawn_drop(gw_ldk),
513            spawn_drop(gw_ldk_second),
514            spawn_drop(fed),
515            spawn_drop(lnd),
516            spawn_drop(esplora),
517            spawn_drop(bitcoind),
518            spawn_drop(recurringd),
519            spawn_drop(recurringdv2),
520        );
521    }
522}