devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{Bitcoind, Esplora, Lnd, NamedGateway, open_channels_between_gateways};
13use crate::federation::{Client, Federation};
14use crate::gatewayd::{Gatewayd, LdkChainSource};
15use crate::recurringd::Recurringd;
16use crate::util::{ProcessManager, supports_lnv2};
17
18async fn spawn_drop<T>(t: T)
19where
20    T: Send + 'static,
21{
22    runtime::spawn("spawn_drop", async {
23        drop(t);
24    })
25    .await
26    .expect("drop panic");
27}
28
29#[derive(Clone)]
30pub struct DevFed {
31    pub bitcoind: Bitcoind,
32    pub lnd: Lnd,
33    pub fed: Federation,
34    pub gw_lnd: Gatewayd,
35    pub gw_ldk: Gatewayd,
36    pub gw_ldk_second: Gatewayd,
37    pub esplora: Esplora,
38    pub recurringd: Recurringd,
39}
40
41impl DevFed {
42    pub async fn fast_terminate(self) {
43        let Self {
44            bitcoind,
45            lnd,
46            fed,
47            gw_lnd,
48            gw_ldk,
49            gw_ldk_second,
50            esplora,
51            recurringd,
52        } = self;
53
54        join!(
55            spawn_drop(gw_lnd),
56            spawn_drop(gw_ldk),
57            spawn_drop(gw_ldk_second),
58            spawn_drop(fed),
59            spawn_drop(lnd),
60            spawn_drop(esplora),
61            spawn_drop(bitcoind),
62            spawn_drop(recurringd),
63        );
64    }
65}
66pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
67    DevJitFed::new(process_mgr, false, false)?
68        .to_dev_fed(process_mgr)
69        .await
70}
71
72type JitArc<T> = JitTryAnyhow<Arc<T>>;
73
74#[derive(Clone)]
75pub struct DevJitFed {
76    bitcoind: JitArc<Bitcoind>,
77    lnd: JitArc<Lnd>,
78    fed: JitArc<Federation>,
79    gw_lnd: JitArc<Gatewayd>,
80    gw_ldk: JitArc<Gatewayd>,
81    gw_ldk_second: JitArc<Gatewayd>,
82    esplora: JitArc<Esplora>,
83    recurringd: JitArc<Recurringd>,
84    start_time: std::time::SystemTime,
85    gw_lnd_registered: JitArc<()>,
86    gw_ldk_connected: JitArc<()>,
87    gw_ldk_second_connected: JitArc<()>,
88    fed_epoch_generated: JitArc<()>,
89    channel_opened: JitArc<()>,
90    recurringd_connected: JitArc<()>,
91
92    skip_setup: bool,
93    pre_dkg: bool,
94}
95
96impl DevJitFed {
97    pub fn new(process_mgr: &ProcessManager, skip_setup: bool, pre_dkg: bool) -> Result<DevJitFed> {
98        let fed_size = process_mgr.globals.FM_FED_SIZE;
99        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
100        anyhow::ensure!(
101            fed_size > 3 * offline_nodes,
102            "too many offline nodes ({offline_nodes}) to reach consensus"
103        );
104        let start_time = fedimint_core::time::now();
105
106        debug!("Starting dev federation");
107
108        let bitcoind = JitTry::new_try({
109            let process_mgr = process_mgr.to_owned();
110            move || async move {
111                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
112                let start_time = fedimint_core::time::now();
113                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
114                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
115                Ok(Arc::new(bitcoind))
116            }
117        });
118        let lnd = JitTry::new_try({
119            let process_mgr = process_mgr.to_owned();
120            let bitcoind = bitcoind.clone();
121            || async move {
122                let bitcoind = bitcoind.get_try().await?.deref().clone();
123                debug!(target: LOG_DEVIMINT, "Starting lnd...");
124                let start_time = fedimint_core::time::now();
125                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
126                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
127                Ok(Arc::new(lnd))
128            }
129        });
130        let esplora = JitTryAnyhow::new_try({
131            let process_mgr = process_mgr.to_owned();
132            let bitcoind = bitcoind.clone();
133            || async move {
134                let bitcoind = bitcoind.get_try().await?.deref().clone();
135                debug!(target: LOG_DEVIMINT, "Starting esplora...");
136                let start_time = fedimint_core::time::now();
137                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
138                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
139                Ok(Arc::new(esplora))
140            }
141        });
142
143        let fed = JitTryAnyhow::new_try({
144            let process_mgr = process_mgr.to_owned();
145            let bitcoind = bitcoind.clone();
146            move || async move {
147                let bitcoind = bitcoind.get_try().await?.deref().clone();
148                debug!(target: LOG_DEVIMINT, "Starting federation...");
149                let start_time = fedimint_core::time::now();
150                let mut fed = Federation::new(
151                    &process_mgr,
152                    bitcoind,
153                    skip_setup,
154                    pre_dkg,
155                    0,
156                    "default".to_string(),
157                )
158                .await?;
159
160                // Create a degraded federation if there are offline nodes
161                fed.degrade_federation(&process_mgr).await?;
162
163                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
164
165                Ok(Arc::new(fed))
166            }
167        });
168
169        let gw_lnd = JitTryAnyhow::new_try({
170            let process_mgr = process_mgr.to_owned();
171            let lnd = lnd.clone();
172            || async move {
173                let lnd = lnd.get_try().await?.deref().clone();
174                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
175                let start_time = fedimint_core::time::now();
176                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
177                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
178                Ok(Arc::new(lnd_gw))
179            }
180        });
181        let gw_lnd_registered = JitTryAnyhow::new_try({
182            let gw_lnd = gw_lnd.clone();
183            let fed = fed.clone();
184            move || async move {
185                let gw_lnd = gw_lnd.get_try().await?.deref();
186                let fed = fed.get_try().await?.deref();
187                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
188                let start_time = fedimint_core::time::now();
189                if !skip_setup && !pre_dkg {
190                    gw_lnd.connect_fed(fed).await?;
191                }
192                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
193                Ok(Arc::new(()))
194            }
195        });
196
197        let gw_ldk = JitTryAnyhow::new_try({
198            let process_mgr = process_mgr.to_owned();
199            move || async move {
200                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
201                let start_time = fedimint_core::time::now();
202                let ldk_gw = Gatewayd::new(
203                    &process_mgr,
204                    LightningNode::Ldk {
205                        name: "gatewayd-ldk-0".to_string(),
206                        gw_port: process_mgr.globals.FM_PORT_GW_LDK,
207                        ldk_port: process_mgr.globals.FM_PORT_LDK,
208                        chain_source: LdkChainSource::Bitcoind,
209                    },
210                )
211                .await?;
212                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
213                Ok(Arc::new(ldk_gw))
214            }
215        });
216        let gw_ldk_second = JitTryAnyhow::new_try({
217            let process_mgr = process_mgr.to_owned();
218            move || async move {
219                debug!(target: LOG_DEVIMINT, "Starting ldk gateway 2...");
220                let start_time = fedimint_core::time::now();
221                let ldk_gw2 = Gatewayd::new(
222                    &process_mgr,
223                    LightningNode::Ldk {
224                        name: "gatewayd-ldk-1".to_string(),
225                        gw_port: process_mgr.globals.FM_PORT_GW_LDK2,
226                        ldk_port: process_mgr.globals.FM_PORT_LDK2,
227                        chain_source: LdkChainSource::Bitcoind,
228                    },
229                )
230                .await?;
231                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway 2");
232                Ok(Arc::new(ldk_gw2))
233            }
234        });
235        let gw_ldk_connected = JitTryAnyhow::new_try({
236            let gw_ldk = gw_ldk.clone();
237            let fed = fed.clone();
238            move || async move {
239                let gw_ldk = gw_ldk.get_try().await?.deref();
240                if supports_lnv2() {
241                    let fed = fed.get_try().await?.deref();
242                    let start_time = fedimint_core::time::now();
243                    if !skip_setup && !pre_dkg {
244                        debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
245                        gw_ldk.connect_fed(fed).await?;
246                    } else {
247                        debug!(target: LOG_DEVIMINT, "Skipping registering ldk gateway");
248                    }
249                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
250                }
251                Ok(Arc::new(()))
252            }
253        });
254        let gw_ldk_second_connected = JitTryAnyhow::new_try({
255            let gw_ldk_second = gw_ldk_second.clone();
256            let fed = fed.clone();
257            move || async move {
258                let gw_ldk2 = gw_ldk_second.get_try().await?.deref();
259                if supports_lnv2() {
260                    let fed = fed.get_try().await?.deref();
261                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway 2...");
262                    let start_time = fedimint_core::time::now();
263                    if !skip_setup && !pre_dkg {
264                        gw_ldk2.connect_fed(fed).await?;
265                    }
266                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway 2");
267                }
268                Ok(Arc::new(()))
269            }
270        });
271
272        let fed_epoch_generated = JitTryAnyhow::new_try({
273            let fed = fed.clone();
274            move || async move {
275                let fed = fed.get_try().await?.deref().clone();
276                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
277                let start_time = fedimint_core::time::now();
278                if !skip_setup && !pre_dkg {
279                    fed.mine_then_wait_blocks_sync(10).await?;
280                }
281                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
282                Ok(Arc::new(()))
283            }
284        });
285
286        let channel_opened = JitTryAnyhow::new_try({
287            let gw_lnd = gw_lnd.clone();
288            let gw_ldk = gw_ldk.clone();
289            let gw_ldk_second = gw_ldk_second.clone();
290            let bitcoind = bitcoind.clone();
291            let fed_epoch_generated = fed_epoch_generated.clone();
292            move || async move {
293                // Note: We open new channel even if starting from existing state
294                // as ports change on every start, and without this nodes will not find each
295                // other.
296                let bitcoind = bitcoind.get_try().await?.deref().clone();
297
298                // Wait for an epoch to occur since that mines blocks, which can cause opening
299                // channels to be racy
300                fed_epoch_generated.get_try().await?;
301
302                let gw_ldk_second = gw_ldk_second.get_try().await?.deref();
303                let gw_ldk = gw_ldk.get_try().await?.deref();
304                let gw_lnd = gw_lnd.get_try().await?.deref();
305                let gateways: &[NamedGateway<'_>] =
306                    &[(gw_ldk_second, "LDK2"), (gw_lnd, "LND"), (gw_ldk, "LDK")];
307
308                debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
309                let start_time = fedimint_core::time::now();
310                open_channels_between_gateways(&bitcoind, gateways).await?;
311                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
312
313                Ok(Arc::new(()))
314            }
315        });
316
317        let recurringd = JitTryAnyhow::new_try({
318            let process_mgr = process_mgr.to_owned();
319            move || async move {
320                debug!(target: LOG_DEVIMINT, "Starting recurringd...");
321                let start_time = fedimint_core::time::now();
322                let recurringd = Recurringd::new(&process_mgr).await?;
323                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started recurringd");
324                Ok(Arc::new(recurringd))
325            }
326        });
327
328        let recurringd_connected = JitTryAnyhow::new_try({
329            let recurringd = recurringd.clone();
330            let fed = fed.clone();
331            move || async move {
332                let recurringd = recurringd.get_try().await?.deref();
333                let fed = fed.get_try().await?.deref();
334                debug!(target: LOG_DEVIMINT, "Connecting recurringd to federation...");
335                let start_time = fedimint_core::time::now();
336                if !skip_setup && !pre_dkg {
337                    let invite_code = fed.invite_code()?;
338                    recurringd.add_federation(&invite_code).await?;
339                }
340                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected recurringd to federation");
341                Ok(Arc::new(()))
342            }
343        });
344
345        Ok(DevJitFed {
346            bitcoind,
347            lnd,
348            fed,
349            gw_lnd,
350            gw_ldk,
351            gw_ldk_second,
352            esplora,
353            recurringd,
354            start_time,
355            gw_lnd_registered,
356            gw_ldk_connected,
357            gw_ldk_second_connected,
358            fed_epoch_generated,
359            channel_opened,
360            recurringd_connected,
361            skip_setup,
362            pre_dkg,
363        })
364    }
365
366    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
367        Ok(self.esplora.get_try().await?.deref())
368    }
369    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
370        Ok(self.lnd.get_try().await?.deref())
371    }
372    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
373        Ok(self.gw_lnd.get_try().await?.deref())
374    }
375    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
376        self.gw_lnd_registered.get_try().await?;
377        Ok(self.gw_lnd.get_try().await?.deref())
378    }
379    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
380        Ok(self.gw_ldk.get_try().await?.deref())
381    }
382    pub async fn gw_ldk_second(&self) -> anyhow::Result<&Gatewayd> {
383        Ok(self.gw_ldk_second.get_try().await?.deref())
384    }
385    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
386        self.gw_ldk_connected.get_try().await?;
387        Ok(self.gw_ldk.get_try().await?.deref())
388    }
389    pub async fn gw_ldk_second_connected(&self) -> anyhow::Result<&Gatewayd> {
390        self.gw_ldk_second_connected.get_try().await?;
391        Ok(self.gw_ldk_second.get_try().await?.deref())
392    }
393    pub async fn fed(&self) -> anyhow::Result<&Federation> {
394        Ok(self.fed.get_try().await?.deref())
395    }
396    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
397        Ok(self.bitcoind.get_try().await?.deref())
398    }
399
400    pub async fn internal_client(&self) -> anyhow::Result<Client> {
401        Ok(self.fed().await?.internal_client().await?.clone())
402    }
403
404    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
405    /// to be registered
406    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
407        self.fed().await?.await_gateways_registered().await?;
408        Ok(self.fed().await?.internal_client().await?.clone())
409    }
410
411    pub async fn recurringd(&self) -> anyhow::Result<&Recurringd> {
412        Ok(self.recurringd.get_try().await?.deref())
413    }
414
415    pub async fn recurringd_connected(&self) -> anyhow::Result<&Recurringd> {
416        self.recurringd_connected.get_try().await?;
417        Ok(self.recurringd.get_try().await?.deref())
418    }
419
420    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
421        let fed_size = process_mgr.globals.FM_FED_SIZE;
422        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
423        anyhow::ensure!(
424            fed_size > 3 * offline_nodes,
425            "too many offline nodes ({offline_nodes}) to reach consensus"
426        );
427
428        if !self.pre_dkg && !self.skip_setup {
429            let _ = self.internal_client_gw_registered().await?;
430        }
431        let _ = self.channel_opened.get_try().await?;
432        let _ = self.gw_lnd_registered().await?;
433        let _ = self.gw_ldk_connected().await?;
434        let _ = self.gw_ldk_second_connected().await?;
435        let _ = self.lnd().await?;
436        let _ = self.esplora().await?;
437        let _ = self.recurringd_connected().await?;
438        let _ = self.fed_epoch_generated.get_try().await?;
439
440        debug!(
441            target: LOG_DEVIMINT,
442            fed_size,
443            offline_nodes,
444            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
445            "Dev federation ready",
446        );
447        Ok(())
448    }
449
450    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
451        self.finalize(process_mgr).await?;
452        Ok(DevFed {
453            bitcoind: self.bitcoind().await?.to_owned(),
454            lnd: self.lnd().await?.to_owned(),
455            fed: self.fed().await?.to_owned(),
456            gw_lnd: self.gw_lnd().await?.to_owned(),
457            gw_ldk: self.gw_ldk().await?.to_owned(),
458            gw_ldk_second: self.gw_ldk_second().await?.to_owned(),
459            esplora: self.esplora().await?.to_owned(),
460            recurringd: self.recurringd().await?.to_owned(),
461        })
462    }
463
464    pub async fn fast_terminate(self) {
465        let Self {
466            bitcoind,
467            lnd,
468            fed,
469            gw_lnd,
470            esplora,
471            gw_ldk,
472            gw_ldk_second,
473            recurringd,
474            ..
475        } = self;
476
477        join!(
478            spawn_drop(gw_lnd),
479            spawn_drop(gw_ldk),
480            spawn_drop(gw_ldk_second),
481            spawn_drop(fed),
482            spawn_drop(lnd),
483            spawn_drop(esplora),
484            spawn_drop(bitcoind),
485            spawn_drop(recurringd),
486        );
487    }
488}