Skip to main content

devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21    T: Send + 'static,
22{
23    runtime::spawn("spawn_drop", async {
24        drop(t);
25    })
26    .await
27    .expect("drop panic");
28}
29
30#[derive(Clone)]
31pub struct DevFed {
32    pub bitcoind: Bitcoind,
33    pub lnd: Lnd,
34    pub fed: Federation,
35    pub gw_lnd: Gatewayd,
36    pub gw_ldk: Gatewayd,
37    pub gw_ldk_second: Gatewayd,
38    pub esplora: Esplora,
39    pub recurringd: Recurringd,
40    pub recurringdv2: Recurringdv2,
41}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            lnd,
48            fed,
49            gw_lnd,
50            gw_ldk,
51            gw_ldk_second,
52            esplora,
53            recurringd,
54            recurringdv2,
55        } = self;
56
57        join!(
58            spawn_drop(gw_lnd),
59            spawn_drop(gw_ldk),
60            spawn_drop(gw_ldk_second),
61            spawn_drop(fed),
62            spawn_drop(lnd),
63            spawn_drop(esplora),
64            spawn_drop(bitcoind),
65            spawn_drop(recurringd),
66            spawn_drop(recurringdv2),
67        );
68    }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71    DevJitFed::new(process_mgr, false, false)?
72        .to_dev_fed(process_mgr)
73        .await
74}
75
76type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
78#[derive(Clone)]
79pub struct DevJitFed {
80    bitcoind: JitArc<Bitcoind>,
81    lnd: JitArc<Lnd>,
82    fed: JitArc<Federation>,
83    gw_lnd: JitArc<Gatewayd>,
84    gw_ldk: JitArc<Gatewayd>,
85    gw_ldk_second: JitArc<Gatewayd>,
86    esplora: JitArc<Esplora>,
87    recurringd: JitArc<Recurringd>,
88    recurringdv2: JitArc<Recurringdv2>,
89    start_time: std::time::SystemTime,
90    gw_lnd_registered: JitArc<()>,
91    gw_ldk_connected: JitArc<()>,
92    gw_ldk_second_connected: JitArc<()>,
93    fed_epoch_generated: JitArc<()>,
94    channel_opened: JitArc<()>,
95    recurringd_connected: JitArc<()>,
96
97    skip_setup: bool,
98    pre_dkg: bool,
99}
100
101impl DevJitFed {
102    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103        Self::new_with_pre_restore(process_mgr, skip_setup, pre_dkg, false)
104    }
105
106    pub fn new_with_pre_restore(
107        process_mgr: &ProcessManager,
108        skip_setup: bool,
109        pre_dkg: bool,
110        pre_restore: bool,
111    ) -> Result<DevJitFed> {
112        let fed_size = process_mgr.globals.FM_FED_SIZE;
113        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
114        anyhow::ensure!(
115            fed_size > 3 * offline_nodes,
116            "too many offline nodes ({offline_nodes}) to reach consensus"
117        );
118        let start_time = fedimint_core::time::now();
119
120        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
121
122        let bitcoind = JitTry::new_try({
123            let process_mgr = process_mgr.to_owned();
124            move || async move {
125                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
126                let start_time = fedimint_core::time::now();
127                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
128                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
129                Ok(Arc::new(bitcoind))
130            }
131        });
132        let lnd = JitTry::new_try({
133            let process_mgr = process_mgr.to_owned();
134            let bitcoind = bitcoind.clone();
135            || async move {
136                let bitcoind = bitcoind.get_try().await?.deref().clone();
137                debug!(target: LOG_DEVIMINT, "Starting lnd...");
138                let start_time = fedimint_core::time::now();
139                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
140                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
141                Ok(Arc::new(lnd))
142            }
143        });
144        let esplora = JitTryAnyhow::new_try({
145            let process_mgr = process_mgr.to_owned();
146            let bitcoind = bitcoind.clone();
147            || async move {
148                let bitcoind = bitcoind.get_try().await?.deref().clone();
149                debug!(target: LOG_DEVIMINT, "Starting esplora...");
150                let start_time = fedimint_core::time::now();
151                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
152                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
153                Ok(Arc::new(esplora))
154            }
155        });
156
157        let fed = JitTryAnyhow::new_try({
158            let process_mgr = process_mgr.to_owned();
159            let bitcoind = bitcoind.clone();
160            move || async move {
161                let bitcoind = bitcoind.get_try().await?.deref().clone();
162                debug!(target: LOG_DEVIMINT, "Starting federation...");
163                let start_time = fedimint_core::time::now();
164                let mut fed = Federation::new(
165                    &process_mgr,
166                    bitcoind,
167                    skip_setup,
168                    pre_dkg,
169                    pre_restore,
170                    0,
171                    "default".to_string(),
172                )
173                .await?;
174
175                // Create a degraded federation if there are offline nodes
176                fed.degrade_federation(&process_mgr).await?;
177
178                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
179
180                Ok(Arc::new(fed))
181            }
182        });
183
184        let gw_lnd = JitTryAnyhow::new_try({
185            let process_mgr = process_mgr.to_owned();
186            let lnd = lnd.clone();
187            || async move {
188                let lnd = lnd.get_try().await?.deref().clone();
189                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
190                let start_time = fedimint_core::time::now();
191                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
192                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
193                Ok(Arc::new(lnd_gw))
194            }
195        });
196        let gw_lnd_registered = JitTryAnyhow::new_try({
197            let gw_lnd = gw_lnd.clone();
198            let fed = fed.clone();
199            move || async move {
200                let gw_lnd = gw_lnd.get_try().await?.deref();
201                let fed = fed.get_try().await?.deref();
202                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
203                let start_time = fedimint_core::time::now();
204                if !skip_setup && !pre_dkg {
205                    let invite = fed.invite_code()?;
206                    gw_lnd.client().connect_fed(invite).await?;
207                }
208                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
209                Ok(Arc::new(()))
210            }
211        });
212
213        let gw_ldk = JitTryAnyhow::new_try({
214            let process_mgr = process_mgr.to_owned();
215            let bitcoind = bitcoind.clone();
216            move || async move {
217                bitcoind.get_try().await?;
218                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
219                let start_time = fedimint_core::time::now();
220                let ldk_gw = Gatewayd::new(
221                    &process_mgr,
222                    LightningNode::Ldk {
223                        name: "gatewayd-ldk-0".to_string(),
224                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
225                        ldk_port: process_mgr.globals.FM_PORT_LDK,
226                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK_METRICS,
227                    },
228                    1,
229                )
230                .await?;
231                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
232                Ok(Arc::new(ldk_gw))
233            }
234        });
235        let gw_ldk_second = JitTryAnyhow::new_try({
236            let process_mgr = process_mgr.to_owned();
237            let bitcoind = bitcoind.clone();
238            move || async move {
239                bitcoind.get_try().await?;
240                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
241                let start_time = fedimint_core::time::now();
242                let ldk_gw2 = Gatewayd::new(
243                    &process_mgr,
244                    LightningNode::Ldk {
245                        name: "gatewayd-ldk-1".to_string(),
246                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
247                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
248                        metrics_port: process_mgr.globals.FM_PORT_GW_LDK2_METRICS,
249                    },
250                    2,
251                )
252                .await?;
253                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
254                Ok(Arc::new(ldk_gw2))
255            }
256        });
257        let gw_ldk_connected = JitTryAnyhow::new_try({
258            let gw_ldk = gw_ldk.clone();
259            let fed = fed.clone();
260            move || async move {
261                let gw_ldk = gw_ldk.get_try().await?.deref();
262                if supports_lnv2() {
263                    let fed = fed.get_try().await?.deref();
264                    let start_time = fedimint_core::time::now();
265                    if !skip_setup && !pre_dkg {
266                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
267                        let invite = fed.invite_code()?;
268                        gw_ldk.client().connect_fed(invite).await?;
269                    } else {
270                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
271                    }
272                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
273                }
274                Ok(Arc::new(()))
275            }
276        });
277        let gw_ldk_second_connected = JitTryAnyhow::new_try({
278            let gw_ldk_second = gw_ldk_second.clone();
279            let fed = fed.clone();
280            move || async move {
281                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
282                if supports_lnv2() {
283                    let fed = fed.get_try().await?.deref();
284                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
285                    let start_time = fedimint_core::time::now();
286                    if !skip_setup && !pre_dkg {
287                        let invite = fed.invite_code()?;
288                        gw_ldk2.client().connect_fed(invite).await?;
289                    }
290                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
291                }
292                Ok(Arc::new(()))
293            }
294        });
295
296        let fed_epoch_generated = JitTryAnyhow::new_try({
297            let fed = fed.clone();
298            move || async move {
299                let fed = fed.get_try().await?.deref().clone();
300                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
301                let start_time = fedimint_core::time::now();
302                if !skip_setup && !pre_dkg {
303                    fed.mine_then_wait_blocks_sync(10).await?;
304                }
305                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
306                Ok(Arc::new(()))
307            }
308        });
309
310        let channel_opened = JitTryAnyhow::new_try({
311            let gw_lnd = gw_lnd.clone();
312            let gw_ldk = gw_ldk.clone();
313            let gw_ldk_second = gw_ldk_second.clone();
314            let bitcoind = bitcoind.clone();
315            let fed_epoch_generated = fed_epoch_generated.clone();
316            move || async move {
317                // Note: We open new channel even if starting from existing state
318                // as ports change on every start, and without this nodes will not find each
319                // other.
320                let bitcoind = bitcoind.get_try().await?.deref().clone();
321
322                // Wait for an epoch to occur since that mines blocks, which can cause opening
323                // channels to be racy
324                fed_epoch_generated.get_try().await?;
325
326                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
327                let gw_ldk = gw_ldk.get_try().await?.deref();
328                let gw_lnd = gw_lnd.get_try().await?.deref();
329                let gateways: &[NamedGateway<'_>] =
330                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
331                if !skip_setup && !pre_dkg {
332                    debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
333                    let start_time = fedimint_core::time::now();
334                    open_channels_between_gateways(&bitcoind, gateways).await?;
335                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
336                }
337
338                Ok(Arc::new(()))
339            }
340        });
341
342        let recurringd = JitTryAnyhow::new_try({
343            let process_mgr = process_mgr.to_owned();
344            move || async move {
345                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
346                let start_time = fedimint_core::time::now();
347                let recurringd = Recurringd::new(&process_mgr).await?;
348                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
349                Ok(Arc::new(recurringd))
350            }
351        });
352
353        let recurringd_connected = JitTryAnyhow::new_try({
354            let recurringd = recurringd.clone();
355            let fed = fed.clone();
356            move || async move {
357                let recurringd = recurringd.get_try().await?.deref();
358                let fed = fed.get_try().await?.deref();
359                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
360                let start_time = fedimint_core::time::now();
361                if !skip_setup && !pre_dkg {
362                    let invite_code = fed.invite_code()?;
363                    recurringd.add_federation(&invite_code).await?;
364                }
365                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
366                Ok(Arc::new(()))
367            }
368        });
369
370        let recurringdv2 = JitTryAnyhow::new_try({
371            let process_mgr = process_mgr.to_owned();
372            move || async move {
373                debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
374                let start_time = fedimint_core::time::now();
375                let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
376                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
377                Ok(Arc::new(recurringdv2))
378            }
379        });
380
381        Ok(DevJitFed {
382            bitcoind,
383            lnd,
384            fed,
385            gw_lnd,
386            gw_ldk,
387            gw_ldk_second,
388            esplora,
389            recurringd,
390            recurringdv2,
391            start_time,
392            gw_lnd_registered,
393            gw_ldk_connected,
394            gw_ldk_second_connected,
395            fed_epoch_generated,
396            channel_opened,
397            recurringd_connected,
398            skip_setup,
399            pre_dkg,
400        })
401    }
402
403    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
404        Ok(self.esplora.get_try().await?.deref())
405    }
406    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
407        Ok(self.lnd.get_try().await?.deref())
408    }
409    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
410        Ok(self.gw_lnd.get_try().await?.deref())
411    }
412    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
413        self.gw_lnd_registered.get_try().await?;
414        Ok(self.gw_lnd.get_try().await?.deref())
415    }
416    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
417        Ok(self.gw_ldk.get_try().await?.deref())
418    }
419    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
420        Ok(self.gw_ldk_second.get_try().await?.deref())
421    }
422    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
423        self.gw_ldk_connected.get_try().await?;
424        Ok(self.gw_ldk.get_try().await?.deref())
425    }
426    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
427        self.gw_ldk_second_connected.get_try().await?;
428        Ok(self.gw_ldk_second.get_try().await?.deref())
429    }
430    pub async fn fed(&self) -> anyhow::Result<&Federation> {
431        Ok(self.fed.get_try().await?.deref())
432    }
433    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
434        Ok(self.bitcoind.get_try().await?.deref())
435    }
436
437    pub async fn internal_client(&self) -> anyhow::Result<Client> {
438        Ok(self.fed().await?.internal_client().await?.clone())
439    }
440
441    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
442    /// to be registered
443    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
444        self.fed().await?.await_gateways_registered().await?;
445        Ok(self.fed().await?.internal_client().await?.clone())
446    }
447
448    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
449        Ok(self.recurringd.get_try().await?.deref())
450    }
451
452    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
453        self.recurringd_connected.get_try().await?;
454        Ok(self.recurringd.get_try().await?.deref())
455    }
456
457    pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
458        Ok(self.recurringdv2.get_try().await?.deref())
459    }
460
461    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
462        let fed_size = process_mgr.globals.FM_FED_SIZE;
463        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
464        anyhow::ensure!(
465            fed_size > 3 * offline_nodes,
466            "too many offline nodes ({offline_nodes}) to reach consensus"
467        );
468
469        if !self.pre_dkg && !self.skip_setup {
470            let _ = self.internal_client_gw_registered().await?;
471        }
472        let _ = self.channel_opened.get_try().await?;
473        let _ = self.gw_lnd_registered().await?;
474        let _ = self.gw_ldk_connected().await?;
475        let _ = self.gw_ldk_second_connected().await?;
476        let _ = self.lnd().await?;
477        let _ = self.esplora().await?;
478        let _ = self.recurringd_connected().await?;
479        let _ = self.recurringdv2().await?;
480        let _ = self.fed_epoch_generated.get_try().await?;
481
482        debug!(
483            target: LOG_DEVIMINT,
484            fed_size,
485            offline_nodes,
486            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
487            "Dev federation ready",
488        );
489        Ok(())
490    }
491
492    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
493        self.finalize(process_mgr).await?;
494        Ok(DevFed {
495            bitcoind: self.bitcoind().await?.to_owned(),
496            lnd: self.lnd().await?.to_owned(),
497            fed: self.fed().await?.to_owned(),
498            gw_lnd: self.gw_lnd().await?.to_owned(),
499            gw_ldk: self.gw_ldk().await?.to_owned(),
500            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
501            esplora: self.esplora().await?.to_owned(),
502            recurringd: self.recurringd().await?.to_owned(),
503            recurringdv2: self.recurringdv2().await?.to_owned(),
504        })
505    }
506
507    pub async fn fast_terminate(self) {
508        let Self {
509            bitcoind,
510            lnd,
511            fed,
512            gw_lnd,
513            esplora,
514            gw_ldk,
515            gw_ldk_second,
516            recurringd,
517            recurringdv2,
518            ..
519        } = self;
520
521        join!(
522            spawn_drop(gw_lnd),
523            spawn_drop(gw_ldk),
524            spawn_drop(gw_ldk_second),
525            spawn_drop(fed),
526            spawn_drop(lnd),
527            spawn_drop(esplora),
528            spawn_drop(bitcoind),
529            spawn_drop(recurringd),
530            spawn_drop(recurringdv2),
531        );
532    }
533}