devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{
13    Bitcoind, Electrs, Esplora, Lnd, NamedGateway, open_channels_between_gateways,
14};
15use crate::federation::{Client, Federation};
16use crate::gatewayd::{Gatewayd, LdkChainSource};
17use crate::recurringd::Recurringd;
18use crate::util::{ProcessManager, supports_lnv2};
19
20async fn spawn_drop<T>(t: T)
21where
22    T: Send + 'static,
23{
24    runtime::spawn("spawn_drop", async {
25        drop(t);
26    })
27    .await
28    .expect("drop panic");
29}
30
31#[derive(Clone)]
32pub struct DevFed {
33    pub bitcoind: Bitcoind,
34    pub lnd: Lnd,
35    pub fed: Federation,
36    pub gw_lnd: Gatewayd,
37    pub gw_ldk: Gatewayd,
38    pub gw_ldk_second: Gatewayd,
39    pub electrs: Electrs,
40    pub esplora: Esplora,
41    pub recurringd: Recurringd,
42}
43
44impl DevFed {
45    pub async fn fast_terminate(self) {
46        let Self {
47            bitcoind,
48            lnd,
49            fed,
50            gw_lnd,
51            gw_ldk,
52            gw_ldk_second,
53            electrs,
54            esplora,
55            recurringd,
56        } = self;
57
58        join!(
59            spawn_drop(gw_lnd),
60            spawn_drop(gw_ldk),
61            spawn_drop(gw_ldk_second),
62            spawn_drop(fed),
63            spawn_drop(lnd),
64            spawn_drop(esplora),
65            spawn_drop(electrs),
66            spawn_drop(bitcoind),
67            spawn_drop(recurringd),
68        );
69    }
70}
71pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
72    DevJitFed::new(process_mgr, false, false)?
73        .to_dev_fed(process_mgr)
74        .await
75}
76
77type JitArc<T> = JitTryAnyhow<Arc<T>>;
78
79#[derive(Clone)]
80pub struct DevJitFed {
81    bitcoind: JitArc<Bitcoind>,
82    lnd: JitArc<Lnd>,
83    fed: JitArc<Federation>,
84    gw_lnd: JitArc<Gatewayd>,
85    gw_ldk: JitArc<Gatewayd>,
86    gw_ldk_second: JitArc<Gatewayd>,
87    electrs: JitArc<Electrs>,
88    esplora: JitArc<Esplora>,
89    recurringd: JitArc<Recurringd>,
90    start_time: std::time::SystemTime,
91    gw_lnd_registered: JitArc<()>,
92    gw_ldk_connected: JitArc<()>,
93    gw_ldk_second_connected: JitArc<()>,
94    fed_epoch_generated: JitArc<()>,
95    channel_opened: JitArc<()>,
96    recurringd_connected: JitArc<()>,
97
98    skip_setup: bool,
99    pre_dkg: bool,
100}
101
102impl DevJitFed {
103    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
104        let fed_size = process_mgr.globals.FM_FED_SIZE;
105        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
106        anyhow::ensure!(
107            fed_size > 3 * offline_nodes,
108            "too many offline nodes ({offline_nodes}) to reach consensus"
109        );
110        let start_time = fedimint_core::time::now();
111
112        debug!("Starting dev federation");
113
114        let bitcoind = JitTry::new_try({
115            let process_mgr = process_mgr.to_owned();
116            move || async move {
117                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
118                let start_time = fedimint_core::time::now();
119                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
120                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
121                Ok(Arc::new(bitcoind))
122            }
123        });
124        let lnd = JitTry::new_try({
125            let process_mgr = process_mgr.to_owned();
126            let bitcoind = bitcoind.clone();
127            || async move {
128                let bitcoind = bitcoind.get_try().await?.deref().clone();
129                debug!(target: LOG_DEVIMINT, "Starting lnd...");
130                let start_time = fedimint_core::time::now();
131                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
132                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
133                Ok(Arc::new(lnd))
134            }
135        });
136        let electrs = JitTryAnyhow::new_try({
137            let process_mgr = process_mgr.to_owned();
138            let bitcoind = bitcoind.clone();
139            || async move {
140                let bitcoind = bitcoind.get_try().await?.deref().clone();
141                debug!(target: LOG_DEVIMINT, "Starting electrs...");
142                let start_time = fedimint_core::time::now();
143                let electrs = Electrs::new(&process_mgr, bitcoind).await?;
144                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started electrs");
145                Ok(Arc::new(electrs))
146            }
147        });
148        let esplora = JitTryAnyhow::new_try({
149            let process_mgr = process_mgr.to_owned();
150            let bitcoind = bitcoind.clone();
151            || async move {
152                let bitcoind = bitcoind.get_try().await?.deref().clone();
153                debug!(target: LOG_DEVIMINT, "Starting esplora...");
154                let start_time = fedimint_core::time::now();
155                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
156                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
157                Ok(Arc::new(esplora))
158            }
159        });
160
161        let fed = JitTryAnyhow::new_try({
162            let process_mgr = process_mgr.to_owned();
163            let bitcoind = bitcoind.clone();
164            move || async move {
165                let bitcoind = bitcoind.get_try().await?.deref().clone();
166                debug!(target: LOG_DEVIMINT, "Starting federation...");
167                let start_time = fedimint_core::time::now();
168                let mut fed = Federation::new(
169                    &process_mgr,
170                    bitcoind,
171                    skip_setup,
172                    pre_dkg,
173                    0,
174                    "default".to_string(),
175                )
176                .await?;
177
178                // Create a degraded federation if there are offline nodes
179                fed.degrade_federation(&process_mgr).await?;
180
181                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
182
183                Ok(Arc::new(fed))
184            }
185        });
186
187        let gw_lnd = JitTryAnyhow::new_try({
188            let process_mgr = process_mgr.to_owned();
189            let lnd = lnd.clone();
190            || async move {
191                let lnd = lnd.get_try().await?.deref().clone();
192                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
193                let start_time = fedimint_core::time::now();
194                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
195                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
196                Ok(Arc::new(lnd_gw))
197            }
198        });
199        let gw_lnd_registered = JitTryAnyhow::new_try({
200            let gw_lnd = gw_lnd.clone();
201            let fed = fed.clone();
202            move || async move {
203                let gw_lnd = gw_lnd.get_try().await?.deref();
204                let fed = fed.get_try().await?.deref();
205                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
206                let start_time = fedimint_core::time::now();
207                if !skip_setup && !pre_dkg {
208                    gw_lnd.connect_fed(fed).await?;
209                }
210                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
211                Ok(Arc::new(()))
212            }
213        });
214
215        let gw_ldk = JitTryAnyhow::new_try({
216            let process_mgr = process_mgr.to_owned();
217            move || async move {
218                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
219                let start_time = fedimint_core::time::now();
220                let ldk_gw = Gatewayd::new(
221                    &process_mgr,
222                    LightningNode::Ldk {
223                        name: "gatewayd-ldk-0".to_string(),
224                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
225                        ldk_port: process_mgr.globals.FM_PORT_LDK,
226                        chain_source: LdkChainSource::Bitcoind,
227                    },
228                )
229                .await?;
230                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
231                Ok(Arc::new(ldk_gw))
232            }
233        });
234        let gw_ldk_second = JitTryAnyhow::new_try({
235            let process_mgr = process_mgr.to_owned();
236            move || async move {
237                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
238                let start_time = fedimint_core::time::now();
239                let ldk_gw2 = Gatewayd::new(
240                    &process_mgr,
241                    LightningNode::Ldk {
242                        name: "gatewayd-ldk-1".to_string(),
243                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
244                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
245                        chain_source: LdkChainSource::Bitcoind,
246                    },
247                )
248                .await?;
249                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
250                Ok(Arc::new(ldk_gw2))
251            }
252        });
253        let gw_ldk_connected = JitTryAnyhow::new_try({
254            let gw_ldk = gw_ldk.clone();
255            let fed = fed.clone();
256            move || async move {
257                let gw_ldk = gw_ldk.get_try().await?.deref();
258                if supports_lnv2() {
259                    let fed = fed.get_try().await?.deref();
260                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
261                    let start_time = fedimint_core::time::now();
262                    if !skip_setup && !pre_dkg {
263                        gw_ldk.connect_fed(fed).await?;
264                    }
265                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
266                }
267                Ok(Arc::new(()))
268            }
269        });
270        let gw_ldk_second_connected = JitTryAnyhow::new_try({
271            let gw_ldk_second = gw_ldk_second.clone();
272            let fed = fed.clone();
273            move || async move {
274                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
275                if supports_lnv2() {
276                    let fed = fed.get_try().await?.deref();
277                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
278                    let start_time = fedimint_core::time::now();
279                    if !skip_setup && !pre_dkg {
280                        gw_ldk2.connect_fed(fed).await?;
281                    }
282                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
283                }
284                Ok(Arc::new(()))
285            }
286        });
287
288        let channel_opened = JitTryAnyhow::new_try({
289            let gw_lnd = gw_lnd.clone();
290            let gw_ldk = gw_ldk.clone();
291            let gw_ldk_second = gw_ldk_second.clone();
292            let bitcoind = bitcoind.clone();
293            move || async move {
294                // Note: We open new channel even if starting from existing state
295                // as ports change on every start, and without this nodes will not find each
296                // other.
297                let bitcoind = bitcoind.get_try().await?.deref().clone();
298
299                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
300                let gw_ldk = gw_ldk.get_try().await?.deref();
301                let gw_lnd = gw_lnd.get_try().await?.deref();
302                let gateways: &[NamedGateway<'_>] =
303                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
304
305                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
306                let start_time = fedimint_core::time::now();
307                open_channels_between_gateways(&bitcoind, gateways).await?;
308                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
309
310                Ok(Arc::new(()))
311            }
312        });
313
314        let fed_epoch_generated = JitTryAnyhow::new_try({
315            let fed = fed.clone();
316            move || async move {
317                let fed = fed.get_try().await?.deref().clone();
318                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
319                let start_time = fedimint_core::time::now();
320                if !skip_setup && !pre_dkg {
321                    fed.mine_then_wait_blocks_sync(10).await?;
322                }
323                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
324                Ok(Arc::new(()))
325            }
326        });
327
328        let recurringd = JitTryAnyhow::new_try({
329            let process_mgr = process_mgr.to_owned();
330            move || async move {
331                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
332                let start_time = fedimint_core::time::now();
333                let recurringd = Recurringd::new(&process_mgr).await?;
334                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
335                Ok(Arc::new(recurringd))
336            }
337        });
338
339        let recurringd_connected = JitTryAnyhow::new_try({
340            let recurringd = recurringd.clone();
341            let fed = fed.clone();
342            move || async move {
343                let recurringd = recurringd.get_try().await?.deref();
344                let fed = fed.get_try().await?.deref();
345                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
346                let start_time = fedimint_core::time::now();
347                if !skip_setup && !pre_dkg {
348                    let invite_code = fed.invite_code()?;
349                    recurringd.add_federation(&invite_code).await?;
350                }
351                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
352                Ok(Arc::new(()))
353            }
354        });
355
356        Ok(DevJitFed {
357            bitcoind,
358            lnd,
359            fed,
360            gw_lnd,
361            gw_ldk,
362            gw_ldk_second,
363            electrs,
364            esplora,
365            recurringd,
366            start_time,
367            gw_lnd_registered,
368            gw_ldk_connected,
369            gw_ldk_second_connected,
370            fed_epoch_generated,
371            channel_opened,
372            recurringd_connected,
373            skip_setup,
374            pre_dkg,
375        })
376    }
377
378    pub async fn electrs(&self) -> anyhow::Result<&Electrs> {
379        Ok(self.electrs.get_try().await?.deref())
380    }
381    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
382        Ok(self.esplora.get_try().await?.deref())
383    }
384    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
385        Ok(self.lnd.get_try().await?.deref())
386    }
387    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
388        Ok(self.gw_lnd.get_try().await?.deref())
389    }
390    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
391        self.gw_lnd_registered.get_try().await?;
392        Ok(self.gw_lnd.get_try().await?.deref())
393    }
394    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
395        Ok(self.gw_ldk.get_try().await?.deref())
396    }
397    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
398        Ok(self.gw_ldk_second.get_try().await?.deref())
399    }
400    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
401        self.gw_ldk_connected.get_try().await?;
402        Ok(self.gw_ldk.get_try().await?.deref())
403    }
404    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
405        self.gw_ldk_second_connected.get_try().await?;
406        Ok(self.gw_ldk_second.get_try().await?.deref())
407    }
408    pub async fn fed(&self) -> anyhow::Result<&Federation> {
409        Ok(self.fed.get_try().await?.deref())
410    }
411    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
412        Ok(self.bitcoind.get_try().await?.deref())
413    }
414
415    pub async fn internal_client(&self) -> anyhow::Result<Client> {
416        Ok(self.fed().await?.internal_client().await?.clone())
417    }
418
419    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
420    /// to be registered
421    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
422        self.fed().await?.await_gateways_registered().await?;
423        Ok(self.fed().await?.internal_client().await?.clone())
424    }
425
426    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
427        Ok(self.recurringd.get_try().await?.deref())
428    }
429
430    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
431        self.recurringd_connected.get_try().await?;
432        Ok(self.recurringd.get_try().await?.deref())
433    }
434
435    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
436        let fed_size = process_mgr.globals.FM_FED_SIZE;
437        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
438        anyhow::ensure!(
439            fed_size > 3 * offline_nodes,
440            "too many offline nodes ({offline_nodes}) to reach consensus"
441        );
442
443        if !self.pre_dkg && !self.skip_setup {
444            let _ = self.internal_client_gw_registered().await?;
445        }
446        let _ = self.channel_opened.get_try().await?;
447        let _ = self.gw_lnd_registered().await?;
448        let _ = self.gw_ldk_connected().await?;
449        let _ = self.gw_ldk_second_connected().await?;
450        let _ = self.lnd().await?;
451        let _ = self.electrs().await?;
452        let _ = self.esplora().await?;
453        let _ = self.recurringd_connected().await?;
454        let _ = self.fed_epoch_generated.get_try().await?;
455
456        debug!(
457            target: LOG_DEVIMINT,
458            fed_size,
459            offline_nodes,
460            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
461            "Dev federation ready",
462        );
463        Ok(())
464    }
465
466    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
467        self.finalize(process_mgr).await?;
468        Ok(DevFed {
469            bitcoind: self.bitcoind().await?.to_owned(),
470            lnd: self.lnd().await?.to_owned(),
471            fed: self.fed().await?.to_owned(),
472            gw_lnd: self.gw_lnd().await?.to_owned(),
473            gw_ldk: self.gw_ldk().await?.to_owned(),
474            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
475            esplora: self.esplora().await?.to_owned(),
476            electrs: self.electrs().await?.to_owned(),
477            recurringd: self.recurringd().await?.to_owned(),
478        })
479    }
480
481    pub async fn fast_terminate(self) {
482        let Self {
483            bitcoind,
484            lnd,
485            fed,
486            gw_lnd,
487            electrs,
488            esplora,
489            gw_ldk,
490            gw_ldk_second,
491            recurringd,
492            ..
493        } = self;
494
495        join!(
496            spawn_drop(gw_lnd),
497            spawn_drop(gw_ldk),
498            spawn_drop(gw_ldk_second),
499            spawn_drop(fed),
500            spawn_drop(lnd),
501            spawn_drop(esplora),
502            spawn_drop(electrs),
503            spawn_drop(bitcoind),
504            spawn_drop(recurringd),
505        );
506    }
507}