1use std::ops::Deref as _;
2use std::sync::Arc;
3
4use anyhow::Result;
5use fedimint_core::runtime;
6use fedimint_core::task::jit::{JitTry, JitTryAnyhow};
7use fedimint_logging::LOG_DEVIMINT;
8use tokio::join;
9use tracing::{debug, info};
10
11use crate::LightningNode;
12use crate::external::{
13 Bitcoind, Electrs, Esplora, Lightningd, Lnd, NamedGateway, open_channel,
14 open_channels_between_gateways,
15};
16use crate::federation::{Client, Federation};
17use crate::gatewayd::Gatewayd;
18use crate::util::{ProcessManager, supports_lnv2};
19
20async fn spawn_drop<T>(t: T)
21where
22 T: Send + 'static,
23{
24 runtime::spawn("spawn_drop", async {
25 drop(t);
26 })
27 .await
28 .expect("drop panic");
29}
30
/// Plain (non-JIT) handles to every component of a dev federation, as
/// produced by `DevJitFed::to_dev_fed`.
#[derive(Clone)]
pub struct DevFed {
    /// Handle to the bitcoind instance the other components are built on.
    pub bitcoind: Bitcoind,
    /// Handle to the `Lightningd` (cln) node.
    pub cln: Lightningd,
    /// Handle to the `Lnd` node.
    pub lnd: Lnd,
    /// Handle to the federation itself.
    pub fed: Federation,
    /// Gateway created on top of the `lnd` node.
    pub gw_lnd: Gatewayd,
    /// Gateway created with the `LightningNode::Ldk` backend.
    pub gw_ldk: Gatewayd,
    /// Handle to the electrs instance.
    pub electrs: Electrs,
    /// Handle to the esplora instance.
    pub esplora: Esplora,
}
42
43impl DevFed {
44 pub async fn fast_terminate(self) {
45 let Self {
46 bitcoind,
47 cln,
48 lnd,
49 fed,
50 gw_lnd,
51 gw_ldk,
52 electrs,
53 esplora,
54 } = self;
55
56 join!(
57 spawn_drop(gw_lnd),
58 spawn_drop(gw_ldk),
59 spawn_drop(fed),
60 spawn_drop(lnd),
61 spawn_drop(cln),
62 spawn_drop(esplora),
63 spawn_drop(electrs),
64 spawn_drop(bitcoind),
65 );
66 }
67}
68pub async fn dev_fed(process_mgr: &ProcessManager) -> Result<DevFed> {
69 DevJitFed::new(process_mgr, false)?
70 .to_dev_fed(process_mgr)
71 .await
72}
73
/// Shorthand for a `JitTryAnyhow` value that holds an `Arc<T>`.
type JitArc<T> = JitTryAnyhow<Arc<T>>;
75
/// Dev federation whose components are each wrapped in a [`JitArc`].
///
/// Component fields hold the handles themselves; the `JitArc<()>` fields are
/// markers for setup steps (see `DevJitFed::new` for what each one awaits).
#[derive(Clone)]
pub struct DevJitFed {
    /// bitcoind handle; every other component except `gw_ldk` awaits it,
    /// directly or through `lnd`.
    bitcoind: JitArc<Bitcoind>,
    /// cln node, built on `bitcoind`.
    cln: JitArc<Lightningd>,
    /// lnd node, built on `bitcoind`.
    lnd: JitArc<Lnd>,
    /// Federation, built on `bitcoind`.
    fed: JitArc<Federation>,
    /// Gateway built on `lnd`.
    gw_lnd: JitArc<Gatewayd>,
    /// Gateway built with the `LightningNode::Ldk` backend.
    gw_ldk: JitArc<Gatewayd>,
    /// electrs instance, built on `bitcoind`.
    electrs: JitArc<Electrs>,
    /// esplora instance, built on `bitcoind`.
    esplora: JitArc<Esplora>,
    /// Time captured in `new`; `finalize` logs the elapsed time since then.
    start_time: std::time::SystemTime,
    /// Marker: `gw_lnd.connect_fed(fed)` step (the call itself is skipped
    /// when `skip_setup` is set).
    gw_lnd_registered: JitArc<()>,
    /// Marker: `gw_ldk.connect_fed(fed)` step (only attempted when
    /// `supports_lnv2()` holds, and skipped when `skip_setup` is set).
    gw_ldk_connected: JitArc<()>,
    /// Marker: `fed.mine_then_wait_blocks_sync(10)` step (skipped when
    /// `skip_setup` is set).
    fed_epoch_generated: JitArc<()>,
    /// Marker: channels opened between the gateways and between cln and lnd.
    channel_opened: JitArc<()>,
}
92
93impl DevJitFed {
94 pub fn new(process_mgr: &ProcessManager, skip_setup: bool) -> Result<DevJitFed> {
95 let fed_size = process_mgr.globals.FM_FED_SIZE;
96 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
97 anyhow::ensure!(
98 fed_size > 3 * offline_nodes,
99 "too many offline nodes ({offline_nodes}) to reach consensus"
100 );
101 let start_time = fedimint_core::time::now();
102
103 debug!("Starting dev federation");
104
105 let bitcoind = JitTry::new_try({
106 let process_mgr = process_mgr.to_owned();
107 move || async move {
108 debug!(target: LOG_DEVIMINT, "Starting bitcoind...");
109 let start_time = fedimint_core::time::now();
110 let bitcoind = Bitcoind::new(&process_mgr, skip_setup).await?;
111 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started bitcoind");
112 Ok(Arc::new(bitcoind))
113 }
114 });
115 let cln = JitTry::new_try({
116 let process_mgr = process_mgr.to_owned();
117 let bitcoind = bitcoind.clone();
118 || async move {
119 let bitcoind = bitcoind.get_try().await?.deref().clone();
120 debug!(target: LOG_DEVIMINT, "Starting cln...");
121 let start_time = fedimint_core::time::now();
122 let lightningd = Lightningd::new(&process_mgr, bitcoind).await?;
123 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started cln");
124 Ok(Arc::new(lightningd))
125 }
126 });
127 let lnd = JitTry::new_try({
128 let process_mgr = process_mgr.to_owned();
129 let bitcoind = bitcoind.clone();
130 || async move {
131 let bitcoind = bitcoind.get_try().await?.deref().clone();
132 debug!(target: LOG_DEVIMINT, "Starting lnd...");
133 let start_time = fedimint_core::time::now();
134 let lnd = Lnd::new(&process_mgr, bitcoind).await?;
135 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd");
136 Ok(Arc::new(lnd))
137 }
138 });
139 let electrs = JitTryAnyhow::new_try({
140 let process_mgr = process_mgr.to_owned();
141 let bitcoind = bitcoind.clone();
142 || async move {
143 let bitcoind = bitcoind.get_try().await?.deref().clone();
144 debug!(target: LOG_DEVIMINT, "Starting electrs...");
145 let start_time = fedimint_core::time::now();
146 let electrs = Electrs::new(&process_mgr, bitcoind).await?;
147 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started electrs");
148 Ok(Arc::new(electrs))
149 }
150 });
151 let esplora = JitTryAnyhow::new_try({
152 let process_mgr = process_mgr.to_owned();
153 let bitcoind = bitcoind.clone();
154 || async move {
155 let bitcoind = bitcoind.get_try().await?.deref().clone();
156 debug!(target: LOG_DEVIMINT, "Starting esplora...");
157 let start_time = fedimint_core::time::now();
158 let esplora = Esplora::new(&process_mgr, bitcoind).await?;
159 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started esplora");
160 Ok(Arc::new(esplora))
161 }
162 });
163
164 let fed = JitTryAnyhow::new_try({
165 let process_mgr = process_mgr.to_owned();
166 let bitcoind = bitcoind.clone();
167 move || async move {
168 let bitcoind = bitcoind.get_try().await?.deref().clone();
169 debug!(target: LOG_DEVIMINT, "Starting federation...");
170 let start_time = fedimint_core::time::now();
171 let mut fed =
172 Federation::new(&process_mgr, bitcoind, skip_setup, 0, "default".to_string())
173 .await?;
174
175 fed.degrade_federation(&process_mgr).await?;
177
178 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started federation");
179
180 Ok(Arc::new(fed))
181 }
182 });
183
184 let gw_lnd = JitTryAnyhow::new_try({
185 let process_mgr = process_mgr.to_owned();
186 let lnd = lnd.clone();
187 || async move {
188 let lnd = lnd.get_try().await?.deref().clone();
189 debug!(target: LOG_DEVIMINT, "Starting lnd gateway...");
190 let start_time = fedimint_core::time::now();
191 let lnd_gw = Gatewayd::new(&process_mgr, LightningNode::Lnd(lnd)).await?;
192 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started lnd gateway");
193 Ok(Arc::new(lnd_gw))
194 }
195 });
196 let gw_lnd_registered = JitTryAnyhow::new_try({
197 let gw_lnd = gw_lnd.clone();
198 let fed = fed.clone();
199 move || async move {
200 let gw_lnd = gw_lnd.get_try().await?.deref();
201 let fed = fed.get_try().await?.deref();
202 debug!(target: LOG_DEVIMINT, "Registering lnd gateway...");
203 let start_time = fedimint_core::time::now();
204 if !skip_setup {
205 gw_lnd.connect_fed(fed).await?;
206 }
207 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Registered lnd gateway");
208 Ok(Arc::new(()))
209 }
210 });
211
212 let gw_ldk = JitTryAnyhow::new_try({
213 let process_mgr = process_mgr.to_owned();
214 move || async move {
215 debug!(target: LOG_DEVIMINT, "Starting ldk gateway...");
216 let start_time = fedimint_core::time::now();
217 let ldk_gw = Gatewayd::new(
218 &process_mgr,
219 LightningNode::Ldk {
220 name: "gatewayd-ldk-0".to_string(),
221 },
222 )
223 .await?;
224 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Started ldk gateway");
225 Ok(Arc::new(ldk_gw))
226 }
227 });
228 let gw_ldk_connected = JitTryAnyhow::new_try({
229 let gw_ldk = gw_ldk.clone();
230 let fed = fed.clone();
231 move || async move {
232 let gw_ldk = gw_ldk.get_try().await?.deref();
233 if supports_lnv2() {
234 let fed = fed.get_try().await?.deref();
235 debug!(target: LOG_DEVIMINT, "Registering ldk gateway...");
236 let start_time = fedimint_core::time::now();
237 if !skip_setup {
238 gw_ldk.connect_fed(fed).await?;
239 }
240 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Connected ldk gateway");
241 }
242 Ok(Arc::new(()))
243 }
244 });
245
246 let channel_opened = JitTryAnyhow::new_try({
247 let process_mgr = process_mgr.to_owned();
248 let lnd = lnd.clone();
249 let gw_lnd = gw_lnd.clone();
250 let cln = cln.clone();
251 let gw_ldk = gw_ldk.clone();
252 let bitcoind = bitcoind.clone();
253 move || async move {
254 let bitcoind = bitcoind.get_try().await?.deref().clone();
259 let gw_ldk = gw_ldk.get_try().await?.deref();
260
261 tokio::try_join!(
262 async {
263 let gw_lnd = gw_lnd.get_try().await?.deref();
264 let gateways: &[NamedGateway<'_>] = &[(gw_lnd, "LND"), (gw_ldk, "LDK")];
265
266 debug!(target: LOG_DEVIMINT, "Opening channels between gateways...");
267 let start_time = fedimint_core::time::now();
268 let res = open_channels_between_gateways(&bitcoind, gateways).await;
269 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between gateways");
270 res
271 },
272 async {
273 let lnd = lnd.get_try().await?.deref().clone();
274 let cln = cln.get_try().await?.deref().clone();
275
276 debug!(target: LOG_DEVIMINT, "Opening channels between cln and lnd...");
277 let start_time = fedimint_core::time::now();
278 let res = open_channel(&process_mgr, &bitcoind, &cln, &lnd).await;
279 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Opened channels between cln and lnd");
280 res
281 }
282 )?;
283
284 Ok(Arc::new(()))
285 }
286 });
287
288 let fed_epoch_generated = JitTryAnyhow::new_try({
289 let fed = fed.clone();
290 move || async move {
291 let fed = fed.get_try().await?.deref().clone();
292 debug!(target: LOG_DEVIMINT, "Generating federation epoch...");
293 let start_time = fedimint_core::time::now();
294 if !skip_setup {
295 fed.mine_then_wait_blocks_sync(10).await?;
296 }
297 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed()?.as_millis(), "Generated federation epoch");
298 Ok(Arc::new(()))
299 }
300 });
301
302 Ok(DevJitFed {
303 bitcoind,
304 cln,
305 lnd,
306 fed,
307 gw_lnd,
308 gw_ldk,
309 electrs,
310 esplora,
311 start_time,
312 gw_lnd_registered,
313 gw_ldk_connected,
314 fed_epoch_generated,
315 channel_opened,
316 })
317 }
318
319 pub async fn electrs(&self) -> anyhow::Result<&Electrs> {
320 Ok(self.electrs.get_try().await?.deref())
321 }
322 pub async fn esplora(&self) -> anyhow::Result<&Esplora> {
323 Ok(self.esplora.get_try().await?.deref())
324 }
325 pub async fn cln(&self) -> anyhow::Result<&Lightningd> {
326 Ok(self.cln.get_try().await?.deref())
327 }
328 pub async fn lnd(&self) -> anyhow::Result<&Lnd> {
329 Ok(self.lnd.get_try().await?.deref())
330 }
331 pub async fn gw_lnd(&self) -> anyhow::Result<&Gatewayd> {
332 Ok(self.gw_lnd.get_try().await?.deref())
333 }
334 pub async fn gw_lnd_registered(&self) -> anyhow::Result<&Gatewayd> {
335 self.gw_lnd_registered.get_try().await?;
336 Ok(self.gw_lnd.get_try().await?.deref())
337 }
338 pub async fn gw_ldk(&self) -> anyhow::Result<&Gatewayd> {
339 Ok(self.gw_ldk.get_try().await?.deref())
340 }
341 pub async fn gw_ldk_connected(&self) -> anyhow::Result<&Gatewayd> {
342 self.gw_ldk_connected.get_try().await?;
343 Ok(self.gw_ldk.get_try().await?.deref())
344 }
345 pub async fn fed(&self) -> anyhow::Result<&Federation> {
346 Ok(self.fed.get_try().await?.deref())
347 }
348 pub async fn bitcoind(&self) -> anyhow::Result<&Bitcoind> {
349 Ok(self.bitcoind.get_try().await?.deref())
350 }
351
352 pub async fn internal_client(&self) -> anyhow::Result<Client> {
353 Ok(self.fed().await?.internal_client().await?.clone())
354 }
355
356 pub async fn internal_client_gw_registered(&self) -> anyhow::Result<Client> {
359 self.fed().await?.await_gateways_registered().await?;
360 Ok(self.fed().await?.internal_client().await?.clone())
361 }
362
363 pub async fn finalize(&self, process_mgr: &ProcessManager) -> anyhow::Result<()> {
364 let fed_size = process_mgr.globals.FM_FED_SIZE;
365 let offline_nodes = process_mgr.globals.FM_OFFLINE_NODES;
366 anyhow::ensure!(
367 fed_size > 3 * offline_nodes,
368 "too many offline nodes ({offline_nodes}) to reach consensus"
369 );
370
371 let _ = self.internal_client_gw_registered().await?;
372 let _ = self.channel_opened.get_try().await?;
373 let _ = self.gw_lnd_registered().await?;
374 let _ = self.gw_ldk_connected().await?;
375 let _ = self.cln().await?;
376 let _ = self.lnd().await?;
377 let _ = self.electrs().await?;
378 let _ = self.esplora().await?;
379 let _ = self.fed_epoch_generated.get_try().await?;
380
381 debug!(
382 target: LOG_DEVIMINT,
383 fed_size,
384 offline_nodes,
385 elapsed_ms = %self.start_time.elapsed()?.as_millis(),
386 "Dev federation ready",
387 );
388 Ok(())
389 }
390
391 pub async fn to_dev_fed(self, process_mgr: &ProcessManager) -> anyhow::Result<DevFed> {
392 self.finalize(process_mgr).await?;
393 Ok(DevFed {
394 bitcoind: self.bitcoind().await?.to_owned(),
395 cln: self.cln().await?.to_owned(),
396 lnd: self.lnd().await?.to_owned(),
397 fed: self.fed().await?.to_owned(),
398 gw_lnd: self.gw_lnd().await?.to_owned(),
399 gw_ldk: self.gw_ldk().await?.to_owned(),
400 esplora: self.esplora().await?.to_owned(),
401 electrs: self.electrs().await?.to_owned(),
402 })
403 }
404
405 pub async fn fast_terminate(self) {
406 let Self {
407 bitcoind,
408 cln,
409 lnd,
410 fed,
411 gw_lnd,
412 electrs,
413 esplora,
414 ..
415 } = self;
416
417 join!(
418 spawn_drop(gw_lnd),
419 spawn_drop(fed),
420 spawn_drop(lnd),
421 spawn_drop(cln),
422 spawn_drop(esplora),
423 spawn_drop(electrs),
424 spawn_drop(bitcoind),
425 );
426 }
427}