Skip to main content

devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21    T: Send + 'static,
22{
23    runtime::spawn("spawn_drop", async {
24        drop(t);
25    })
26    .await
27    .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32    pub bitcoind: Bitcoind,
33    pub lnd: Lnd,
34    pub fed: Federation,
35    pub gw_lnd: Gatewayd,
36    pub gw_ldk: Gatewayd,
37    pub gw_ldk_second: Gatewayd,
38    pub esplora: Esplora,
39    pub recurringd: Recurringd,
40    pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            lnd,
48            fed,
49            gw_lnd,
50            gw_ldk,
51            gw_ldk_second,
52            esplora,
53            recurringd,
54            recurringdv2,
55        } = self;
56
57        join!(
58            spawn_drop(gw_lnd),
59            spawn_drop(gw_ldk),
60            spawn_drop(gw_ldk_second),
61            spawn_drop(fed),
62            spawn_drop(lnd),
63            spawn_drop(esplora),
64            spawn_drop(bitcoind),
65            spawn_drop(recurringd),
66            spawn_drop(recurringdv2),
67        );
68    }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71    DevJitFed::new(process_mgr, false, false)?
72        .to_dev_fed(process_mgr)
73        .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80    bitcoind: JitArc<Bitcoind>,
81    lnd: JitArc<Lnd>,
82    fed: JitArc<Federation>,
83    gw_lnd: JitArc<Gatewayd>,
84    gw_ldk: JitArc<Gatewayd>,
85    gw_ldk_second: JitArc<Gatewayd>,
86    esplora: JitArc<Esplora>,
87    recurringd: JitArc<Recurringd>,
88    recurringdv2: JitArc<Recurringdv2>,
89    start_time: std::time::SystemTime,
90    gw_lnd_registered: JitArc<()>,
91    gw_ldk_connected: JitArc<()>,
92    gw_ldk_second_connected: JitArc<()>,
93    fed_epoch_generated: JitArc<()>,
94    channel_opened: JitArc<()>,
95    recurringd_connected: JitArc<()>,
96
97    skip_setup: bool,
98    pre_dkg: bool,
99}
100
101impl DevJitFed {
102    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103        let fed_size = process_mgr.globals.FM_FED_SIZE;
104        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105        anyhow::ensure!(
106            fed_size > 3 * offline_nodes,
107            "too many offline nodes ({offline_nodes}) to reach consensus"
108        );
109        let start_time = fedimint_core::time::now();
110
111        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113        let bitcoind = JitTry::new_try({
114            let process_mgr = process_mgr.to_owned();
115            move || async move {
116                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117                let start_time = fedimint_core::time::now();
118                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120                Ok(Arc::new(bitcoind))
121            }
122        });
123        let lnd = JitTry::new_try({
124            let process_mgr = process_mgr.to_owned();
125            let bitcoind = bitcoind.clone();
126            || async move {
127                let bitcoind = bitcoind.get_try().await?.deref().clone();
128                debug!(target: LOG_DEVIMINT, "Starting lnd...");
129                let start_time = fedimint_core::time::now();
130                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132                Ok(Arc::new(lnd))
133            }
134        });
135        let esplora = JitTryAnyhow::new_try({
136            let process_mgr = process_mgr.to_owned();
137            let bitcoind = bitcoind.clone();
138            || async move {
139                let bitcoind = bitcoind.get_try().await?.deref().clone();
140                debug!(target: LOG_DEVIMINT, "Starting esplora...");
141                let start_time = fedimint_core::time::now();
142                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144                Ok(Arc::new(esplora))
145            }
146        });
147
148        let fed = JitTryAnyhow::new_try({
149            let process_mgr = process_mgr.to_owned();
150            let bitcoind = bitcoind.clone();
151            move || async move {
152                let bitcoind = bitcoind.get_try().await?.deref().clone();
153                debug!(target: LOG_DEVIMINT, "Starting federation...");
154                let start_time = fedimint_core::time::now();
155                let mut fed = Federation::new(
156                    &process_mgr,
157                    bitcoind,
158                    skip_setup,
159                    pre_dkg,
160                    0,
161                    "default".to_string(),
162                )
163                .await?;
164
165                // Create a degraded federation if there are offline nodes
166                fed.degrade_federation(&process_mgr).await?;
167
168                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170                Ok(Arc::new(fed))
171            }
172        });
173
174        let gw_lnd = JitTryAnyhow::new_try({
175            let process_mgr = process_mgr.to_owned();
176            let lnd = lnd.clone();
177            || async move {
178                let lnd = lnd.get_try().await?.deref().clone();
179                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180                let start_time = fedimint_core::time::now();
181                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183                Ok(Arc::new(lnd_gw))
184            }
185        });
186        let gw_lnd_registered = JitTryAnyhow::new_try({
187            let gw_lnd = gw_lnd.clone();
188            let fed = fed.clone();
189            move || async move {
190                let gw_lnd = gw_lnd.get_try().await?.deref();
191                let fed = fed.get_try().await?.deref();
192                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193                let start_time = fedimint_core::time::now();
194                if !skip_setup && !pre_dkg {
195                    let invite = fed.invite_code()?;
196                    gw_lnd.client().connect_fed(invite).await?;
197                }
198                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
199                Ok(Arc::new(()))
200            }
201        });
202
203        let gw_ldk = JitTryAnyhow::new_try({
204            let process_mgr = process_mgr.to_owned();
205            let bitcoind = bitcoind.clone();
206            move || async move {
207                bitcoind.get_try().await?;
208                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
209                let start_time = fedimint_core::time::now();
210                let ldk_gw = Gatewayd::new(
211                    &process_mgr,
212                    LightningNode::Ldk {
213                        name: "gatewayd-ldk-0".to_string(),
214                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
215                        ldk_port: process_mgr.globals.FM_PORT_LDK,
216                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
217                    },
218                    1,
219                )
220                .await?;
221                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
222                Ok(Arc::new(ldk_gw))
223            }
224        });
225        let gw_ldk_second = JitTryAnyhow::new_try({
226            let process_mgr = process_mgr.to_owned();
227            let bitcoind = bitcoind.clone();
228            move || async move {
229                bitcoind.get_try().await?;
230                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
231                let start_time = fedimint_core::time::now();
232                let ldk_gw2 = Gatewayd::new(
233                    &process_mgr,
234                    LightningNode::Ldk {
235                        name: "gatewayd-ldk-1".to_string(),
236                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
237                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
238                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
239                    },
240                    2,
241                )
242                .await?;
243                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
244                Ok(Arc::new(ldk_gw2))
245            }
246        });
247        let gw_ldk_connected = JitTryAnyhow::new_try({
248            let gw_ldk = gw_ldk.clone();
249            let fed = fed.clone();
250            move || async move {
251                let gw_ldk = gw_ldk.get_try().await?.deref();
252                if supports_lnv2() {
253                    let fed = fed.get_try().await?.deref();
254                    let start_time = fedimint_core::time::now();
255                    if !skip_setup && !pre_dkg {
256                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
257                        let invite = fed.invite_code()?;
258                        gw_ldk.client().connect_fed(invite).await?;
259                    } else {
260                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
261                    }
262                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
263                }
264                Ok(Arc::new(()))
265            }
266        });
267        let gw_ldk_second_connected = JitTryAnyhow::new_try({
268            let gw_ldk_second = gw_ldk_second.clone();
269            let fed = fed.clone();
270            move || async move {
271                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
272                if supports_lnv2() {
273                    let fed = fed.get_try().await?.deref();
274                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
275                    let start_time = fedimint_core::time::now();
276                    if !skip_setup && !pre_dkg {
277                        let invite = fed.invite_code()?;
278                        gw_ldk2.client().connect_fed(invite).await?;
279                    }
280                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
281                }
282                Ok(Arc::new(()))
283            }
284        });
285
286        let fed_epoch_generated = JitTryAnyhow::new_try({
287            let fed = fed.clone();
288            move || async move {
289                let fed = fed.get_try().await?.deref().clone();
290                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
291                let start_time = fedimint_core::time::now();
292                if !skip_setup && !pre_dkg {
293                    fed.mine_then_wait_blocks_sync(10).await?;
294                }
295                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
296                Ok(Arc::new(()))
297            }
298        });
299
300        let channel_opened = JitTryAnyhow::new_try({
301            let gw_lnd = gw_lnd.clone();
302            let gw_ldk = gw_ldk.clone();
303            let gw_ldk_second = gw_ldk_second.clone();
304            let bitcoind = bitcoind.clone();
305            let fed_epoch_generated = fed_epoch_generated.clone();
306            move || async move {
307                // Note: We open new channel even if starting from existing state
308                // as ports change on every start, and without this nodes will not find each
309                // other.
310                let bitcoind = bitcoind.get_try().await?.deref().clone();
311
312                // Wait for an epoch to occur since that mines blocks, which can cause opening
313                // channels to be racy
314                fed_epoch_generated.get_try().await?;
315
316                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
317                let gw_ldk = gw_ldk.get_try().await?.deref();
318                let gw_lnd = gw_lnd.get_try().await?.deref();
319                let gateways: &[NamedGateway<'_>] =
320                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
321                if !skip_setup && !pre_dkg {
322                    debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
323                    let start_time = fedimint_core::time::now();
324                    open_channels_between_gateways(&bitcoind, gateways).await?;
325                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
326                }
327
328                Ok(Arc::new(()))
329            }
330        });
331
332        let recurringd = JitTryAnyhow::new_try({
333            let process_mgr = process_mgr.to_owned();
334            move || async move {
335                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
336                let start_time = fedimint_core::time::now();
337                let recurringd = Recurringd::new(&process_mgr).await?;
338                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
339                Ok(Arc::new(recurringd))
340            }
341        });
342
343        let recurringd_connected = JitTryAnyhow::new_try({
344            let recurringd = recurringd.clone();
345            let fed = fed.clone();
346            move || async move {
347                let recurringd = recurringd.get_try().await?.deref();
348                let fed = fed.get_try().await?.deref();
349                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
350                let start_time = fedimint_core::time::now();
351                if !skip_setup && !pre_dkg {
352                    let invite_code = fed.invite_code()?;
353                    recurringd.add_federation(&invite_code).await?;
354                }
355                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
356                Ok(Arc::new(()))
357            }
358        });
359
360        let recurringdv2 = JitTryAnyhow::new_try({
361            let process_mgr = process_mgr.to_owned();
362            move || async move {
363                debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
364                let start_time = fedimint_core::time::now();
365                let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
366                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
367                Ok(Arc::new(recurringdv2))
368            }
369        });
370
371        Ok(DevJitFed {
372            bitcoind,
373            lnd,
374            fed,
375            gw_lnd,
376            gw_ldk,
377            gw_ldk_second,
378            esplora,
379            recurringd,
380            recurringdv2,
381            start_time,
382            gw_lnd_registered,
383            gw_ldk_connected,
384            gw_ldk_second_connected,
385            fed_epoch_generated,
386            channel_opened,
387            recurringd_connected,
388            skip_setup,
389            pre_dkg,
390        })
391    }
392
393    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
394        Ok(self.esplora.get_try().await?.deref())
395    }
396    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
397        Ok(self.lnd.get_try().await?.deref())
398    }
399    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
400        Ok(self.gw_lnd.get_try().await?.deref())
401    }
402    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
403        self.gw_lnd_registered.get_try().await?;
404        Ok(self.gw_lnd.get_try().await?.deref())
405    }
406    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
407        Ok(self.gw_ldk.get_try().await?.deref())
408    }
409    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
410        Ok(self.gw_ldk_second.get_try().await?.deref())
411    }
412    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
413        self.gw_ldk_connected.get_try().await?;
414        Ok(self.gw_ldk.get_try().await?.deref())
415    }
416    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
417        self.gw_ldk_second_connected.get_try().await?;
418        Ok(self.gw_ldk_second.get_try().await?.deref())
419    }
420    pub async fn fed(&self) -> anyhow::Result<&Federation> {
421        Ok(self.fed.get_try().await?.deref())
422    }
423    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
424        Ok(self.bitcoind.get_try().await?.deref())
425    }
426
427    pub async fn internal_client(&self) -> anyhow::Result<Client> {
428        Ok(self.fed().await?.internal_client().await?.clone())
429    }
430
431    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
432    /// to be registered
433    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
434        self.fed().await?.await_gateways_registered().await?;
435        Ok(self.fed().await?.internal_client().await?.clone())
436    }
437
438    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
439        Ok(self.recurringd.get_try().await?.deref())
440    }
441
442    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
443        self.recurringd_connected.get_try().await?;
444        Ok(self.recurringd.get_try().await?.deref())
445    }
446
447    pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
448        Ok(self.recurringdv2.get_try().await?.deref())
449    }
450
451    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
452        let fed_size = process_mgr.globals.FM_FED_SIZE;
453        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
454        anyhow::ensure!(
455            fed_size > 3 * offline_nodes,
456            "too many offline nodes ({offline_nodes}) to reach consensus"
457        );
458
459        if !self.pre_dkg && !self.skip_setup {
460            let _ = self.internal_client_gw_registered().await?;
461        }
462        let _ = self.channel_opened.get_try().await?;
463        let _ = self.gw_lnd_registered().await?;
464        let _ = self.gw_ldk_connected().await?;
465        let _ = self.gw_ldk_second_connected().await?;
466        let _ = self.lnd().await?;
467        let _ = self.esplora().await?;
468        let _ = self.recurringd_connected().await?;
469        let _ = self.recurringdv2().await?;
470        let _ = self.fed_epoch_generated.get_try().await?;
471
472        debug!(
473            target: LOG_DEVIMINT,
474            fed_size,
475            offline_nodes,
476            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
477            "Dev federation ready",
478        );
479        Ok(())
480    }
481
482    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
483        self.finalize(process_mgr).await?;
484        Ok(DevFed {
485            bitcoind: self.bitcoind().await?.to_owned(),
486            lnd: self.lnd().await?.to_owned(),
487            fed: self.fed().await?.to_owned(),
488            gw_lnd: self.gw_lnd().await?.to_owned(),
489            gw_ldk: self.gw_ldk().await?.to_owned(),
490            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
491            esplora: self.esplora().await?.to_owned(),
492            recurringd: self.recurringd().await?.to_owned(),
493            recurringdv2: self.recurringdv2().await?.to_owned(),
494        })
495    }
496
497    pub async fn fast_terminate(self) {
498        let Self {
499            bitcoind,
500            lnd,
501            fed,
502            gw_lnd,
503            esplora,
504            gw_ldk,
505            gw_ldk_second,
506            recurringd,
507            recurringdv2,
508            ..
509        } = self;
510
511        join!(
512            spawn_drop(gw_lnd),
513            spawn_drop(gw_ldk),
514            spawn_drop(gw_ldk_second),
515            spawn_drop(fed),
516            spawn_drop(lnd),
517            spawn_drop(esplora),
518            spawn_drop(bitcoind),
519            spawn_drop(recurringd),
520            spawn_drop(recurringdv2),
521        );
522    }
523}