devimint/
devfed.rs

1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{
13    Bitcoind, Electrs, Esplora, Lightningd, Lnd, NamedGateway, open_channel,
14    open_channels_between_gateways,
15};
16use crate::federation::{Client, Federation};
17use crate::gatewayd::Gatewayd;
18use crate::util::{ProcessManager, supports_lnv2};
19
20async fn spawn_drop<T>(t: T)
21where
22    T: Send + 'static,
23{
24    runtime::spawn("spawn_drop", async {
25        drop(t);
26    })
27    .await
28    .expect("drop panic");
29}
30
31#[derive(Clone)]
32pub struct DevFed {
33    pub bitcoind: Bitcoind,
34    pub cln: Lightningd,
35    pub lnd: Lnd,
36    pub fed: Federation,
37    pub gw_lnd: Gatewayd,
38    pub gw_ldk: Gatewayd,
39    pub electrs: Electrs,
40    pub esplora: Esplora,
41}
42
43impl DevFed {
44    pub async fn fast_terminate(self) {
45        let Self {
46            bitcoind,
47            cln,
48            lnd,
49            fed,
50            gw_lnd,
51            gw_ldk,
52            electrs,
53            esplora,
54        } = self;
55
56        join!(
57            spawn_drop(gw_lnd),
58            spawn_drop(gw_ldk),
59            spawn_drop(fed),
60            spawn_drop(lnd),
61            spawn_drop(cln),
62            spawn_drop(esplora),
63            spawn_drop(electrs),
64            spawn_drop(bitcoind),
65        );
66    }
67}
68pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
69    DevJitFed::new(process_mgr, false)?
70        .to_dev_fed(process_mgr)
71        .await
72}
73
74type JitArc<T> = JitTryAnyhow<Arc<T>>;
75
76#[derive(Clone)]
77pub struct DevJitFed {
78    bitcoind: JitArc<Bitcoind>,
79    cln: JitArc<Lightningd>,
80    lnd: JitArc<Lnd>,
81    fed: JitArc<Federation>,
82    gw_lnd: JitArc<Gatewayd>,
83    gw_ldk: JitArc<Gatewayd>,
84    electrs: JitArc<Electrs>,
85    esplora: JitArc<Esplora>,
86    start_time: std::time::SystemTime,
87    gw_lnd_registered: JitArc<()>,
88    gw_ldk_connected: JitArc<()>,
89    fed_epoch_generated: JitArc<()>,
90    channel_opened: JitArc<()>,
91}
92
93impl DevJitFed {
94    pub fn new(process_mgr: &ProcessManager, skip_setup: bool) -> Result<DevJitFed> {
95        let fed_size = process_mgr.globals.FM_FED_SIZE;
96        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
97        anyhow::ensure!(
98            fed_size > 3 * offline_nodes,
99            "too many offline nodes ({offline_nodes}) to reach consensus"
100        );
101        let start_time = fedimint_core::time::now();
102
103        debug!("Starting dev federation");
104
105        let bitcoind = JitTry::new_try({
106            let process_mgr = process_mgr.to_owned();
107            move || async move {
108                debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
109                let start_time = fedimint_core::time::now();
110                let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
111                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
112                Ok(Arc::new(bitcoind))
113            }
114        });
115        let cln = JitTry::new_try({
116            let process_mgr = process_mgr.to_owned();
117            let bitcoind = bitcoind.clone();
118            || async move {
119                let bitcoind = bitcoind.get_try().await?.deref().clone();
120                debug!(target: LOG_DEVIMINT, "Starting cln...");
121                let start_time = fedimint_core::time::now();
122                let lightningd = Lightningd::new(&process_mgr, bitcoind).await?;
123                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started cln");
124                Ok(Arc::new(lightningd))
125            }
126        });
127        let lnd = JitTry::new_try({
128            let process_mgr = process_mgr.to_owned();
129            let bitcoind = bitcoind.clone();
130            || async move {
131                let bitcoind = bitcoind.get_try().await?.deref().clone();
132                debug!(target: LOG_DEVIMINT, "Starting lnd...");
133                let start_time = fedimint_core::time::now();
134                let lnd = Lnd::new(&process_mgr, bitcoind).await?;
135                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
136                Ok(Arc::new(lnd))
137            }
138        });
139        let electrs = JitTryAnyhow::new_try({
140            let process_mgr = process_mgr.to_owned();
141            let bitcoind = bitcoind.clone();
142            || async move {
143                let bitcoind = bitcoind.get_try().await?.deref().clone();
144                debug!(target: LOG_DEVIMINT, "Starting electrs...");
145                let start_time = fedimint_core::time::now();
146                let electrs = Electrs::new(&process_mgr, bitcoind).await?;
147                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started electrs");
148                Ok(Arc::new(electrs))
149            }
150        });
151        let esplora = JitTryAnyhow::new_try({
152            let process_mgr = process_mgr.to_owned();
153            let bitcoind = bitcoind.clone();
154            || async move {
155                let bitcoind = bitcoind.get_try().await?.deref().clone();
156                debug!(target: LOG_DEVIMINT, "Starting esplora...");
157                let start_time = fedimint_core::time::now();
158                let esplora = Esplora::new(&process_mgr, bitcoind).await?;
159                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
160                Ok(Arc::new(esplora))
161            }
162        });
163
164        let fed = JitTryAnyhow::new_try({
165            let process_mgr = process_mgr.to_owned();
166            let bitcoind = bitcoind.clone();
167            move || async move {
168                let bitcoind = bitcoind.get_try().await?.deref().clone();
169                debug!(target: LOG_DEVIMINT, "Starting federation...");
170                let start_time = fedimint_core::time::now();
171                let mut fed =
172                    Federation::new(&process_mgr, bitcoind, skip_setup, 0, "default".to_string())
173                        .await?;
174
175                // Create a degraded federation if there are offline nodes
176                fed.degrade_federation(&process_mgr).await?;
177
178                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
179
180                Ok(Arc::new(fed))
181            }
182        });
183
184        let gw_lnd = JitTryAnyhow::new_try({
185            let process_mgr = process_mgr.to_owned();
186            let lnd = lnd.clone();
187            || async move {
188                let lnd = lnd.get_try().await?.deref().clone();
189                debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
190                let start_time = fedimint_core::time::now();
191                let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
192                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
193                Ok(Arc::new(lnd_gw))
194            }
195        });
196        let gw_lnd_registered = JitTryAnyhow::new_try({
197            let gw_lnd = gw_lnd.clone();
198            let fed = fed.clone();
199            move || async move {
200                let gw_lnd = gw_lnd.get_try().await?.deref();
201                let fed = fed.get_try().await?.deref();
202                debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
203                let start_time = fedimint_core::time::now();
204                if !skip_setup {
205                    gw_lnd.connect_fed(fed).await?;
206                }
207                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
208                Ok(Arc::new(()))
209            }
210        });
211
212        let gw_ldk = JitTryAnyhow::new_try({
213            let process_mgr = process_mgr.to_owned();
214            move || async move {
215                debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
216                let start_time = fedimint_core::time::now();
217                let ldk_gw = Gatewayd::new(
218                    &process_mgr,
219                    LightningNode::Ldk {
220                        name: "gatewayd-ldk-0".to_string(),
221                    },
222                )
223                .await?;
224                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
225                Ok(Arc::new(ldk_gw))
226            }
227        });
228        let gw_ldk_connected = JitTryAnyhow::new_try({
229            let gw_ldk = gw_ldk.clone();
230            let fed = fed.clone();
231            move || async move {
232                let gw_ldk = gw_ldk.get_try().await?.deref();
233                if supports_lnv2() {
234                    let fed = fed.get_try().await?.deref();
235                    debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
236                    let start_time = fedimint_core::time::now();
237                    if !skip_setup {
238                        gw_ldk.connect_fed(fed).await?;
239                    }
240                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
241                }
242                Ok(Arc::new(()))
243            }
244        });
245
246        let channel_opened = JitTryAnyhow::new_try({
247            let process_mgr = process_mgr.to_owned();
248            let lnd = lnd.clone();
249            let gw_lnd = gw_lnd.clone();
250            let cln = cln.clone();
251            let gw_ldk = gw_ldk.clone();
252            let bitcoind = bitcoind.clone();
253            move || async move {
254                // Note: We open new channel even if starting from existing state
255                // as ports change on every start, and without this nodes will not find each
256                // other.
257
258                let bitcoind = bitcoind.get_try().await?.deref().clone();
259                let gw_ldk = gw_ldk.get_try().await?.deref();
260
261                tokio::try_join!(
262                    async {
263                        let gw_lnd = gw_lnd.get_try().await?.deref();
264                        let gateways: &[NamedGateway<'_>] = &[(gw_lnd, "LND"), (gw_ldk, "LDK")];
265
266                        debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
267                        let start_time = fedimint_core::time::now();
268                        let res = open_channels_between_gateways(&bitcoind, gateways).await;
269                        info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
270                        res
271                    },
272                    async {
273                        let lnd = lnd.get_try().await?.deref().clone();
274                        let cln = cln.get_try().await?.deref().clone();
275
276                        debug!(target: LOG_DEVIMINT, "Opening channels between cln and lnd...");
277                        let start_time = fedimint_core::time::now();
278                        let res = open_channel(&process_mgr, &bitcoind, &cln, &lnd).await;
279                        info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between cln and lnd");
280                        res
281                    }
282                )?;
283
284                Ok(Arc::new(()))
285            }
286        });
287
288        let fed_epoch_generated = JitTryAnyhow::new_try({
289            let fed = fed.clone();
290            move || async move {
291                let fed = fed.get_try().await?.deref().clone();
292                debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
293                let start_time = fedimint_core::time::now();
294                if !skip_setup {
295                    fed.mine_then_wait_blocks_sync(10).await?;
296                }
297                info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
298                Ok(Arc::new(()))
299            }
300        });
301
302        Ok(DevJitFed {
303            bitcoind,
304            cln,
305            lnd,
306            fed,
307            gw_lnd,
308            gw_ldk,
309            electrs,
310            esplora,
311            start_time,
312            gw_lnd_registered,
313            gw_ldk_connected,
314            fed_epoch_generated,
315            channel_opened,
316        })
317    }
318
319    pub async fn electrs(&self) -> anyhow::Result<&Electrs> {
320        Ok(self.electrs.get_try().await?.deref())
321    }
322    pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
323        Ok(self.esplora.get_try().await?.deref())
324    }
325    pub async fn cln(&self) -> anyhow::Result<&Lightningd> {
326        Ok(self.cln.get_try().await?.deref())
327    }
328    pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
329        Ok(self.lnd.get_try().await?.deref())
330    }
331    pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
332        Ok(self.gw_lnd.get_try().await?.deref())
333    }
334    pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
335        self.gw_lnd_registered.get_try().await?;
336        Ok(self.gw_lnd.get_try().await?.deref())
337    }
338    pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
339        Ok(self.gw_ldk.get_try().await?.deref())
340    }
341    pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
342        self.gw_ldk_connected.get_try().await?;
343        Ok(self.gw_ldk.get_try().await?.deref())
344    }
345    pub async fn fed(&self) -> anyhow::Result<&Federation> {
346        Ok(self.fed.get_try().await?.deref())
347    }
348    pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
349        Ok(self.bitcoind.get_try().await?.deref())
350    }
351
352    pub async fn internal_client(&self) -> anyhow::Result<Client> {
353        Ok(self.fed().await?.internal_client().await?.clone())
354    }
355
356    /// Like [`Self::internal_client`] but will check and wait for a LN gateway
357    /// to be registered
358    pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
359        self.fed().await?.await_gateways_registered().await?;
360        Ok(self.fed().await?.internal_client().await?.clone())
361    }
362
363    pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
364        let fed_size = process_mgr.globals.FM_FED_SIZE;
365        let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
366        anyhow::ensure!(
367            fed_size > 3 * offline_nodes,
368            "too many offline nodes ({offline_nodes}) to reach consensus"
369        );
370
371        let _ = self.internal_client_gw_registered().await?;
372        let _ = self.channel_opened.get_try().await?;
373        let _ = self.gw_lnd_registered().await?;
374        let _ = self.gw_ldk_connected().await?;
375        let _ = self.cln().await?;
376        let _ = self.lnd().await?;
377        let _ = self.electrs().await?;
378        let _ = self.esplora().await?;
379        let _ = self.fed_epoch_generated.get_try().await?;
380
381        debug!(
382            target: LOG_DEVIMINT,
383            fed_size,
384            offline_nodes,
385            elapsed_ms = %self.start_time.elapsed()?.as_millis(),
386            "Dev federation ready",
387        );
388        Ok(())
389    }
390
391    pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
392        self.finalize(process_mgr).await?;
393        Ok(DevFed {
394            bitcoind: self.bitcoind().await?.to_owned(),
395            cln: self.cln().await?.to_owned(),
396            lnd: self.lnd().await?.to_owned(),
397            fed: self.fed().await?.to_owned(),
398            gw_lnd: self.gw_lnd().await?.to_owned(),
399            gw_ldk: self.gw_ldk().await?.to_owned(),
400            esplora: self.esplora().await?.to_owned(),
401            electrs: self.electrs().await?.to_owned(),
402        })
403    }
404
405    pub async fn fast_terminate(self) {
406        let Self {
407            bitcoind,
408            cln,
409            lnd,
410            fed,
411            gw_lnd,
412            electrs,
413            esplora,
414            ..
415        } = self;
416
417        join!(
418            spawn_drop(gw_lnd),
419            spawn_drop(fed),
420            spawn_drop(lnd),
421            spawn_drop(cln),
422            spawn_drop(esplora),
423            spawn_drop(electrs),
424            spawn_drop(bitcoind),
425        );
426    }
427}