1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Args, Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::net::TcpStream;
16use tokio::time::Instant;
17use tracing::{debug, error, info, trace, warn};
18
19use crate::devfed::DevJitFed;
20use crate::envs::{
21 FM_BITCOIN_RPC_URL_ENV, FM_CLN_SOCKET_ENV, FM_DEVIMINT_STATIC_DATA_DIR_ENV,
22 FM_FAUCET_BIND_ADDR_ENV, FM_FED_SIZE_ENV, FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV,
23 FM_NUM_FEDS_ENV, FM_OFFLINE_NODES_ENV, FM_PORT_GW_LND_ENV, FM_TEST_DIR_ENV,
24};
25use crate::federation::Fedimintd;
26use crate::util::{ProcessManager, poll};
27use crate::vars::mkdir;
28use crate::{ExternalDaemons, external_daemons, vars};
29
30fn random_test_dir_suffix() -> String {
31 rand::thread_rng()
32 .sample_iter(&Alphanumeric)
33 .filter(u8::is_ascii_digit)
34 .take(3)
35 .map(char::from)
36 .collect::<String>()
37}
38
39#[derive(Parser, Clone, Default)]
40pub struct CommonArgs {
41 #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
42 pub test_dir: Option<PathBuf>,
43
44 #[arg(long, env = "FM_SKIP_SETUP")]
47 skip_setup: bool,
48
49 #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
51 pub fed_size: usize,
52
53 #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
55 pub num_feds: usize,
56
57 #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
58 pub link_test_dir: Option<PathBuf>,
60
61 #[clap(long, default_value_t = random_test_dir_suffix())]
62 pub link_test_dir_suffix: String,
63
64 #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
66 pub offline_nodes: usize,
67}
68
69impl CommonArgs {
70 pub fn mk_test_dir(&self) -> Result<PathBuf> {
71 if self.skip_setup {
72 ensure!(
73 self.test_dir.is_some(),
74 "When using `--skip-setup`, `--test-dir` must be set"
75 );
76 }
77 let path = self.test_dir();
78
79 std::fs::create_dir_all(&path)
80 .with_context(|| format!("Creating tmp directory {}", path.display()))?;
81
82 Ok(path)
83 }
84
85 pub fn test_dir(&self) -> PathBuf {
86 self.test_dir.clone().unwrap_or_else(|| {
87 std::env::temp_dir().join(format!(
88 "devimint-{}-{}",
89 std::process::id(),
90 self.link_test_dir_suffix
91 ))
92 })
93 }
94}
95
96#[derive(Subcommand)]
97pub enum Cmd {
98 ExternalDaemons {
101 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
102 exec: Option<Vec<ffi::OsString>>,
103 },
104 DevFed {
109 #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
110 exec: Option<Vec<ffi::OsString>>,
111 },
112 RunUi,
114 #[clap(flatten)]
117 Rpc(RpcCmd),
118
119 Faucet(FaucetOpts),
121}
122
123#[derive(Args, Debug, Clone)]
124pub struct FaucetOpts {
125 #[clap(long, env = FM_FAUCET_BIND_ADDR_ENV)]
126 pub bind_addr: String,
127 #[clap(long, env = FM_BITCOIN_RPC_URL_ENV)]
128 pub bitcoind_rpc: String,
129 #[clap(long, env = FM_CLN_SOCKET_ENV)]
130 pub cln_socket: String,
131 #[clap(long, env = FM_PORT_GW_LND_ENV)]
132 pub gw_lnd_port: u16,
133 #[clap(long, env = FM_INVITE_CODE_ENV)]
134 pub invite_code: Option<String>,
135}
136
137#[derive(Subcommand)]
138pub enum RpcCmd {
139 Wait,
140 Env,
141}
142
143pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
144 let test_dir = &arg.mk_test_dir()?;
145 mkdir(test_dir.clone()).await?;
146 let logs_dir: PathBuf = test_dir.join("logs");
147 mkdir(logs_dir.clone()).await?;
148
149 let log_file = fs::OpenOptions::new()
150 .write(true)
151 .create(true)
152 .append(true)
153 .open(logs_dir.join("devimint.log"))
154 .await?
155 .into_std()
156 .await;
157
158 fedimint_logging::TracingSetup::default()
159 .with_file(Some(log_file))
160 .with_directive("jsonrpsee-client=off")
162 .init()?;
163
164 let globals =
165 vars::Global::new(test_dir, arg.num_feds, arg.fed_size, arg.offline_nodes).await?;
166
167 if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
168 update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
169 }
170 info!(target: LOG_DEVIMINT, path=%globals.FM_DATA_DIR.display() , "Devimint data dir");
171
172 let mut env_string = String::new();
173 for (var, value) in globals.vars() {
174 debug!(var, value, "Env variable set");
175 writeln!(env_string, r#"export {var}="{value}""#)?; unsafe { std::env::set_var(var, value) };
178 }
179 write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
180 let process_mgr = ProcessManager::new(globals);
181 let task_group = TaskGroup::new();
182 task_group.install_kill_handler();
183 Ok((process_mgr, task_group))
184}
185
186pub async fn update_test_dir_link(
187 link_test_dir: &Path,
188 test_dir: &Path,
189) -> Result<(), anyhow::Error> {
190 let make_link = match fs::read_link(link_test_dir).await {
191 Ok(existing) => {
192 if existing == test_dir {
193 false
194 } else {
195 debug!(
196 old = %existing.display(),
197 new = %test_dir.display(),
198 link = %link_test_dir.display(),
199 "Updating exinst test dir link"
200 );
201
202 fs::remove_file(link_test_dir).await?;
203 true
204 }
205 }
206 _ => true,
207 };
208 if make_link {
209 debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
210 fs::symlink(&test_dir, link_test_dir).await?;
211 }
212 Ok(())
213}
214
215pub async fn cleanup_on_exit<T>(
216 main_process: impl futures::Future<Output = Result<T>>,
217 task_group: TaskGroup,
218) -> Result<Option<T>> {
219 match task_group
220 .make_handle()
221 .cancel_on_shutdown(main_process)
222 .await
223 {
224 Err(_) => {
225 info!("Received shutdown signal before finishing main process, exiting early");
226 Ok(None)
227 }
228 Ok(Ok(v)) => {
229 debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
230 task_group
231 .shutdown_join_all(Duration::from_secs(30))
232 .await?;
233
234 Ok(Some(v))
236 }
237 Ok(Err(err)) => {
238 warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
239 Err(err)
240 }
241 }
242}
243
244pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
245 let ready_file = &global.FM_READY_FILE;
246 match result {
247 Ok(_) => write_overwrite_async(ready_file, "READY").await?,
248 Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
249 }
250 result
251}
252
253pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
254 match cmd {
255 Cmd::ExternalDaemons { exec } => {
256 let (process_mgr, task_group) = setup(common_args).await?;
257 let _daemons =
258 write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
259 .await?;
260 if let Some(exec) = exec {
261 exec_user_command(exec).await?;
262 task_group.shutdown();
263 }
264 task_group.make_handle().make_shutdown_rx().await;
265 }
266 Cmd::DevFed { exec } => {
267 trace!(target: LOG_DEVIMINT, "Starting dev fed");
268 let start_time = Instant::now();
269 let skip_setup = common_args.skip_setup;
270 let (process_mgr, task_group) = setup(common_args).await?;
271 let main = {
272 let task_group = task_group.clone();
273 async move {
274 let dev_fed = DevJitFed::new(&process_mgr, skip_setup)?;
275
276 let pegin_start_time = Instant::now();
277 debug!(target: LOG_DEVIMINT, "Peging in client and gateways");
278
279 if !skip_setup {
280 const GW_PEGIN_AMOUNT: u64 = 1_000_000;
281 const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;
282
283 let (operation_id, (), ()) = tokio::try_join!(
284 async {
285 let (address, operation_id) =
286 dev_fed.internal_client().await?.get_deposit_addr().await?;
287 debug!(
288 target: LOG_DEVIMINT,
289 %address,
290 %operation_id,
291 "Sending funds to client deposit addr"
292 );
293 dev_fed
294 .bitcoind()
295 .await?
296 .send_to(address, CLIENT_PEGIN_AMOUNT)
297 .await?;
298 Ok(operation_id)
299 },
300 async {
301 let address = dev_fed
302 .gw_lnd_registered()
303 .await?
304 .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
305 .await?;
306 debug!(
307 target: LOG_DEVIMINT,
308 %address,
309 "Sending funds to LND deposit addr"
310 );
311 dev_fed
312 .bitcoind()
313 .await?
314 .send_to(address, GW_PEGIN_AMOUNT)
315 .await
316 .map(|_| ())
317 },
318 async {
319 if crate::util::supports_lnv2() {
320 let gw_ldk = dev_fed.gw_ldk_connected().await?;
321 let address = gw_ldk
322 .get_pegin_addr(
323 &dev_fed.fed().await?.calculate_federation_id(),
324 )
325 .await?;
326 debug!(
327 target: LOG_DEVIMINT,
328 %address,
329 "Sending funds to LDK deposit addr"
330 );
331 dev_fed
332 .bitcoind()
333 .await?
334 .send_to(address, GW_PEGIN_AMOUNT)
335 .await
336 .map(|_| ())
337 } else {
338 Ok(())
339 }
340 },
341 )?;
342
343 dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
344 dev_fed
345 .internal_client()
346 .await?
347 .await_deposit(&operation_id)
348 .await?;
349
350 info!(
351 target: LOG_DEVIMINT,
352 elapsed_ms = %pegin_start_time.elapsed().as_millis(),
353 "Pegins completed"
354 );
355 }
356
357 unsafe {
359 std::env::set_var(FM_INVITE_CODE_ENV, dev_fed.fed().await?.invite_code()?);
360 };
361
362 dev_fed.finalize(&process_mgr).await?;
363
364 let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;
365
366 info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed().as_millis(), "Devfed ready");
367 if let Some(exec) = exec {
368 debug!(target: LOG_DEVIMINT, "Starting exec command");
369 exec_user_command(exec).await?;
370 task_group.shutdown();
371 }
372
373 debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
374 task_group.make_handle().make_shutdown_rx().await;
375
376 Ok::<_, anyhow::Error>(daemons)
377 }
378 };
379 if let Some(fed) = cleanup_on_exit(main, task_group).await? {
380 fed.fast_terminate().await;
381 }
382 }
383 Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
384 Cmd::RunUi => {
385 let (process_mgr, task_group) = setup(common_args).await?;
386 let task_group_clone = task_group.clone();
387 let main = async {
388 let result = run_ui(&process_mgr).await;
389 let daemons = write_ready_file(&process_mgr.globals, result).await?;
390
391 debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
392 task_group_clone.make_handle().make_shutdown_rx().await;
393
394 Ok::<_, anyhow::Error>(daemons)
395 };
396 Box::pin(cleanup_on_exit(main, task_group)).await?;
397 }
398 Cmd::Faucet(opts) => crate::faucet::run(opts).await?,
399 }
400 Ok(())
401}
402
403pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
404 let cmd_str = path
405 .join(ffi::OsStr::new(" "))
406 .to_string_lossy()
407 .to_string();
408
409 let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
410 let mut path = devimint_static_data_dir();
411 path.push("/aliases:");
412 path.push(existing_path);
413 path
414 } else {
415 let mut path = devimint_static_data_dir();
416 path.push("/aliases");
417 path
418 };
419 debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
420 if !tokio::process::Command::new(&path[0])
421 .args(&path[1..])
422 .env("PATH", path_with_aliases)
423 .kill_on_drop(true)
424 .status()
425 .await
426 .with_context(|| format!("Executing user command failed: {cmd_str}"))?
427 .success()
428 {
429 error!(cmd = %cmd_str, "User command failed");
430 return Err(anyhow!("User command failed: {cmd_str}"));
431 }
432 Ok(())
433}
434
435fn devimint_static_data_dir() -> ffi::OsString {
436 env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
438 env!(
439 "FM_DEVIMINT_STATIC_DATA_DIR"
441 )
442 .into(),
443 )
444}
445
446pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
447 fedimint_logging::TracingSetup::default().init()?;
448 match rpc {
449 RpcCmd::Env => {
450 let env_file = common.test_dir().join("env");
451 poll("env file", || async {
452 if fs::try_exists(&env_file)
453 .await
454 .context("env file")
455 .map_err(ControlFlow::Continue)?
456 {
457 Ok(())
458 } else {
459 Err(ControlFlow::Continue(anyhow!("env file not found")))
460 }
461 })
462 .await?;
463 let env = fs::read_to_string(&env_file).await?;
464 print!("{env}");
465 Ok(())
466 }
467 RpcCmd::Wait => {
468 let ready_file = common.test_dir().join("ready");
469 poll("ready file", || async {
470 if fs::try_exists(&ready_file)
471 .await
472 .context("ready file")
473 .map_err(ControlFlow::Continue)?
474 {
475 Ok(())
476 } else {
477 Err(ControlFlow::Continue(anyhow!("ready file not found")))
478 }
479 })
480 .await?;
481 let env = fs::read_to_string(&ready_file).await?;
482 print!("{env}");
483
484 let test_dir = &common.test_dir();
486 let env_file = test_dir.join("env");
487 let invite_file = test_dir.join("cfg/invite-code");
488 if fs::try_exists(&env_file).await.ok().unwrap_or(false)
489 && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
490 {
491 let invite = fs::read_to_string(&invite_file).await?;
492 let mut env_string = fs::read_to_string(&env_file).await?;
493 writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
494 unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
496 write_overwrite_async(env_file, env_string).await?;
497 }
498
499 Ok(())
500 }
501 }
502}
503
504async fn run_ui(process_mgr: &ProcessManager) -> Result<(Vec<Fedimintd>, ExternalDaemons)> {
505 let externals = external_daemons(process_mgr).await?;
506 let fed_size = process_mgr.globals.FM_FED_SIZE;
507 let fedimintds = futures::future::try_join_all((0..fed_size).map(|peer| {
508 let bitcoind = externals.bitcoind.clone();
509 async move {
510 let peer_port = 10000 + 8137 + peer * 2;
511 let api_port = peer_port + 1;
512 let metrics_port = 3510 + peer;
513
514 let vars = vars::Fedimintd {
525 FM_BIND_P2P: format!("127.0.0.1:{peer_port}"),
526 FM_P2P_URL: format!("fedimint://127.0.0.1:{peer_port}"),
527 FM_BIND_API: format!("127.0.0.1:{api_port}"),
528 FM_API_URL: format!("ws://127.0.0.1:{api_port}"),
529 FM_DATA_DIR: process_mgr
530 .globals
531 .FM_DATA_DIR
532 .join(format!("fedimintd-{peer}")),
533 FM_BIND_METRICS_API: format!("127.0.0.1:{metrics_port}"),
534 FM_FORCE_BITCOIN_RPC_URL: format!(
535 "http://bitcoin:bitcoin@127.0.0.1:{}",
536 process_mgr.globals.FM_PORT_BTC_RPC
537 ),
538 FM_FORCE_BITCOIN_RPC_KIND: "bitcoind".into(),
539 FM_IROH_P2P_SECRET_KEY_OVERRIDE: String::new(),
540 FM_IROH_API_SECRET_KEY_OVERRIDE: String::new(),
541 };
542 let fm = Fedimintd::new(
543 process_mgr,
544 bitcoind.clone(),
545 peer,
546 &vars,
547 "default".to_string(),
548 )
549 .await?;
550 let server_addr = &vars.FM_BIND_API;
551
552 poll("waiting for api startup", || async {
553 TcpStream::connect(server_addr)
554 .await
555 .context("connect to api")
556 .map_err(ControlFlow::Continue)
557 })
558 .await?;
559
560 anyhow::Ok(fm)
561 }
562 }))
563 .await?;
564
565 Ok((fedimintds, externals))
566}