devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20    T: Send + 'static,
21{
22    runtime::spawn("spawn_drop", async {
23        drop(t);
24    })
25    .await
26    .expect("drop panic");
27}
28
/// A fully started dev federation: owned handles to every component.
///
/// Produced by [`DevJitFed::to_dev_fed`] (or the [`dev_fed`] shortcut) after
/// all startup steps have completed.
#[derive(Clone)]
pub struct DevFed {
    /// Handle to the bitcoind instance the other components are built on.
    pub bitcoind: Bitcoind,
    /// Handle to the LND lightning node.
    pub lnd: Lnd,
    /// Handle to the federation.
    pub fed: Federation,
    /// Gateway backed by the LND node.
    pub gw_lnd: Gatewayd,
    /// First LDK-backed gateway (`gatewayd-ldk-0`).
    pub gw_ldk: Gatewayd,
    /// Second LDK-backed gateway (`gatewayd-ldk-1`).
    pub gw_ldk_second: Gatewayd,
    /// Handle to the esplora instance.
    pub esplora: Esplora,
    /// Handle to the recurringd instance.
    pub recurringd: Recurringd,
}
40
41impl DevFed {
42    pub async fn fast_terminate(self) {
43        let Self {
44            bitcoind,
45            lnd,
46            fed,
47            gw_lnd,
48            gw_ldk,
49            gw_ldk_second,
50            esplora,
51            recurringd,
52        } = self;
53
54        join!(
55            spawn_drop(gw_lnd),
56            spawn_drop(gw_ldk),
57            spawn_drop(gw_ldk_second),
58            spawn_drop(fed),
59            spawn_drop(lnd),
60            spawn_drop(esplora),
61            spawn_drop(bitcoind),
62            spawn_drop(recurringd),
63        );
64    }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67    DevJitFed::new(process_mgr, false, false)?
68        .to_dev_fed(process_mgr)
69        .await
70}
71
/// Shorthand for a [`JitTryAnyhow`] value that resolves to an [`Arc<T>`].
type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
/// Just-in-time counterpart of [`DevFed`].
///
/// Every component and every setup step is held behind a [`JitArc`]; callers
/// await only the parts they need through the accessor methods.
#[derive(Clone)]
pub struct DevJitFed {
    // Component handles.
    bitcoind: JitArc<Bitcoind>,
    lnd: JitArc<Lnd>,
    fed: JitArc<Federation>,
    gw_lnd: JitArc<Gatewayd>,
    gw_ldk: JitArc<Gatewayd>,
    gw_ldk_second: JitArc<Gatewayd>,
    esplora: JitArc<Esplora>,
    recurringd: JitArc<Recurringd>,
    /// Time at which [`DevJitFed::new`] ran; used to log the total startup time.
    start_time: std::time::SystemTime,
    // Setup-step markers: each resolves to `()` once the corresponding step has
    // run (or was skipped because of `skip_setup` / `pre_dkg`).
    gw_lnd_registered: JitArc<()>,
    gw_ldk_connected: JitArc<()>,
    gw_ldk_second_connected: JitArc<()>,
    fed_epoch_generated: JitArc<()>,
    channel_opened: JitArc<()>,
    recurringd_connected: JitArc<()>,

    /// As passed to [`DevJitFed::new`].
    skip_setup: bool,
    /// As passed to [`DevJitFed::new`].
    pre_dkg: bool,
}
95
96impl DevJitFed {
97    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98        let fed_size = process_mgr.globals.FM_FED_SIZE;
99        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100        anyhow::ensure!(
101            fed_size > 3 * offline_nodes,
102            "too many offline nodes ({offline_nodes}) to reach consensus"
103        );
104        let start_time = fedimint_core::time::now();
105
106        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
107
108        let bitcoind = JitTry::new_try({
109            let process_mgr = process_mgr.to_owned();
110            move || async move {
111                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112                let start_time = fedimint_core::time::now();
113                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115                Ok(Arc::new(bitcoind))
116            }
117        });
118        let lnd = JitTry::new_try({
119            let process_mgr = process_mgr.to_owned();
120            let bitcoind = bitcoind.clone();
121            || async move {
122                let bitcoind = bitcoind.get_try().await?.deref().clone();
123                debug!(target: LOG_DEVIMINT, "Starting lnd...");
124                let start_time = fedimint_core::time::now();
125                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127                Ok(Arc::new(lnd))
128            }
129        });
130        let esplora = JitTryAnyhow::new_try({
131            let process_mgr = process_mgr.to_owned();
132            let bitcoind = bitcoind.clone();
133            || async move {
134                let bitcoind = bitcoind.get_try().await?.deref().clone();
135                debug!(target: LOG_DEVIMINT, "Starting esplora...");
136                let start_time = fedimint_core::time::now();
137                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139                Ok(Arc::new(esplora))
140            }
141        });
142
143        let fed = JitTryAnyhow::new_try({
144            let process_mgr = process_mgr.to_owned();
145            let bitcoind = bitcoind.clone();
146            move || async move {
147                let bitcoind = bitcoind.get_try().await?.deref().clone();
148                debug!(target: LOG_DEVIMINT, "Starting federation...");
149                let start_time = fedimint_core::time::now();
150                let mut fed = Federation::new(
151                    &process_mgr,
152                    bitcoind,
153                    skip_setup,
154                    pre_dkg,
155                    0,
156                    "default".to_string(),
157                )
158                .await?;
159
160                // Create a degraded federation if there are offline nodes
161                fed.degrade_federation(&process_mgr).await?;
162
163                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165                Ok(Arc::new(fed))
166            }
167        });
168
169        let gw_lnd = JitTryAnyhow::new_try({
170            let process_mgr = process_mgr.to_owned();
171            let lnd = lnd.clone();
172            || async move {
173                let lnd = lnd.get_try().await?.deref().clone();
174                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175                let start_time = fedimint_core::time::now();
176                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178                Ok(Arc::new(lnd_gw))
179            }
180        });
181        let gw_lnd_registered = JitTryAnyhow::new_try({
182            let gw_lnd = gw_lnd.clone();
183            let fed = fed.clone();
184            move || async move {
185                let gw_lnd = gw_lnd.get_try().await?.deref();
186                let fed = fed.get_try().await?.deref();
187                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188                let start_time = fedimint_core::time::now();
189                if !skip_setup && !pre_dkg {
190                    gw_lnd.connect_fed(fed).await?;
191                }
192                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193                Ok(Arc::new(()))
194            }
195        });
196
197        let gw_ldk = JitTryAnyhow::new_try({
198            let process_mgr = process_mgr.to_owned();
199            let bitcoind = bitcoind.clone();
200            move || async move {
201                bitcoind.get_try().await?;
202                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
203                let start_time = fedimint_core::time::now();
204                let ldk_gw = Gatewayd::new(
205                    &process_mgr,
206                    LightningNode::Ldk {
207                        name: "gatewayd-ldk-0".to_string(),
208                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
209                        ldk_port: process_mgr.globals.FM_PORT_LDK,
210                        iroh_port: process_mgr.globals.FM_PORT_GW_LDK_IROH,
211                    },
212                )
213                .await?;
214                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
215                Ok(Arc::new(ldk_gw))
216            }
217        });
218        let gw_ldk_second = JitTryAnyhow::new_try({
219            let process_mgr = process_mgr.to_owned();
220            let bitcoind = bitcoind.clone();
221            move || async move {
222                bitcoind.get_try().await?;
223                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
224                let start_time = fedimint_core::time::now();
225                let ldk_gw2 = Gatewayd::new(
226                    &process_mgr,
227                    LightningNode::Ldk {
228                        name: "gatewayd-ldk-1".to_string(),
229                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
230                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
231                        iroh_port: process_mgr.globals.FM_PORT_GW_LDK2_IROH,
232                    },
233                )
234                .await?;
235                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
236                Ok(Arc::new(ldk_gw2))
237            }
238        });
239        let gw_ldk_connected = JitTryAnyhow::new_try({
240            let gw_ldk = gw_ldk.clone();
241            let fed = fed.clone();
242            move || async move {
243                let gw_ldk = gw_ldk.get_try().await?.deref();
244                if supports_lnv2() {
245                    let fed = fed.get_try().await?.deref();
246                    let start_time = fedimint_core::time::now();
247                    if !skip_setup && !pre_dkg {
248                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
249                        gw_ldk.connect_fed(fed).await?;
250                    } else {
251                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
252                    }
253                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
254                }
255                Ok(Arc::new(()))
256            }
257        });
258        let gw_ldk_second_connected = JitTryAnyhow::new_try({
259            let gw_ldk_second = gw_ldk_second.clone();
260            let fed = fed.clone();
261            move || async move {
262                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
263                if supports_lnv2() {
264                    let fed = fed.get_try().await?.deref();
265                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
266                    let start_time = fedimint_core::time::now();
267                    if !skip_setup && !pre_dkg {
268                        gw_ldk2.connect_fed(fed).await?;
269                    }
270                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
271                }
272                Ok(Arc::new(()))
273            }
274        });
275
276        let fed_epoch_generated = JitTryAnyhow::new_try({
277            let fed = fed.clone();
278            move || async move {
279                let fed = fed.get_try().await?.deref().clone();
280                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
281                let start_time = fedimint_core::time::now();
282                if !skip_setup && !pre_dkg {
283                    fed.mine_then_wait_blocks_sync(10).await?;
284                }
285                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
286                Ok(Arc::new(()))
287            }
288        });
289
290        let channel_opened = JitTryAnyhow::new_try({
291            let gw_lnd = gw_lnd.clone();
292            let gw_ldk = gw_ldk.clone();
293            let gw_ldk_second = gw_ldk_second.clone();
294            let bitcoind = bitcoind.clone();
295            let fed_epoch_generated = fed_epoch_generated.clone();
296            move || async move {
297                // Note: We open new channel even if starting from existing state
298                // as ports change on every start, and without this nodes will not find each
299                // other.
300                let bitcoind = bitcoind.get_try().await?.deref().clone();
301
302                // Wait for an epoch to occur since that mines blocks, which can cause opening
303                // channels to be racy
304                fed_epoch_generated.get_try().await?;
305
306                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
307                let gw_ldk = gw_ldk.get_try().await?.deref();
308                let gw_lnd = gw_lnd.get_try().await?.deref();
309                let gateways: &[NamedGateway<'_>] =
310                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
311
312                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
313                let start_time = fedimint_core::time::now();
314                open_channels_between_gateways(&bitcoind, gateways).await?;
315                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
316
317                Ok(Arc::new(()))
318            }
319        });
320
321        let recurringd = JitTryAnyhow::new_try({
322            let process_mgr = process_mgr.to_owned();
323            move || async move {
324                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
325                let start_time = fedimint_core::time::now();
326                let recurringd = Recurringd::new(&process_mgr).await?;
327                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
328                Ok(Arc::new(recurringd))
329            }
330        });
331
332        let recurringd_connected = JitTryAnyhow::new_try({
333            let recurringd = recurringd.clone();
334            let fed = fed.clone();
335            move || async move {
336                let recurringd = recurringd.get_try().await?.deref();
337                let fed = fed.get_try().await?.deref();
338                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
339                let start_time = fedimint_core::time::now();
340                if !skip_setup && !pre_dkg {
341                    let invite_code = fed.invite_code()?;
342                    recurringd.add_federation(&invite_code).await?;
343                }
344                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
345                Ok(Arc::new(()))
346            }
347        });
348
349        Ok(DevJitFed {
350            bitcoind,
351            lnd,
352            fed,
353            gw_lnd,
354            gw_ldk,
355            gw_ldk_second,
356            esplora,
357            recurringd,
358            start_time,
359            gw_lnd_registered,
360            gw_ldk_connected,
361            gw_ldk_second_connected,
362            fed_epoch_generated,
363            channel_opened,
364            recurringd_connected,
365            skip_setup,
366            pre_dkg,
367        })
368    }
369
370    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
371        Ok(self.esplora.get_try().await?.deref())
372    }
373    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
374        Ok(self.lnd.get_try().await?.deref())
375    }
376    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
377        Ok(self.gw_lnd.get_try().await?.deref())
378    }
379    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
380        self.gw_lnd_registered.get_try().await?;
381        Ok(self.gw_lnd.get_try().await?.deref())
382    }
383    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
384        Ok(self.gw_ldk.get_try().await?.deref())
385    }
386    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
387        Ok(self.gw_ldk_second.get_try().await?.deref())
388    }
389    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
390        self.gw_ldk_connected.get_try().await?;
391        Ok(self.gw_ldk.get_try().await?.deref())
392    }
393    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
394        self.gw_ldk_second_connected.get_try().await?;
395        Ok(self.gw_ldk_second.get_try().await?.deref())
396    }
397    pub async fn fed(&self) -> anyhow::Result<&Federation> {
398        Ok(self.fed.get_try().await?.deref())
399    }
400    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
401        Ok(self.bitcoind.get_try().await?.deref())
402    }
403
404    pub async fn internal_client(&self) -> anyhow::Result<Client> {
405        Ok(self.fed().await?.internal_client().await?.clone())
406    }
407
408    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
409    /// to be registered
410    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
411        self.fed().await?.await_gateways_registered().await?;
412        Ok(self.fed().await?.internal_client().await?.clone())
413    }
414
415    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
416        Ok(self.recurringd.get_try().await?.deref())
417    }
418
419    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
420        self.recurringd_connected.get_try().await?;
421        Ok(self.recurringd.get_try().await?.deref())
422    }
423
424    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
425        let fed_size = process_mgr.globals.FM_FED_SIZE;
426        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
427        anyhow::ensure!(
428            fed_size > 3 * offline_nodes,
429            "too many offline nodes ({offline_nodes}) to reach consensus"
430        );
431
432        if !self.pre_dkg && !self.skip_setup {
433            let _ = self.internal_client_gw_registered().await?;
434        }
435        let _ = self.channel_opened.get_try().await?;
436        let _ = self.gw_lnd_registered().await?;
437        let _ = self.gw_ldk_connected().await?;
438        let _ = self.gw_ldk_second_connected().await?;
439        let _ = self.lnd().await?;
440        let _ = self.esplora().await?;
441        let _ = self.recurringd_connected().await?;
442        let _ = self.fed_epoch_generated.get_try().await?;
443
444        debug!(
445            target: LOG_DEVIMINT,
446            fed_size,
447            offline_nodes,
448            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
449            "Dev federation ready",
450        );
451        Ok(())
452    }
453
454    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
455        self.finalize(process_mgr).await?;
456        Ok(DevFed {
457            bitcoind: self.bitcoind().await?.to_owned(),
458            lnd: self.lnd().await?.to_owned(),
459            fed: self.fed().await?.to_owned(),
460            gw_lnd: self.gw_lnd().await?.to_owned(),
461            gw_ldk: self.gw_ldk().await?.to_owned(),
462            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
463            esplora: self.esplora().await?.to_owned(),
464            recurringd: self.recurringd().await?.to_owned(),
465        })
466    }
467
468    pub async fn fast_terminate(self) {
469        let Self {
470            bitcoind,
471            lnd,
472            fed,
473            gw_lnd,
474            esplora,
475            gw_ldk,
476            gw_ldk_second,
477            recurringd,
478            ..
479        } = self;
480
481        join!(
482            spawn_drop(gw_lnd),
483            spawn_drop(gw_ldk),
484            spawn_drop(gw_ldk_second),
485            spawn_drop(fed),
486            spawn_drop(lnd),
487            spawn_drop(esplora),
488            spawn_drop(bitcoind),
489            spawn_drop(recurringd),
490        );
491    }
492}