1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::time::Instant;
16use tracing::{debug, error, info, trace, warn};
17
18use crate::devfed::DevJitFed;
19use crate::envs::{
20 FM_DEVIMINT_STATIC_DATA_DIR_ENV, FM_FED_SIZE_ENV, FM_FEDERATIONS_BASE_PORT_ENV,
21 FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV, FM_NUM_FEDS_ENV, FM_OFFLINE_NODES_ENV,
22 FM_PRE_DKG_ENV, FM_TEST_DIR_ENV,
23};
24use crate::util::{ProcessManager, poll};
25use crate::vars::mkdir;
26use crate::{external_daemons, vars};
27
28fn random_test_dir_suffix() -> String {
29 rand::thread_rng()
30 .sample_iter(&Alphanumeric)
31 .filter(u8::is_ascii_digit)
32 .take(3)
33 .map(char::from)
34 .collect::<String>()
35}
36
37#[derive(Parser, Clone, Default)]
38pub struct CommonArgs {
39 #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
40 pub test_dir: Option<PathBuf>,
41
42 #[arg(long, env = "FM_SKIP_SETUP")]
45 skip_setup: bool,
46
47 #[arg(long, env = FM_PRE_DKG_ENV)]
49 pre_dkg: bool,
50
51 #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
53 pub fed_size: usize,
54
55 #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
57 pub num_feds: usize,
58
59 #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
60 pub link_test_dir: Option<PathBuf>,
62
63 #[clap(long, default_value_t = random_test_dir_suffix())]
64 pub link_test_dir_suffix: String,
65
66 #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
68 pub offline_nodes: usize,
69
70 #[clap(long, env = FM_FEDERATIONS_BASE_PORT_ENV)]
72 pub federations_base_port: Option<u16>,
73}
74
75impl CommonArgs {
76 pub fn mk_test_dir(&self) -> Result<PathBuf> {
77 if self.skip_setup {
78 ensure!(
79 self.test_dir.is_some(),
80 "When using `--skip-setup`, `--test-dir` must be set"
81 );
82 }
83 let path = self.test_dir();
84
85 std::fs::create_dir_all(&path)
86 .with_context(|| format!("Creating tmp directory {}", path.display()))?;
87
88 Ok(path)
89 }
90
91 pub fn test_dir(&self) -> PathBuf {
92 self.test_dir.clone().unwrap_or_else(|| {
93 std::env::temp_dir().join(format!(
94 "devimint-{}-{}",
95 std::process::id(),
96 self.link_test_dir_suffix
97 ))
98 })
99 }
100}
101
102#[derive(Subcommand)]
103pub enum Cmd {
104 ExternalDaemons {
106 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
107 exec: Option<Vec<ffi::OsString>>,
108 },
109 DevFed {
114 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
115 exec: Option<Vec<ffi::OsString>>,
116 },
117 #[clap(flatten)]
120 Rpc(RpcCmd),
121}
122
123#[derive(Subcommand)]
124pub enum RpcCmd {
125 Wait,
126 Env,
127}
128
129pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
130 let test_dir = &arg.mk_test_dir()?;
131 mkdir(test_dir.clone()).await?;
132 let logs_dir: PathBuf = test_dir.join("logs");
133 mkdir(logs_dir.clone()).await?;
134
135 let log_file = fs::OpenOptions::new()
136 .write(true)
137 .create(true)
138 .append(true)
139 .open(logs_dir.join("devimint.log"))
140 .await?
141 .into_std()
142 .await;
143
144 fedimint_logging::TracingSetup::default()
145 .with_file(Some(log_file))
146 .with_directive("jsonrpsee-client=off")
148 .init()?;
149
150 let globals = vars::Global::new(
151 test_dir,
152 arg.num_feds,
153 arg.fed_size,
154 arg.offline_nodes,
155 arg.federations_base_port,
156 )
157 .await?;
158
159 if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
160 update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
161 }
162 info!(target: LOG_DEVIMINT, path=%globals.FM_DATA_DIR.display() , "Devimint data dir");
163
164 let mut env_string = String::new();
165 for (var, value) in globals.vars() {
166 debug!(var, value, "Env variable set");
167 writeln!(env_string, r#"export {var}="{value}""#)?; unsafe { std::env::set_var(var, value) };
170 }
171 write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
172 let process_mgr = ProcessManager::new(globals);
173 let task_group = TaskGroup::new();
174 task_group.install_kill_handler();
175 Ok((process_mgr, task_group))
176}
177
178pub async fn update_test_dir_link(
179 link_test_dir: &Path,
180 test_dir: &Path,
181) -> Result<(), anyhow::Error> {
182 let make_link = match fs::read_link(link_test_dir).await {
183 Ok(existing) => {
184 if existing == test_dir {
185 false
186 } else {
187 debug!(
188 old = %existing.display(),
189 new = %test_dir.display(),
190 link = %link_test_dir.display(),
191 "Updating exinst test dir link"
192 );
193
194 fs::remove_file(link_test_dir).await?;
195 true
196 }
197 }
198 _ => true,
199 };
200 if make_link {
201 debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
202 fs::symlink(&test_dir, link_test_dir).await?;
203 }
204 Ok(())
205}
206
207pub async fn cleanup_on_exit<T>(
208 main_process: impl futures::Future<Output = Result<T>>,
209 task_group: TaskGroup,
210) -> Result<Option<T>> {
211 match task_group
212 .make_handle()
213 .cancel_on_shutdown(main_process)
214 .await
215 {
216 Err(_) => {
217 info!("Received shutdown signal before finishing main process, exiting early");
218 Ok(None)
219 }
220 Ok(Ok(v)) => {
221 debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
222 task_group
223 .shutdown_join_all(Duration::from_secs(30))
224 .await?;
225
226 Ok(Some(v))
228 }
229 Ok(Err(err)) => {
230 warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
231 Err(err)
232 }
233 }
234}
235
236pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
237 let ready_file = &global.FM_READY_FILE;
238 match result {
239 Ok(_) => write_overwrite_async(ready_file, "READY").await?,
240 Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
241 }
242 result
243}
244
245pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
246 match cmd {
247 Cmd::ExternalDaemons { exec } => {
248 let (process_mgr, task_group) = setup(common_args).await?;
249 let _daemons =
250 write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
251 .await?;
252 if let Some(exec) = exec {
253 exec_user_command(exec).await?;
254 task_group.shutdown();
255 }
256 task_group.make_handle().make_shutdown_rx().await;
257 }
258 Cmd::DevFed { exec } => {
259 trace!(target: LOG_DEVIMINT, "Starting dev fed");
260 let start_time = Instant::now();
261 let skip_setup = common_args.skip_setup;
262 let pre_dkg = common_args.pre_dkg;
263 let (process_mgr, task_group) = setup(common_args).await?;
264 let main = {
265 let task_group = task_group.clone();
266 async move {
267 let dev_fed = DevJitFed::new(&process_mgr, skip_setup, pre_dkg)?;
268
269 let pegin_start_time = Instant::now();
270 debug!(target: LOG_DEVIMINT, "Peging in client and gateways");
271
272 if !skip_setup && !pre_dkg {
273 const GW_PEGIN_AMOUNT: u64 = 1_000_000;
274 const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;
275
276 let (operation_id, (), ()) = tokio::try_join!(
277 async {
278 let (address, operation_id) =
279 dev_fed.internal_client().await?.get_deposit_addr().await?;
280 debug!(
281 target: LOG_DEVIMINT,
282 %address,
283 %operation_id,
284 "Sending funds to client deposit addr"
285 );
286 dev_fed
287 .bitcoind()
288 .await?
289 .send_to(address, CLIENT_PEGIN_AMOUNT)
290 .await?;
291 Ok(operation_id)
292 },
293 async {
294 let address = dev_fed
295 .gw_lnd_registered()
296 .await?
297 .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
298 .await?;
299 debug!(
300 target: LOG_DEVIMINT,
301 %address,
302 "Sending funds to LND deposit addr"
303 );
304 dev_fed
305 .bitcoind()
306 .await?
307 .send_to(address, GW_PEGIN_AMOUNT)
308 .await
309 .map(|_| ())
310 },
311 async {
312 if crate::util::supports_lnv2() {
313 let gw_ldk = dev_fed.gw_ldk_connected().await?;
314 let address = gw_ldk
315 .get_pegin_addr(
316 &dev_fed.fed().await?.calculate_federation_id(),
317 )
318 .await?;
319 debug!(
320 target: LOG_DEVIMINT,
321 %address,
322 "Sending funds to LDK deposit addr"
323 );
324 dev_fed
325 .bitcoind()
326 .await?
327 .send_to(address, GW_PEGIN_AMOUNT)
328 .await
329 .map(|_| ())
330 } else {
331 Ok(())
332 }
333 },
334 )?;
335
336 dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
337 dev_fed
338 .internal_client()
339 .await?
340 .await_deposit(&operation_id)
341 .await?;
342
343 info!(
344 target: LOG_DEVIMINT,
345 elapsed_ms = %pegin_start_time.elapsed().as_millis(),
346 "Pegins completed"
347 );
348 }
349
350 if !pre_dkg {
351 unsafe {
354 std::env::set_var(
355 FM_INVITE_CODE_ENV,
356 dev_fed.fed().await?.invite_code()?,
357 );
358 };
359 }
360
361 dev_fed.finalize(&process_mgr).await?;
362
363 let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;
364
365 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed().as_millis(), "Devfed ready");
366 if let Some(exec) = exec {
367 debug!(target: LOG_DEVIMINT, "Starting exec command");
368 exec_user_command(exec).await?;
369 task_group.shutdown();
370 }
371
372 debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
373 task_group.make_handle().make_shutdown_rx().await;
374
375 Ok::<_, anyhow::Error>(daemons)
376 }
377 };
378 if let Some(fed) = cleanup_on_exit(main, task_group).await? {
379 fed.fast_terminate().await;
380 }
381 }
382 Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
383 }
384 Ok(())
385}
386
387pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
388 let cmd_str = path
389 .join(ffi::OsStr::new(" "))
390 .to_string_lossy()
391 .to_string();
392
393 let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
394 let mut path = devimint_static_data_dir();
395 path.push("/aliases:");
396 path.push(existing_path);
397 path
398 } else {
399 let mut path = devimint_static_data_dir();
400 path.push("/aliases");
401 path
402 };
403 debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
404 if !tokio::process::Command::new(&path[0])
405 .args(&path[1..])
406 .env("PATH", path_with_aliases)
407 .kill_on_drop(true)
408 .status()
409 .await
410 .with_context(|| format!("Executing user command failed: {cmd_str}"))?
411 .success()
412 {
413 error!(cmd = %cmd_str, "User command failed");
414 return Err(anyhow!("User command failed: {cmd_str}"));
415 }
416 Ok(())
417}
418
419fn devimint_static_data_dir() -> ffi::OsString {
420 env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
422 env!(
423 "FM_DEVIMINT_STATIC_DATA_DIR"
425 )
426 .into(),
427 )
428}
429
430pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
431 fedimint_logging::TracingSetup::default().init()?;
432 match rpc {
433 RpcCmd::Env => {
434 let env_file = common.test_dir().join("env");
435 poll("env file", || async {
436 if fs::try_exists(&env_file)
437 .await
438 .context("env file")
439 .map_err(ControlFlow::Continue)?
440 {
441 Ok(())
442 } else {
443 Err(ControlFlow::Continue(anyhow!("env file not found")))
444 }
445 })
446 .await?;
447 let env = fs::read_to_string(&env_file).await?;
448 print!("{env}");
449 Ok(())
450 }
451 RpcCmd::Wait => {
452 let ready_file = common.test_dir().join("ready");
453 poll("ready file", || async {
454 if fs::try_exists(&ready_file)
455 .await
456 .context("ready file")
457 .map_err(ControlFlow::Continue)?
458 {
459 Ok(())
460 } else {
461 Err(ControlFlow::Continue(anyhow!("ready file not found")))
462 }
463 })
464 .await?;
465 let env = fs::read_to_string(&ready_file).await?;
466 print!("{env}");
467
468 let test_dir = &common.test_dir();
470 let env_file = test_dir.join("env");
471 let invite_file = test_dir.join("cfg/invite-code");
472 if fs::try_exists(&env_file).await.ok().unwrap_or(false)
473 && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
474 {
475 let invite = fs::read_to_string(&invite_file).await?;
476 let mut env_string = fs::read_to_string(&env_file).await?;
477 writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
478 unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
480 write_overwrite_async(env_file, env_string).await?;
481 }
482
483 Ok(())
484 }
485 }
486}