devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::Gatewayd;
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20    T: Send + 'static,
21{
22    runtime::spawn("spawn_drop", async {
23        drop(t);
24    })
25    .await
26    .expect("drop panic");
27}
28
29#[derive(Clone)]
30pub struct DevFed {
31    pub bitcoind: Bitcoind,
32    pub lnd: Lnd,
33    pub fed: Federation,
34    pub gw_lnd: Gatewayd,
35    pub gw_ldk: Gatewayd,
36    pub gw_ldk_second: Gatewayd,
37    pub esplora: Esplora,
38    pub recurringd: Recurringd,
39}
40
41impl DevFed {
42    pub async fn fast_terminate(self) {
43        let Self {
44            bitcoind,
45            lnd,
46            fed,
47            gw_lnd,
48            gw_ldk,
49            gw_ldk_second,
50            esplora,
51            recurringd,
52        } = self;
53
54        join!(
55            spawn_drop(gw_lnd),
56            spawn_drop(gw_ldk),
57            spawn_drop(gw_ldk_second),
58            spawn_drop(fed),
59            spawn_drop(lnd),
60            spawn_drop(esplora),
61            spawn_drop(bitcoind),
62            spawn_drop(recurringd),
63        );
64    }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67    DevJitFed::new(process_mgr, false, false)?
68        .to_dev_fed(process_mgr)
69        .await
70}
71
72type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
74#[derive(Clone)]
75pub struct DevJitFed {
76    bitcoind: JitArc<Bitcoind>,
77    lnd: JitArc<Lnd>,
78    fed: JitArc<Federation>,
79    gw_lnd: JitArc<Gatewayd>,
80    gw_ldk: JitArc<Gatewayd>,
81    gw_ldk_second: JitArc<Gatewayd>,
82    esplora: JitArc<Esplora>,
83    recurringd: JitArc<Recurringd>,
84    start_time: std::time::SystemTime,
85    gw_lnd_registered: JitArc<()>,
86    gw_ldk_connected: JitArc<()>,
87    gw_ldk_second_connected: JitArc<()>,
88    fed_epoch_generated: JitArc<()>,
89    channel_opened: JitArc<()>,
90    recurringd_connected: JitArc<()>,
91
92    skip_setup: bool,
93    pre_dkg: bool,
94}
95
96impl DevJitFed {
97    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98        let fed_size = process_mgr.globals.FM_FED_SIZE;
99        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100        anyhow::ensure!(
101            fed_size > 3 * offline_nodes,
102            "too many offline nodes ({offline_nodes}) to reach consensus"
103        );
104        let start_time = fedimint_core::time::now();
105
106        debug!("Starting dev federation");
107
108        let bitcoind = JitTry::new_try({
109            let process_mgr = process_mgr.to_owned();
110            move || async move {
111                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112                let start_time = fedimint_core::time::now();
113                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115                Ok(Arc::new(bitcoind))
116            }
117        });
118        let lnd = JitTry::new_try({
119            let process_mgr = process_mgr.to_owned();
120            let bitcoind = bitcoind.clone();
121            || async move {
122                let bitcoind = bitcoind.get_try().await?.deref().clone();
123                debug!(target: LOG_DEVIMINT, "Starting lnd...");
124                let start_time = fedimint_core::time::now();
125                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127                Ok(Arc::new(lnd))
128            }
129        });
130        let esplora = JitTryAnyhow::new_try({
131            let process_mgr = process_mgr.to_owned();
132            let bitcoind = bitcoind.clone();
133            || async move {
134                let bitcoind = bitcoind.get_try().await?.deref().clone();
135                debug!(target: LOG_DEVIMINT, "Starting esplora...");
136                let start_time = fedimint_core::time::now();
137                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139                Ok(Arc::new(esplora))
140            }
141        });
142
143        let fed = JitTryAnyhow::new_try({
144            let process_mgr = process_mgr.to_owned();
145            let bitcoind = bitcoind.clone();
146            move || async move {
147                let bitcoind = bitcoind.get_try().await?.deref().clone();
148                debug!(target: LOG_DEVIMINT, "Starting federation...");
149                let start_time = fedimint_core::time::now();
150                let mut fed = Federation::new(
151                    &process_mgr,
152                    bitcoind,
153                    skip_setup,
154                    pre_dkg,
155                    0,
156                    "default".to_string(),
157                )
158                .await?;
159
160                // Create a degraded federation if there are offline nodes
161                fed.degrade_federation(&process_mgr).await?;
162
163                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165                Ok(Arc::new(fed))
166            }
167        });
168
169        let gw_lnd = JitTryAnyhow::new_try({
170            let process_mgr = process_mgr.to_owned();
171            let lnd = lnd.clone();
172            || async move {
173                let lnd = lnd.get_try().await?.deref().clone();
174                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175                let start_time = fedimint_core::time::now();
176                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178                Ok(Arc::new(lnd_gw))
179            }
180        });
181        let gw_lnd_registered = JitTryAnyhow::new_try({
182            let gw_lnd = gw_lnd.clone();
183            let fed = fed.clone();
184            move || async move {
185                let gw_lnd = gw_lnd.get_try().await?.deref();
186                let fed = fed.get_try().await?.deref();
187                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188                let start_time = fedimint_core::time::now();
189                if !skip_setup && !pre_dkg {
190                    gw_lnd.connect_fed(fed).await?;
191                }
192                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193                Ok(Arc::new(()))
194            }
195        });
196
197        let gw_ldk = JitTryAnyhow::new_try({
198            let process_mgr = process_mgr.to_owned();
199            let bitcoind = bitcoind.clone();
200            move || async move {
201                bitcoind.get_try().await?;
202                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
203                let start_time = fedimint_core::time::now();
204                let ldk_gw = Gatewayd::new(
205                    &process_mgr,
206                    LightningNode::Ldk {
207                        name: "gatewayd-ldk-0".to_string(),
208                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
209                        ldk_port: process_mgr.globals.FM_PORT_LDK,
210                    },
211                )
212                .await?;
213                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
214                Ok(Arc::new(ldk_gw))
215            }
216        });
217        let gw_ldk_second = JitTryAnyhow::new_try({
218            let process_mgr = process_mgr.to_owned();
219            let bitcoind = bitcoind.clone();
220            move || async move {
221                bitcoind.get_try().await?;
222                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
223                let start_time = fedimint_core::time::now();
224                let ldk_gw2 = Gatewayd::new(
225                    &process_mgr,
226                    LightningNode::Ldk {
227                        name: "gatewayd-ldk-1".to_string(),
228                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
229                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
230                    },
231                )
232                .await?;
233                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
234                Ok(Arc::new(ldk_gw2))
235            }
236        });
237        let gw_ldk_connected = JitTryAnyhow::new_try({
238            let gw_ldk = gw_ldk.clone();
239            let fed = fed.clone();
240            move || async move {
241                let gw_ldk = gw_ldk.get_try().await?.deref();
242                if supports_lnv2() {
243                    let fed = fed.get_try().await?.deref();
244                    let start_time = fedimint_core::time::now();
245                    if !skip_setup && !pre_dkg {
246                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
247                        gw_ldk.connect_fed(fed).await?;
248                    } else {
249                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
250                    }
251                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
252                }
253                Ok(Arc::new(()))
254            }
255        });
256        let gw_ldk_second_connected = JitTryAnyhow::new_try({
257            let gw_ldk_second = gw_ldk_second.clone();
258            let fed = fed.clone();
259            move || async move {
260                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
261                if supports_lnv2() {
262                    let fed = fed.get_try().await?.deref();
263                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
264                    let start_time = fedimint_core::time::now();
265                    if !skip_setup && !pre_dkg {
266                        gw_ldk2.connect_fed(fed).await?;
267                    }
268                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
269                }
270                Ok(Arc::new(()))
271            }
272        });
273
274        let fed_epoch_generated = JitTryAnyhow::new_try({
275            let fed = fed.clone();
276            move || async move {
277                let fed = fed.get_try().await?.deref().clone();
278                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
279                let start_time = fedimint_core::time::now();
280                if !skip_setup && !pre_dkg {
281                    fed.mine_then_wait_blocks_sync(10).await?;
282                }
283                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
284                Ok(Arc::new(()))
285            }
286        });
287
288        let channel_opened = JitTryAnyhow::new_try({
289            let gw_lnd = gw_lnd.clone();
290            let gw_ldk = gw_ldk.clone();
291            let gw_ldk_second = gw_ldk_second.clone();
292            let bitcoind = bitcoind.clone();
293            let fed_epoch_generated = fed_epoch_generated.clone();
294            move || async move {
295                // Note: We open new channel even if starting from existing state
296                // as ports change on every start, and without this nodes will not find each
297                // other.
298                let bitcoind = bitcoind.get_try().await?.deref().clone();
299
300                // Wait for an epoch to occur since that mines blocks, which can cause opening
301                // channels to be racy
302                fed_epoch_generated.get_try().await?;
303
304                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
305                let gw_ldk = gw_ldk.get_try().await?.deref();
306                let gw_lnd = gw_lnd.get_try().await?.deref();
307                let gateways: &[NamedGateway<'_>] =
308                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
309
310                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
311                let start_time = fedimint_core::time::now();
312                open_channels_between_gateways(&bitcoind, gateways).await?;
313                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
314
315                Ok(Arc::new(()))
316            }
317        });
318
319        let recurringd = JitTryAnyhow::new_try({
320            let process_mgr = process_mgr.to_owned();
321            move || async move {
322                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
323                let start_time = fedimint_core::time::now();
324                let recurringd = Recurringd::new(&process_mgr).await?;
325                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
326                Ok(Arc::new(recurringd))
327            }
328        });
329
330        let recurringd_connected = JitTryAnyhow::new_try({
331            let recurringd = recurringd.clone();
332            let fed = fed.clone();
333            move || async move {
334                let recurringd = recurringd.get_try().await?.deref();
335                let fed = fed.get_try().await?.deref();
336                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
337                let start_time = fedimint_core::time::now();
338                if !skip_setup && !pre_dkg {
339                    let invite_code = fed.invite_code()?;
340                    recurringd.add_federation(&invite_code).await?;
341                }
342                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
343                Ok(Arc::new(()))
344            }
345        });
346
347        Ok(DevJitFed {
348            bitcoind,
349            lnd,
350            fed,
351            gw_lnd,
352            gw_ldk,
353            gw_ldk_second,
354            esplora,
355            recurringd,
356            start_time,
357            gw_lnd_registered,
358            gw_ldk_connected,
359            gw_ldk_second_connected,
360            fed_epoch_generated,
361            channel_opened,
362            recurringd_connected,
363            skip_setup,
364            pre_dkg,
365        })
366    }
367
368    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
369        Ok(self.esplora.get_try().await?.deref())
370    }
371    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
372        Ok(self.lnd.get_try().await?.deref())
373    }
374    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
375        Ok(self.gw_lnd.get_try().await?.deref())
376    }
377    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
378        self.gw_lnd_registered.get_try().await?;
379        Ok(self.gw_lnd.get_try().await?.deref())
380    }
381    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
382        Ok(self.gw_ldk.get_try().await?.deref())
383    }
384    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
385        Ok(self.gw_ldk_second.get_try().await?.deref())
386    }
387    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
388        self.gw_ldk_connected.get_try().await?;
389        Ok(self.gw_ldk.get_try().await?.deref())
390    }
391    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
392        self.gw_ldk_second_connected.get_try().await?;
393        Ok(self.gw_ldk_second.get_try().await?.deref())
394    }
395    pub async fn fed(&self) -> anyhow::Result<&Federation> {
396        Ok(self.fed.get_try().await?.deref())
397    }
398    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
399        Ok(self.bitcoind.get_try().await?.deref())
400    }
401
402    pub async fn internal_client(&self) -> anyhow::Result<Client> {
403        Ok(self.fed().await?.internal_client().await?.clone())
404    }
405
406    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
407    /// to be registered
408    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
409        self.fed().await?.await_gateways_registered().await?;
410        Ok(self.fed().await?.internal_client().await?.clone())
411    }
412
413    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
414        Ok(self.recurringd.get_try().await?.deref())
415    }
416
417    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
418        self.recurringd_connected.get_try().await?;
419        Ok(self.recurringd.get_try().await?.deref())
420    }
421
422    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
423        let fed_size = process_mgr.globals.FM_FED_SIZE;
424        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
425        anyhow::ensure!(
426            fed_size > 3 * offline_nodes,
427            "too many offline nodes ({offline_nodes}) to reach consensus"
428        );
429
430        if !self.pre_dkg && !self.skip_setup {
431            let _ = self.internal_client_gw_registered().await?;
432        }
433        let _ = self.channel_opened.get_try().await?;
434        let _ = self.gw_lnd_registered().await?;
435        let _ = self.gw_ldk_connected().await?;
436        let _ = self.gw_ldk_second_connected().await?;
437        let _ = self.lnd().await?;
438        let _ = self.esplora().await?;
439        let _ = self.recurringd_connected().await?;
440        let _ = self.fed_epoch_generated.get_try().await?;
441
442        debug!(
443            target: LOG_DEVIMINT,
444            fed_size,
445            offline_nodes,
446            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
447            "Dev federation ready",
448        );
449        Ok(())
450    }
451
452    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
453        self.finalize(process_mgr).await?;
454        Ok(DevFed {
455            bitcoind: self.bitcoind().await?.to_owned(),
456            lnd: self.lnd().await?.to_owned(),
457            fed: self.fed().await?.to_owned(),
458            gw_lnd: self.gw_lnd().await?.to_owned(),
459            gw_ldk: self.gw_ldk().await?.to_owned(),
460            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
461            esplora: self.esplora().await?.to_owned(),
462            recurringd: self.recurringd().await?.to_owned(),
463        })
464    }
465
466    pub async fn fast_terminate(self) {
467        let Self {
468            bitcoind,
469            lnd,
470            fed,
471            gw_lnd,
472            esplora,
473            gw_ldk,
474            gw_ldk_second,
475            recurringd,
476            ..
477        } = self;
478
479        join!(
480            spawn_drop(gw_lnd),
481            spawn_drop(gw_ldk),
482            spawn_drop(gw_ldk_second),
483            spawn_drop(fed),
484            spawn_drop(lnd),
485            spawn_drop(esplora),
486            spawn_drop(bitcoind),
487            spawn_drop(recurringd),
488        );
489    }
490}