1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::time::Instant;
16use tracing::{debug, error, info, trace, warn};
17
18use crate::devfed::DevJitFed;
19use crate::envs::{
20 FM_DEVIMINT_STATIC_DATA_DIR_ENV, FM_FED_SIZE_ENV, FM_FEDERATIONS_BASE_PORT_ENV,
21 FM_GATEWAY_BASE_PORT_ENV, FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV, FM_NUM_FEDS_ENV,
22 FM_OFFLINE_NODES_ENV, FM_PRE_DKG_ENV, FM_TEST_DIR_ENV,
23};
24use crate::util::{ProcessManager, poll};
25use crate::vars::mkdir;
26use crate::{external_daemons, vars};
27
28fn random_test_dir_suffix() -> String {
29 rand::thread_rng()
30 .sample_iter(&Alphanumeric)
31 .filter(u8::is_ascii_digit)
32 .take(3)
33 .map(char::from)
34 .collect::<String>()
35}
36
37#[derive(Parser, Clone, Default)]
38pub struct CommonArgs {
39 #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
40 pub test_dir: Option<PathBuf>,
41
42 #[arg(long, env = "FM_SKIP_SETUP")]
45 skip_setup: bool,
46
47 #[arg(long, env = FM_PRE_DKG_ENV)]
49 pre_dkg: bool,
50
51 #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
53 pub fed_size: usize,
54
55 #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
57 pub num_feds: usize,
58
59 #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
60 pub link_test_dir: Option<PathBuf>,
62
63 #[clap(long, default_value_t = random_test_dir_suffix())]
64 pub link_test_dir_suffix: String,
65
66 #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
68 pub offline_nodes: usize,
69
70 #[clap(long, env = FM_FEDERATIONS_BASE_PORT_ENV)]
72 pub federations_base_port: Option<u16>,
73
74 #[clap(long, env = FM_GATEWAY_BASE_PORT_ENV)]
76 pub gateway_base_port: Option<u16>,
77}
78
79impl CommonArgs {
80 pub fn mk_test_dir(&self) -> Result<PathBuf> {
81 if self.skip_setup {
82 ensure!(
83 self.test_dir.is_some(),
84 "When using `--skip-setup`, `--test-dir` must be set"
85 );
86 }
87 let path = self.test_dir();
88
89 std::fs::create_dir_all(&path)
90 .with_context(|| format!("Creating tmp directory {}", path.display()))?;
91
92 Ok(path)
93 }
94
95 pub fn test_dir(&self) -> PathBuf {
96 self.test_dir.clone().unwrap_or_else(|| {
97 std::env::temp_dir().join(format!(
98 "devimint-{}-{}",
99 std::process::id(),
100 self.link_test_dir_suffix
101 ))
102 })
103 }
104}
105
106#[derive(Subcommand)]
107pub enum Cmd {
108 ExternalDaemons {
110 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
111 exec: Option<Vec<ffi::OsString>>,
112 },
113 DevFed {
118 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
119 exec: Option<Vec<ffi::OsString>>,
120 },
121 #[clap(flatten)]
124 Rpc(RpcCmd),
125}
126
// Subcommands handled by `rpc_command`; both poll for files inside the test dir.
//
// NOTE: plain `//` comments are used deliberately: clap-derive turns `///` doc comments on
// variants into `--help` text, so switching to `///` would change the CLI output.
#[derive(Subcommand)]
pub enum RpcCmd {
    // Polls until the `ready` file exists, prints its contents, and, if both the `env` and
    // `cfg/invite-code` files exist, appends an `FM_INVITE_CODE` export to the `env` file.
    Wait,
    // Polls until the `env` file exists and prints its contents.
    Env,
}
132
133pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
134 let test_dir = &arg.mk_test_dir()?;
135 mkdir(test_dir.clone()).await?;
136 let logs_dir: PathBuf = test_dir.join("logs");
137 mkdir(logs_dir.clone()).await?;
138
139 let log_file = fs::OpenOptions::new()
140 .write(true)
141 .create(true)
142 .append(true)
143 .open(logs_dir.join("devimint.log"))
144 .await?
145 .into_std()
146 .await;
147
148 fedimint_logging::TracingSetup::default()
149 .with_file(Some(log_file))
150 .with_directive("jsonrpsee-client=off")
152 .init()?;
153
154 let globals = vars::Global::new(
155 test_dir,
156 arg.num_feds,
157 arg.fed_size,
158 arg.offline_nodes,
159 arg.federations_base_port,
160 arg.gateway_base_port,
161 )
162 .await?;
163
164 if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
165 update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
166 }
167 info!(target: LOG_DEVIMINT, path = %globals.FM_DATA_DIR.display(), "Devimint data dir");
168
169 let mut env_string = String::new();
170 for (var, value) in globals.vars() {
171 debug!(var, value, "Env variable set");
172 writeln!(env_string, r#"export {var}="{value}""#)?; unsafe { std::env::set_var(var, value) };
175 }
176 write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
177 let process_mgr = ProcessManager::new(globals);
178 let task_group = TaskGroup::new();
179 task_group.install_kill_handler();
180 Ok((process_mgr, task_group))
181}
182
183pub async fn update_test_dir_link(
184 link_test_dir: &Path,
185 test_dir: &Path,
186) -> Result<(), anyhow::Error> {
187 let make_link = match fs::read_link(link_test_dir).await {
188 Ok(existing) => {
189 if existing == test_dir {
190 false
191 } else {
192 debug!(
193 old = %existing.display(),
194 new = %test_dir.display(),
195 link = %link_test_dir.display(),
196 "Updating exinst test dir link"
197 );
198
199 fs::remove_file(link_test_dir).await?;
200 true
201 }
202 }
203 _ => true,
204 };
205 if make_link {
206 debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
207 fs::symlink(&test_dir, link_test_dir).await?;
208 }
209 Ok(())
210}
211
212pub async fn cleanup_on_exit<T>(
213 main_process: impl futures::Future<Output = Result<T>>,
214 task_group: TaskGroup,
215) -> Result<Option<T>> {
216 match task_group
217 .make_handle()
218 .cancel_on_shutdown(main_process)
219 .await
220 {
221 Err(_) => {
222 info!("Received shutdown signal before finishing main process, exiting early");
223 Ok(None)
224 }
225 Ok(Ok(v)) => {
226 debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
227 task_group
228 .shutdown_join_all(Duration::from_secs(30))
229 .await?;
230
231 Ok(Some(v))
233 }
234 Ok(Err(err)) => {
235 warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
236 Err(err)
237 }
238 }
239}
240
241pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
242 let ready_file = &global.FM_READY_FILE;
243 match result {
244 Ok(_) => write_overwrite_async(ready_file, "READY").await?,
245 Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
246 }
247 result
248}
249
/// Executes the selected subcommand.
///
/// * `ExternalDaemons`: runs [`setup`], starts the external daemons, writes the ready file,
///   optionally runs the user command, then waits for task group shutdown.
/// * `DevFed`: runs [`setup`], builds a [`DevJitFed`], optionally pegs in funds, exports the
///   invite code, writes the ready file, optionally runs the user command, and waits for
///   shutdown, all wrapped in [`cleanup_on_exit`].
/// * `Rpc`: delegates to [`rpc_command`].
///
/// # Errors
/// Propagates any error from setup, daemon startup, peg-ins, the user command or the
/// delegated RPC command.
pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
    match cmd {
        Cmd::ExternalDaemons { exec } => {
            let (process_mgr, task_group) = setup(common_args).await?;
            // The daemons handle is kept in a binding so it stays alive until the end of
            // this arm; the ready file records whether startup succeeded.
            let _daemons =
                write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
                    .await?;
            if let Some(exec) = exec {
                exec_user_command(exec).await?;
                // The user command finished successfully: request shutdown.
                task_group.shutdown();
            }
            // Wait here until shutdown has been requested.
            task_group.make_handle().make_shutdown_rx().await;
        }
        Cmd::DevFed { exec } => {
            trace!(target: LOG_DEVIMINT, "Starting dev fed");
            let start_time = Instant::now();
            // Copy the flags out before `common_args` is moved into `setup`.
            let skip_setup = common_args.skip_setup;
            let pre_dkg = common_args.pre_dkg;
            let (process_mgr, task_group) = setup(common_args).await?;
            let main = {
                // The future gets its own clone; the original is handed to `cleanup_on_exit`.
                let task_group = task_group.clone();
                async move {
                    let dev_fed = DevJitFed::new(&process_mgr, skip_setup, pre_dkg)?;

                    let pegin_start_time = Instant::now();
                    // NOTE(review): this message is logged even when the peg-in block below
                    // is skipped.
                    debug!(target: LOG_DEVIMINT, "Peging in client and gateways");

                    if !skip_setup && !pre_dkg {
                        const GW_PEGIN_AMOUNT: u64 = 1_000_000;
                        const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;

                        // Run the three peg-ins concurrently; only the client one yields a
                        // value (its operation id). The first error aborts the join.
                        let (operation_id, (), ()) = tokio::try_join!(
                            // Internal client deposit.
                            async {
                                let (address, operation_id) =
                                    dev_fed.internal_client().await?.get_deposit_addr().await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    %operation_id,
                                    "Sending funds to client deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, CLIENT_PEGIN_AMOUNT)
                                    .await?;
                                Ok(operation_id)
                            },
                            // LND gateway deposit.
                            async {
                                let address = dev_fed
                                    .gw_lnd_registered()
                                    .await?
                                    .client()
                                    .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
                                    .await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    "Sending funds to LND deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, GW_PEGIN_AMOUNT)
                                    .await
                                    .map(|_| ())
                            },
                            // LDK gateway deposit, only when `supports_lnv2()` reports true.
                            async {
                                if crate::util::supports_lnv2() {
                                    let gw_ldk = dev_fed.gw_ldk_connected().await?;
                                    let address = gw_ldk
                                        .client()
                                        .get_pegin_addr(
                                            &dev_fed.fed().await?.calculate_federation_id(),
                                        )
                                        .await?;
                                    debug!(
                                        target: LOG_DEVIMINT,
                                        %address,
                                        "Sending funds to LDK deposit addr"
                                    );
                                    dev_fed
                                        .bitcoind()
                                        .await?
                                        .send_to(address, GW_PEGIN_AMOUNT)
                                        .await
                                        .map(|_| ())
                                } else {
                                    Ok(())
                                }
                            },
                        )?;

                        // Presumably enough blocks for the deposits to confirm - TODO confirm.
                        dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
                        if crate::util::supports_wallet_v2() {
                            // Waits for 90% of the peg-in amount scaled by 1000; presumably a
                            // sats-to-msats conversion with headroom for fees - TODO confirm.
                            dev_fed
                                .internal_client()
                                .await?
                                .await_balance(CLIENT_PEGIN_AMOUNT * 1000 * 9 / 10)
                                .await?;
                        } else {
                            dev_fed
                                .internal_client()
                                .await?
                                .await_deposit(&operation_id)
                                .await?;
                        }

                        info!(
                            target: LOG_DEVIMINT,
                            elapsed_ms = %pegin_start_time.elapsed().as_millis(),
                            "Pegins completed"
                        );
                    }

                    if !pre_dkg {
                        // Export the invite code into this process' environment.
                        // SAFETY: NOTE(review) - `set_var` is only sound while no other
                        // thread accesses the process environment concurrently; nothing in
                        // this file establishes that guarantee - confirm.
                        unsafe {
                            std::env::set_var(
                                FM_INVITE_CODE_ENV,
                                dev_fed.fed().await?.invite_code()?,
                            );
                        };
                    }

                    dev_fed.finalize(&process_mgr).await?;

                    // Writes "READY" and hands the federation back for later termination.
                    let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;

                    info!(
                        target: LOG_DEVIMINT,
                        elapsed_ms = %start_time.elapsed().as_millis(),
                        path = %process_mgr.globals.FM_DATA_DIR.display(),
                        "Devfed ready"
                    );
                    if let Some(exec) = exec {
                        debug!(target: LOG_DEVIMINT, "Starting exec command");
                        exec_user_command(exec).await?;
                        // The user command finished successfully: request shutdown.
                        task_group.shutdown();
                    }

                    debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
                    task_group.make_handle().make_shutdown_rx().await;

                    Ok::<_, anyhow::Error>(daemons)
                }
            };
            // `None` means shutdown interrupted `main`; there is then nothing to terminate.
            if let Some(fed) = cleanup_on_exit(main, task_group).await? {
                fed.fast_terminate().await;
            }
        }
        Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
    }
    Ok(())
}
406
407pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
408 let cmd_str = path
409 .join(ffi::OsStr::new(" "))
410 .to_string_lossy()
411 .to_string();
412
413 let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
414 let mut path = devimint_static_data_dir();
415 path.push("/aliases:");
416 path.push(existing_path);
417 path
418 } else {
419 let mut path = devimint_static_data_dir();
420 path.push("/aliases");
421 path
422 };
423 debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
424 if !tokio::process::Command::new(&path[0])
425 .args(&path[1..])
426 .env("PATH", path_with_aliases)
427 .kill_on_drop(true)
428 .status()
429 .await
430 .with_context(|| format!("Executing user command failed: {cmd_str}"))?
431 .success()
432 {
433 error!(cmd = %cmd_str, "User command failed");
434 return Err(anyhow!("User command failed: {cmd_str}"));
435 }
436 Ok(())
437}
438
439fn devimint_static_data_dir() -> ffi::OsString {
440 env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
442 env!(
443 "FM_DEVIMINT_STATIC_DATA_DIR"
445 )
446 .into(),
447 )
448}
449
450pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
451 fedimint_logging::TracingSetup::default().init()?;
452 match rpc {
453 RpcCmd::Env => {
454 let env_file = common.test_dir().join("env");
455 poll("env file", || async {
456 if fs::try_exists(&env_file)
457 .await
458 .context("env file")
459 .map_err(ControlFlow::Continue)?
460 {
461 Ok(())
462 } else {
463 Err(ControlFlow::Continue(anyhow!("env file not found")))
464 }
465 })
466 .await?;
467 let env = fs::read_to_string(&env_file).await?;
468 print!("{env}");
469 Ok(())
470 }
471 RpcCmd::Wait => {
472 let ready_file = common.test_dir().join("ready");
473 poll("ready file", || async {
474 if fs::try_exists(&ready_file)
475 .await
476 .context("ready file")
477 .map_err(ControlFlow::Continue)?
478 {
479 Ok(())
480 } else {
481 Err(ControlFlow::Continue(anyhow!("ready file not found")))
482 }
483 })
484 .await?;
485 let env = fs::read_to_string(&ready_file).await?;
486 print!("{env}");
487
488 let test_dir = &common.test_dir();
490 let env_file = test_dir.join("env");
491 let invite_file = test_dir.join("cfg/invite-code");
492 if fs::try_exists(&env_file).await.ok().unwrap_or(false)
493 && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
494 {
495 let invite = fs::read_to_string(&invite_file).await?;
496 let mut env_string = fs::read_to_string(&env_file).await?;
497 writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
498 unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
500 write_overwrite_async(env_file, env_string).await?;
501 }
502
503 Ok(())
504 }
505 }
506}