1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::time::Instant;
16use tracing::{debug, error, info, trace, warn};
17
18use crate::devfed::DevJitFed;
19use crate::envs::{
20 FM_DEVIMINT_STATIC_DATA_DIR_ENV, FM_FED_SIZE_ENV, FM_FEDERATIONS_BASE_PORT_ENV,
21 FM_GATEWAY_BASE_PORT_ENV, FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV, FM_NUM_FEDS_ENV,
22 FM_OFFLINE_NODES_ENV, FM_PRE_DKG_ENV, FM_TEST_DIR_ENV,
23};
24use crate::util::{ProcessManager, poll};
25use crate::vars::mkdir;
26use crate::{external_daemons, vars};
27
28fn random_test_dir_suffix() -> String {
29 rand::thread_rng()
30 .sample_iter(&Alphanumeric)
31 .filter(u8::is_ascii_digit)
32 .take(3)
33 .map(char::from)
34 .collect::<String>()
35}
36
37#[derive(Parser, Clone, Default)]
38pub struct CommonArgs {
39 #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
40 pub test_dir: Option<PathBuf>,
41
42 #[arg(long, env = "FM_SKIP_SETUP")]
45 skip_setup: bool,
46
47 #[arg(long, env = FM_PRE_DKG_ENV)]
49 pre_dkg: bool,
50
51 #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
53 pub fed_size: usize,
54
55 #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
57 pub num_feds: usize,
58
59 #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
60 pub link_test_dir: Option<PathBuf>,
62
63 #[clap(long, default_value_t = random_test_dir_suffix())]
64 pub link_test_dir_suffix: String,
65
66 #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
68 pub offline_nodes: usize,
69
70 #[clap(long, env = FM_FEDERATIONS_BASE_PORT_ENV)]
72 pub federations_base_port: Option<u16>,
73
74 #[clap(long, env = FM_GATEWAY_BASE_PORT_ENV)]
76 pub gateway_base_port: Option<u16>,
77}
78
79impl CommonArgs {
80 pub fn mk_test_dir(&self) -> Result<PathBuf> {
81 if self.skip_setup {
82 ensure!(
83 self.test_dir.is_some(),
84 "When using `--skip-setup`, `--test-dir` must be set"
85 );
86 }
87 let path = self.test_dir();
88
89 std::fs::create_dir_all(&path)
90 .with_context(|| format!("Creating tmp directory {}", path.display()))?;
91
92 Ok(path)
93 }
94
95 pub fn test_dir(&self) -> PathBuf {
96 self.test_dir.clone().unwrap_or_else(|| {
97 std::env::temp_dir().join(format!(
98 "devimint-{}-{}",
99 std::process::id(),
100 self.link_test_dir_suffix
101 ))
102 })
103 }
104}
105
106#[derive(Subcommand)]
107pub enum Cmd {
108 ExternalDaemons {
110 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
111 exec: Option<Vec<ffi::OsString>>,
112 },
113 DevFed {
118 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
119 exec: Option<Vec<ffi::OsString>>,
120 },
121 DevFedPreRestore {
124 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
125 exec: Option<Vec<ffi::OsString>>,
126 },
127 #[clap(flatten)]
130 Rpc(RpcCmd),
131}
132
133#[derive(Subcommand)]
134pub enum RpcCmd {
135 Wait,
136 Env,
137}
138
139pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
140 let test_dir = &arg.mk_test_dir()?;
141 mkdir(test_dir.clone()).await?;
142 let logs_dir: PathBuf = test_dir.join("logs");
143 mkdir(logs_dir.clone()).await?;
144
145 let log_file = fs::OpenOptions::new()
146 .write(true)
147 .create(true)
148 .append(true)
149 .open(logs_dir.join("devimint.log"))
150 .await?
151 .into_std()
152 .await;
153
154 fedimint_logging::TracingSetup::default()
155 .with_file(Some(log_file))
156 .with_directive("jsonrpsee-client=off")
158 .init()?;
159
160 let globals = vars::Global::new(
161 test_dir,
162 arg.num_feds,
163 arg.fed_size,
164 arg.offline_nodes,
165 arg.federations_base_port,
166 arg.gateway_base_port,
167 )
168 .await?;
169
170 if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
171 update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
172 }
173 info!(target: LOG_DEVIMINT, path = %globals.FM_DATA_DIR.display(), "Devimint data dir");
174
175 let mut env_string = String::new();
176 for (var, value) in globals.vars() {
177 debug!(var, value, "Env variable set");
178 writeln!(env_string, r#"export {var}="{value}""#)?; unsafe { std::env::set_var(var, value) };
181 }
182 write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
183 let process_mgr = ProcessManager::new(globals);
184 let task_group = TaskGroup::new();
185 task_group.install_kill_handler();
186 Ok((process_mgr, task_group))
187}
188
189pub async fn update_test_dir_link(
190 link_test_dir: &Path,
191 test_dir: &Path,
192) -> Result<(), anyhow::Error> {
193 let make_link = match fs::read_link(link_test_dir).await {
194 Ok(existing) => {
195 if existing == test_dir {
196 false
197 } else {
198 debug!(
199 old = %existing.display(),
200 new = %test_dir.display(),
201 link = %link_test_dir.display(),
202 "Updating exinst test dir link"
203 );
204
205 fs::remove_file(link_test_dir).await?;
206 true
207 }
208 }
209 _ => true,
210 };
211 if make_link {
212 debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
213 fs::symlink(&test_dir, link_test_dir).await?;
214 }
215 Ok(())
216}
217
218pub async fn cleanup_on_exit<T>(
219 main_process: impl futures::Future<Output = Result<T>>,
220 task_group: TaskGroup,
221) -> Result<Option<T>> {
222 match task_group
223 .make_handle()
224 .cancel_on_shutdown(main_process)
225 .await
226 {
227 Err(_) => {
228 info!("Received shutdown signal before finishing main process, exiting early");
229 Ok(None)
230 }
231 Ok(Ok(v)) => {
232 debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
233 task_group
234 .shutdown_join_all(Duration::from_secs(30))
235 .await?;
236
237 Ok(Some(v))
239 }
240 Ok(Err(err)) => {
241 warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
242 Err(err)
243 }
244 }
245}
246
247pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
248 let ready_file = &global.FM_READY_FILE;
249 match result {
250 Ok(_) => write_overwrite_async(ready_file, "READY").await?,
251 Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
252 }
253 result
254}
255
256pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
257 match cmd {
258 Cmd::ExternalDaemons { exec } => {
259 let (process_mgr, task_group) = setup(common_args).await?;
260 let _daemons =
261 write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
262 .await?;
263 if let Some(exec) = exec {
264 exec_user_command(exec).await?;
265 task_group.shutdown();
266 }
267 task_group.make_handle().make_shutdown_rx().await;
268 }
269 Cmd::DevFed { exec } => handle_dev_fed_command(common_args, exec, false).await?,
270 Cmd::DevFedPreRestore { exec } => handle_dev_fed_command(common_args, exec, true).await?,
271 Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
272 }
273 Ok(())
274}
275
276async fn handle_dev_fed_command(
277 common_args: CommonArgs,
278 exec: Option<Vec<ffi::OsString>>,
279 pre_restore: bool,
280) -> Result<()> {
281 trace!(target: LOG_DEVIMINT, "Starting dev fed");
282 let start_time = Instant::now();
283 let skip_setup = common_args.skip_setup;
284 let pre_dkg = common_args.pre_dkg;
285 ensure!(
286 !pre_restore || (!skip_setup && !pre_dkg),
287 "dev-fed-pre-restore cannot be combined with skip-setup or pre-dkg"
288 );
289 let (process_mgr, task_group) = setup(common_args).await?;
290 let main = {
291 let task_group = task_group.clone();
292 async move {
293 let dev_fed =
294 DevJitFed::new_with_pre_restore(&process_mgr, skip_setup, pre_dkg, pre_restore)?;
295
296 let pegin_start_time = Instant::now();
297 debug!(target: LOG_DEVIMINT, "Peging in client and gateways");
298
299 if !skip_setup && !pre_dkg && !pre_restore {
300 const GW_PEGIN_AMOUNT: u64 = 1_000_000;
301 const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;
302
303 let (operation_id, (), ()) = tokio::try_join!(
304 async {
305 let (address, operation_id) =
306 dev_fed.internal_client().await?.get_deposit_addr().await?;
307 debug!(
308 target: LOG_DEVIMINT,
309 %address,
310 %operation_id,
311 "Sending funds to client deposit addr"
312 );
313 dev_fed
314 .bitcoind()
315 .await?
316 .send_to(address, CLIENT_PEGIN_AMOUNT)
317 .await?;
318 Ok(operation_id)
319 },
320 async {
321 let address = dev_fed
322 .gw_lnd_registered()
323 .await?
324 .client()
325 .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
326 .await?;
327 debug!(
328 target: LOG_DEVIMINT,
329 %address,
330 "Sending funds to LND deposit addr"
331 );
332 dev_fed
333 .bitcoind()
334 .await?
335 .send_to(address, GW_PEGIN_AMOUNT)
336 .await
337 .map(|_| ())
338 },
339 async {
340 if crate::util::supports_lnv2() {
341 let gw_ldk = dev_fed.gw_ldk_connected().await?;
342 let address = gw_ldk
343 .client()
344 .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
345 .await?;
346 debug!(
347 target: LOG_DEVIMINT,
348 %address,
349 "Sending funds to LDK deposit addr"
350 );
351 dev_fed
352 .bitcoind()
353 .await?
354 .send_to(address, GW_PEGIN_AMOUNT)
355 .await
356 .map(|_| ())
357 } else {
358 Ok(())
359 }
360 },
361 )?;
362
363 dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
364 if crate::util::supports_wallet_v2() {
365 dev_fed
366 .internal_client()
367 .await?
368 .await_balance(CLIENT_PEGIN_AMOUNT * 1000 * 9 / 10)
369 .await?;
370 } else {
371 dev_fed
372 .internal_client()
373 .await?
374 .await_deposit(&operation_id)
375 .await?;
376 }
377
378 info!(
379 target: LOG_DEVIMINT,
380 elapsed_ms = %pegin_start_time.elapsed().as_millis(),
381 "Pegins completed"
382 );
383 }
384
385 if !pre_dkg && !pre_restore {
386 unsafe {
389 std::env::set_var(FM_INVITE_CODE_ENV, dev_fed.fed().await?.invite_code()?);
390 };
391 }
392
393 if pre_restore {
394 let _ = dev_fed.fed().await?;
395 } else {
396 dev_fed.finalize(&process_mgr).await?;
397 }
398
399 let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;
400
401 info!(
402 target: LOG_DEVIMINT,
403 elapsed_ms = %start_time.elapsed().as_millis(),
404 path = %process_mgr.globals.FM_DATA_DIR.display(),
405 "Devfed ready"
406 );
407 if let Some(exec) = exec {
408 debug!(target: LOG_DEVIMINT, "Starting exec command");
409 exec_user_command(exec).await?;
410 task_group.shutdown();
411 }
412
413 debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
414 task_group.make_handle().make_shutdown_rx().await;
415
416 Ok::<_, anyhow::Error>(daemons)
417 }
418 };
419 if let Some(fed) = cleanup_on_exit(main, task_group).await? {
420 fed.fast_terminate().await;
421 }
422
423 Ok(())
424}
425
426pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
427 let cmd_str = path
428 .join(ffi::OsStr::new(" "))
429 .to_string_lossy()
430 .to_string();
431
432 let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
433 let mut path = devimint_static_data_dir();
434 path.push("/aliases:");
435 path.push(existing_path);
436 path
437 } else {
438 let mut path = devimint_static_data_dir();
439 path.push("/aliases");
440 path
441 };
442 debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
443 if !tokio::process::Command::new(&path[0])
444 .args(&path[1..])
445 .env("PATH", path_with_aliases)
446 .kill_on_drop(true)
447 .status()
448 .await
449 .with_context(|| format!("Executing user command failed: {cmd_str}"))?
450 .success()
451 {
452 error!(cmd = %cmd_str, "User command failed");
453 return Err(anyhow!("User command failed: {cmd_str}"));
454 }
455 Ok(())
456}
457
458fn devimint_static_data_dir() -> ffi::OsString {
459 env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
461 env!(
462 "FM_DEVIMINT_STATIC_DATA_DIR"
464 )
465 .into(),
466 )
467}
468
469pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
470 fedimint_logging::TracingSetup::default().init()?;
471 match rpc {
472 RpcCmd::Env => {
473 let env_file = common.test_dir().join("env");
474 poll("env file", || async {
475 if fs::try_exists(&env_file)
476 .await
477 .context("env file")
478 .map_err(ControlFlow::Continue)?
479 {
480 Ok(())
481 } else {
482 Err(ControlFlow::Continue(anyhow!("env file not found")))
483 }
484 })
485 .await?;
486 let env = fs::read_to_string(&env_file).await?;
487 print!("{env}");
488 Ok(())
489 }
490 RpcCmd::Wait => {
491 let ready_file = common.test_dir().join("ready");
492 poll("ready file", || async {
493 if fs::try_exists(&ready_file)
494 .await
495 .context("ready file")
496 .map_err(ControlFlow::Continue)?
497 {
498 Ok(())
499 } else {
500 Err(ControlFlow::Continue(anyhow!("ready file not found")))
501 }
502 })
503 .await?;
504 let env = fs::read_to_string(&ready_file).await?;
505 print!("{env}");
506
507 let test_dir = &common.test_dir();
509 let env_file = test_dir.join("env");
510 let invite_file = test_dir.join("cfg/invite-code");
511 if fs::try_exists(&env_file).await.ok().unwrap_or(false)
512 && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
513 {
514 let invite = fs::read_to_string(&invite_file).await?;
515 let mut env_string = fs::read_to_string(&env_file).await?;
516 writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
517 unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
519 write_overwrite_async(env_file, env_string).await?;
520 }
521
522 Ok(())
523 }
524 }
525}