devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::recurringdv2::Recurringdv2;
17use crate::util::{ProcessManager, supports_lnv2};
18
19async fn spawn_drop<T>(t: T)
20where
21    T: Send + 'static,
22{
23    runtime::spawn("spawn_drop", async {
24        drop(t);
25    })
26    .await
27    .expect("drop panic");
28}
29
/// Owned handles to every component of a dev federation.
///
/// Produced by `DevJitFed::to_dev_fed`, which clones each handle out of the
/// corresponding JIT value once `DevJitFed::finalize` has succeeded.
///
/// NOTE(review): presumably each handle controls a running process that is
/// stopped when the handle is dropped (see `fast_terminate`) — confirm in the
/// respective modules.
#[derive(Clone)]
pub struct DevFed {
    /// Handle created by `Bitcoind::new`.
    pub bitcoind: Bitcoind,
    /// Handle created by `Lnd::new`.
    pub lnd: Lnd,
    /// Handle created by `Federation::new`.
    pub fed: Federation,
    /// Gateway created with `LightningNode::Lnd`.
    pub gw_lnd: Gatewayd,
    /// Gateway created with `LightningNode::Ldk` named `"gatewayd-ldk-0"`.
    pub gw_ldk: Gatewayd,
    /// Gateway created with `LightningNode::Ldk` named `"gatewayd-ldk-1"`.
    pub gw_ldk_second: Gatewayd,
    /// Handle created by `Esplora::new`.
    pub esplora: Esplora,
    /// Handle created by `Recurringd::new`.
    pub recurringd: Recurringd,
    /// Handle created by `Recurringdv2::new`.
    pub recurringdv2: Recurringdv2,
}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            lnd,
48            fed,
49            gw_lnd,
50            gw_ldk,
51            gw_ldk_second,
52            esplora,
53            recurringd,
54            recurringdv2,
55        } = self;
56
57        join!(
58            spawn_drop(gw_lnd),
59            spawn_drop(gw_ldk),
60            spawn_drop(gw_ldk_second),
61            spawn_drop(fed),
62            spawn_drop(lnd),
63            spawn_drop(esplora),
64            spawn_drop(bitcoind),
65            spawn_drop(recurringd),
66            spawn_drop(recurringdv2),
67        );
68    }
69}
70pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
71    DevJitFed::new(process_mgr, false, false)?
72        .to_dev_fed(process_mgr)
73        .await
74}
75
/// A `JitTryAnyhow` whose value is an `Arc<T>`; the type of every JIT field in
/// `DevJitFed`.
type JitArc<T> = JitTryAnyhow<Arc<T>>;
77
/// Dev federation whose components and setup steps are each held in their own
/// JIT value (`JitArc`), created by `DevJitFed::new`.
///
/// Accessor methods await the matching JIT value and borrow its result.
#[derive(Clone)]
pub struct DevJitFed {
    /// Result of `Bitcoind::new`.
    bitcoind: JitArc<Bitcoind>,
    /// Result of `Lnd::new`; waits for `bitcoind` first.
    lnd: JitArc<Lnd>,
    /// Result of `Federation::new` followed by `degrade_federation`; waits for
    /// `bitcoind` first.
    fed: JitArc<Federation>,
    /// Gateway created with `LightningNode::Lnd`; waits for `lnd` first.
    gw_lnd: JitArc<Gatewayd>,
    /// Gateway created with `LightningNode::Ldk` named `"gatewayd-ldk-0"`;
    /// waits for `bitcoind` first.
    gw_ldk: JitArc<Gatewayd>,
    /// Gateway created with `LightningNode::Ldk` named `"gatewayd-ldk-1"`;
    /// waits for `bitcoind` first.
    gw_ldk_second: JitArc<Gatewayd>,
    /// Result of `Esplora::new`; waits for `bitcoind` first.
    esplora: JitArc<Esplora>,
    /// Result of `Recurringd::new`.
    recurringd: JitArc<Recurringd>,
    /// Result of `Recurringdv2::new`.
    recurringdv2: JitArc<Recurringdv2>,
    /// Time at which `DevJitFed::new` was called; used for the final
    /// "Dev federation ready" elapsed-time log.
    start_time: std::time::SystemTime,
    /// Step: `gw_lnd.connect_fed(fed)`, skipped when `skip_setup` or `pre_dkg`.
    gw_lnd_registered: JitArc<()>,
    /// Step: `gw_ldk.connect_fed(fed)`, only attempted when `supports_lnv2()`
    /// and neither `skip_setup` nor `pre_dkg` is set.
    gw_ldk_connected: JitArc<()>,
    /// Step: `gw_ldk_second.connect_fed(fed)`, under the same conditions as
    /// `gw_ldk_connected`.
    gw_ldk_second_connected: JitArc<()>,
    /// Step: `fed.mine_then_wait_blocks_sync(10)`, skipped when `skip_setup`
    /// or `pre_dkg`.
    fed_epoch_generated: JitArc<()>,
    /// Step: `open_channels_between_gateways` across the three gateways; waits
    /// for `fed_epoch_generated` first.
    channel_opened: JitArc<()>,
    /// Step: `recurringd.add_federation(invite_code)`, skipped when
    /// `skip_setup` or `pre_dkg`.
    recurringd_connected: JitArc<()>,

    /// Value of the `skip_setup` argument passed to `new`.
    skip_setup: bool,
    /// Value of the `pre_dkg` argument passed to `new`.
    pre_dkg: bool,
}
100
101impl DevJitFed {
102    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
103        let fed_size = process_mgr.globals.FM_FED_SIZE;
104        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
105        anyhow::ensure!(
106            fed_size > 3 * offline_nodes,
107            "too many offline nodes ({offline_nodes}) to reach consensus"
108        );
109        let start_time = fedimint_core::time::now();
110
111        debug!(target: LOG_DEVIMINT, %fed_size, %offline_nodes, "Starting dev federation");
112
113        let bitcoind = JitTry::new_try({
114            let process_mgr = process_mgr.to_owned();
115            move || async move {
116                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
117                let start_time = fedimint_core::time::now();
118                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
119                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
120                Ok(Arc::new(bitcoind))
121            }
122        });
123        let lnd = JitTry::new_try({
124            let process_mgr = process_mgr.to_owned();
125            let bitcoind = bitcoind.clone();
126            || async move {
127                let bitcoind = bitcoind.get_try().await?.deref().clone();
128                debug!(target: LOG_DEVIMINT, "Starting lnd...");
129                let start_time = fedimint_core::time::now();
130                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
131                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
132                Ok(Arc::new(lnd))
133            }
134        });
135        let esplora = JitTryAnyhow::new_try({
136            let process_mgr = process_mgr.to_owned();
137            let bitcoind = bitcoind.clone();
138            || async move {
139                let bitcoind = bitcoind.get_try().await?.deref().clone();
140                debug!(target: LOG_DEVIMINT, "Starting esplora...");
141                let start_time = fedimint_core::time::now();
142                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
143                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
144                Ok(Arc::new(esplora))
145            }
146        });
147
148        let fed = JitTryAnyhow::new_try({
149            let process_mgr = process_mgr.to_owned();
150            let bitcoind = bitcoind.clone();
151            move || async move {
152                let bitcoind = bitcoind.get_try().await?.deref().clone();
153                debug!(target: LOG_DEVIMINT, "Starting federation...");
154                let start_time = fedimint_core::time::now();
155                let mut fed = Federation::new(
156                    &process_mgr,
157                    bitcoind,
158                    skip_setup,
159                    pre_dkg,
160                    0,
161                    "default".to_string(),
162                )
163                .await?;
164
165                // Create a degraded federation if there are offline nodes
166                fed.degrade_federation(&process_mgr).await?;
167
168                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
169
170                Ok(Arc::new(fed))
171            }
172        });
173
174        let gw_lnd = JitTryAnyhow::new_try({
175            let process_mgr = process_mgr.to_owned();
176            let lnd = lnd.clone();
177            || async move {
178                let lnd = lnd.get_try().await?.deref().clone();
179                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
180                let start_time = fedimint_core::time::now();
181                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd), 0).await?;
182                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
183                Ok(Arc::new(lnd_gw))
184            }
185        });
186        let gw_lnd_registered = JitTryAnyhow::new_try({
187            let gw_lnd = gw_lnd.clone();
188            let fed = fed.clone();
189            move || async move {
190                let gw_lnd = gw_lnd.get_try().await?.deref();
191                let fed = fed.get_try().await?.deref();
192                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
193                let start_time = fedimint_core::time::now();
194                if !skip_setup && !pre_dkg {
195                    gw_lnd.connect_fed(fed).await?;
196                }
197                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
198                Ok(Arc::new(()))
199            }
200        });
201
202        let gw_ldk = JitTryAnyhow::new_try({
203            let process_mgr = process_mgr.to_owned();
204            let bitcoind = bitcoind.clone();
205            move || async move {
206                bitcoind.get_try().await?;
207                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
208                let start_time = fedimint_core::time::now();
209                let ldk_gw = Gatewayd::new(
210                    &process_mgr,
211                    LightningNode::Ldk {
212                        name: "gatewayd-ldk-0".to_string(),
213                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
214                        ldk_port: process_mgr.globals.FM_PORT_LDK,
215                    },
216                    1,
217                )
218                .await?;
219                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
220                Ok(Arc::new(ldk_gw))
221            }
222        });
223        let gw_ldk_second = JitTryAnyhow::new_try({
224            let process_mgr = process_mgr.to_owned();
225            let bitcoind = bitcoind.clone();
226            move || async move {
227                bitcoind.get_try().await?;
228                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
229                let start_time = fedimint_core::time::now();
230                let ldk_gw2 = Gatewayd::new(
231                    &process_mgr,
232                    LightningNode::Ldk {
233                        name: "gatewayd-ldk-1".to_string(),
234                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
235                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
236                    },
237                    2,
238                )
239                .await?;
240                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
241                Ok(Arc::new(ldk_gw2))
242            }
243        });
244        let gw_ldk_connected = JitTryAnyhow::new_try({
245            let gw_ldk = gw_ldk.clone();
246            let fed = fed.clone();
247            move || async move {
248                let gw_ldk = gw_ldk.get_try().await?.deref();
249                if supports_lnv2() {
250                    let fed = fed.get_try().await?.deref();
251                    let start_time = fedimint_core::time::now();
252                    if !skip_setup && !pre_dkg {
253                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
254                        gw_ldk.connect_fed(fed).await?;
255                    } else {
256                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
257                    }
258                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
259                }
260                Ok(Arc::new(()))
261            }
262        });
263        let gw_ldk_second_connected = JitTryAnyhow::new_try({
264            let gw_ldk_second = gw_ldk_second.clone();
265            let fed = fed.clone();
266            move || async move {
267                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
268                if supports_lnv2() {
269                    let fed = fed.get_try().await?.deref();
270                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
271                    let start_time = fedimint_core::time::now();
272                    if !skip_setup && !pre_dkg {
273                        gw_ldk2.connect_fed(fed).await?;
274                    }
275                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
276                }
277                Ok(Arc::new(()))
278            }
279        });
280
281        let fed_epoch_generated = JitTryAnyhow::new_try({
282            let fed = fed.clone();
283            move || async move {
284                let fed = fed.get_try().await?.deref().clone();
285                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
286                let start_time = fedimint_core::time::now();
287                if !skip_setup && !pre_dkg {
288                    fed.mine_then_wait_blocks_sync(10).await?;
289                }
290                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
291                Ok(Arc::new(()))
292            }
293        });
294
295        let channel_opened = JitTryAnyhow::new_try({
296            let gw_lnd = gw_lnd.clone();
297            let gw_ldk = gw_ldk.clone();
298            let gw_ldk_second = gw_ldk_second.clone();
299            let bitcoind = bitcoind.clone();
300            let fed_epoch_generated = fed_epoch_generated.clone();
301            move || async move {
302                // Note: We open new channel even if starting from existing state
303                // as ports change on every start, and without this nodes will not find each
304                // other.
305                let bitcoind = bitcoind.get_try().await?.deref().clone();
306
307                // Wait for an epoch to occur since that mines blocks, which can cause opening
308                // channels to be racy
309                fed_epoch_generated.get_try().await?;
310
311                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
312                let gw_ldk = gw_ldk.get_try().await?.deref();
313                let gw_lnd = gw_lnd.get_try().await?.deref();
314                let gateways: &[NamedGateway<'_>] =
315                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
316
317                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
318                let start_time = fedimint_core::time::now();
319                open_channels_between_gateways(&bitcoind, gateways).await?;
320                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
321
322                Ok(Arc::new(()))
323            }
324        });
325
326        let recurringd = JitTryAnyhow::new_try({
327            let process_mgr = process_mgr.to_owned();
328            move || async move {
329                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
330                let start_time = fedimint_core::time::now();
331                let recurringd = Recurringd::new(&process_mgr).await?;
332                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
333                Ok(Arc::new(recurringd))
334            }
335        });
336
337        let recurringd_connected = JitTryAnyhow::new_try({
338            let recurringd = recurringd.clone();
339            let fed = fed.clone();
340            move || async move {
341                let recurringd = recurringd.get_try().await?.deref();
342                let fed = fed.get_try().await?.deref();
343                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
344                let start_time = fedimint_core::time::now();
345                if !skip_setup && !pre_dkg {
346                    let invite_code = fed.invite_code()?;
347                    recurringd.add_federation(&invite_code).await?;
348                }
349                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
350                Ok(Arc::new(()))
351            }
352        });
353
354        let recurringdv2 = JitTryAnyhow::new_try({
355            let process_mgr = process_mgr.to_owned();
356            move || async move {
357                debug!(target: LOG_DEVIMINT, "Starting recurringdv2...");
358                let start_time = fedimint_core::time::now();
359                let recurringdv2 = Recurringdv2::new(&process_mgr).await?;
360                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringdv2");
361                Ok(Arc::new(recurringdv2))
362            }
363        });
364
365        Ok(DevJitFed {
366            bitcoind,
367            lnd,
368            fed,
369            gw_lnd,
370            gw_ldk,
371            gw_ldk_second,
372            esplora,
373            recurringd,
374            recurringdv2,
375            start_time,
376            gw_lnd_registered,
377            gw_ldk_connected,
378            gw_ldk_second_connected,
379            fed_epoch_generated,
380            channel_opened,
381            recurringd_connected,
382            skip_setup,
383            pre_dkg,
384        })
385    }
386
387    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
388        Ok(self.esplora.get_try().await?.deref())
389    }
390    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
391        Ok(self.lnd.get_try().await?.deref())
392    }
393    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
394        Ok(self.gw_lnd.get_try().await?.deref())
395    }
396    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
397        self.gw_lnd_registered.get_try().await?;
398        Ok(self.gw_lnd.get_try().await?.deref())
399    }
400    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
401        Ok(self.gw_ldk.get_try().await?.deref())
402    }
403    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
404        Ok(self.gw_ldk_second.get_try().await?.deref())
405    }
406    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
407        self.gw_ldk_connected.get_try().await?;
408        Ok(self.gw_ldk.get_try().await?.deref())
409    }
410    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
411        self.gw_ldk_second_connected.get_try().await?;
412        Ok(self.gw_ldk_second.get_try().await?.deref())
413    }
414    pub async fn fed(&self) -> anyhow::Result<&Federation> {
415        Ok(self.fed.get_try().await?.deref())
416    }
417    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
418        Ok(self.bitcoind.get_try().await?.deref())
419    }
420
421    pub async fn internal_client(&self) -> anyhow::Result<Client> {
422        Ok(self.fed().await?.internal_client().await?.clone())
423    }
424
425    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
426    /// to be registered
427    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
428        self.fed().await?.await_gateways_registered().await?;
429        Ok(self.fed().await?.internal_client().await?.clone())
430    }
431
432    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
433        Ok(self.recurringd.get_try().await?.deref())
434    }
435
436    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
437        self.recurringd_connected.get_try().await?;
438        Ok(self.recurringd.get_try().await?.deref())
439    }
440
441    pub async fn recurringdv2(&self) -> anyhow::Result<&Recurringdv2> {
442        Ok(self.recurringdv2.get_try().await?.deref())
443    }
444
445    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
446        let fed_size = process_mgr.globals.FM_FED_SIZE;
447        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
448        anyhow::ensure!(
449            fed_size > 3 * offline_nodes,
450            "too many offline nodes ({offline_nodes}) to reach consensus"
451        );
452
453        if !self.pre_dkg && !self.skip_setup {
454            let _ = self.internal_client_gw_registered().await?;
455        }
456        let _ = self.channel_opened.get_try().await?;
457        let _ = self.gw_lnd_registered().await?;
458        let _ = self.gw_ldk_connected().await?;
459        let _ = self.gw_ldk_second_connected().await?;
460        let _ = self.lnd().await?;
461        let _ = self.esplora().await?;
462        let _ = self.recurringd_connected().await?;
463        let _ = self.recurringdv2().await?;
464        let _ = self.fed_epoch_generated.get_try().await?;
465
466        debug!(
467            target: LOG_DEVIMINT,
468            fed_size,
469            offline_nodes,
470            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
471            "Dev federation ready",
472        );
473        Ok(())
474    }
475
476    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
477        self.finalize(process_mgr).await?;
478        Ok(DevFed {
479            bitcoind: self.bitcoind().await?.to_owned(),
480            lnd: self.lnd().await?.to_owned(),
481            fed: self.fed().await?.to_owned(),
482            gw_lnd: self.gw_lnd().await?.to_owned(),
483            gw_ldk: self.gw_ldk().await?.to_owned(),
484            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
485            esplora: self.esplora().await?.to_owned(),
486            recurringd: self.recurringd().await?.to_owned(),
487            recurringdv2: self.recurringdv2().await?.to_owned(),
488        })
489    }
490
491    pub async fn fast_terminate(self) {
492        let Self {
493            bitcoind,
494            lnd,
495            fed,
496            gw_lnd,
497            esplora,
498            gw_ldk,
499            gw_ldk_second,
500            recurringd,
501            recurringdv2,
502            ..
503        } = self;
504
505        join!(
506            spawn_drop(gw_lnd),
507            spawn_drop(gw_ldk),
508            spawn_drop(gw_ldk_second),
509            spawn_drop(fed),
510            spawn_drop(lnd),
511            spawn_drop(esplora),
512            spawn_drop(bitcoind),
513            spawn_drop(recurringd),
514            spawn_drop(recurringdv2),
515        );
516    }
517}