devimint/
cli.rs

1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::time::Instant;
16use tracing::{debug, error, info, trace, warn};
17
18use crate::devfed::DevJitFed;
19use crate::envs::{
20    FM_DEVIMINT_STATIC_DATA_DIR_ENV, FM_FED_SIZE_ENV, FM_FEDERATIONS_BASE_PORT_ENV,
21    FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV, FM_NUM_FEDS_ENV, FM_OFFLINE_NODES_ENV,
22    FM_PRE_DKG_ENV, FM_TEST_DIR_ENV,
23};
24use crate::util::{ProcessManager, poll};
25use crate::vars::mkdir;
26use crate::{external_daemons, vars};
27
28fn random_test_dir_suffix() -> String {
29    rand::thread_rng()
30        .sample_iter(&Alphanumeric)
31        .filter(u8::is_ascii_digit)
32        .take(3)
33        .map(char::from)
34        .collect::<String>()
35}
36
37#[derive(Parser, Clone, Default)]
38pub struct CommonArgs {
39    #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
40    pub test_dir: Option<PathBuf>,
41
42    /// Don't set up new Federation, start from the state in existing
43    /// devimint data dir
44    #[arg(long, env = "FM_SKIP_SETUP")]
45    skip_setup: bool,
46
47    /// Do not set up federation and stop at a pre-dkg stage
48    #[arg(long, env = FM_PRE_DKG_ENV)]
49    pre_dkg: bool,
50
51    /// Number of peers to allocate in every federation
52    #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
53    pub fed_size: usize,
54
55    /// Number of federations to allocate for the test/run
56    #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
57    pub num_feds: usize,
58
59    #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
60    /// Create a link to the test dir under this path
61    pub link_test_dir: Option<PathBuf>,
62
63    #[clap(long, default_value_t = random_test_dir_suffix())]
64    pub link_test_dir_suffix: String,
65
66    /// Run degraded federation with FM_OFFLINE_NODES shutdown
67    #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
68    pub offline_nodes: usize,
69
70    /// Force a base federations port, e.g. for convenience during dev tasks
71    #[clap(long, env = FM_FEDERATIONS_BASE_PORT_ENV)]
72    pub federations_base_port: Option<u16>,
73}
74
75impl CommonArgs {
76    pub fn mk_test_dir(&self) -> Result<PathBuf> {
77        if self.skip_setup {
78            ensure!(
79                self.test_dir.is_some(),
80                "When using `--skip-setup`, `--test-dir` must be set"
81            );
82        }
83        let path = self.test_dir();
84
85        std::fs::create_dir_all(&path)
86            .with_context(|| format!("Creating tmp directory {}", path.display()))?;
87
88        Ok(path)
89    }
90
91    pub fn test_dir(&self) -> PathBuf {
92        self.test_dir.clone().unwrap_or_else(|| {
93            std::env::temp_dir().join(format!(
94                "devimint-{}-{}",
95                std::process::id(),
96                self.link_test_dir_suffix
97            ))
98        })
99    }
100}
101
// Subcommands dispatched by `handle_command`.
//
// NOTE: the `///` comments below are picked up by clap as help text, so they
// are runtime-visible strings; reviewer notes use plain `//` comments.
#[derive(Subcommand)]
pub enum Cmd {
    /// Spins up bitcoind, electrs, and esplora.
    ExternalDaemons {
        // Optional command (program followed by its arguments) that
        // `handle_command` runs once the daemons are up; on success the task
        // group is shut down afterwards.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Spins up bitcoind, LDK Gateway, lnd w/ gateway, a faucet, electrs,
    /// esplora, and a federation sized from FM_FED_SIZE it opens LN channel
    /// between the two nodes. it connects the gateways to the federation.
    /// it finally switches to use the LND gateway for LNv1
    DevFed {
        // Optional command (program followed by its arguments) that
        // `handle_command` runs once the dev federation is ready; on success
        // the task group is shut down afterwards.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Rpc commands to the long running devimint instance. Could be entry point
    /// for devimint as a cli
    // Flattened: the `RpcCmd` variants appear as direct subcommands.
    #[clap(flatten)]
    Rpc(RpcCmd),
}
122
// Commands handled by `rpc_command`; both work purely through files in the
// test dir. Plain `//` comments are used so the clap help output is unchanged.
#[derive(Subcommand)]
pub enum RpcCmd {
    // Polls until `<test_dir>/ready` exists and prints its contents.
    Wait,
    // Polls until `<test_dir>/env` exists and prints its contents.
    Env,
}
128
129pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
130    let test_dir = &arg.mk_test_dir()?;
131    mkdir(test_dir.clone()).await?;
132    let logs_dir: PathBuf = test_dir.join("logs");
133    mkdir(logs_dir.clone()).await?;
134
135    let log_file = fs::OpenOptions::new()
136        .write(true)
137        .create(true)
138        .append(true)
139        .open(logs_dir.join("devimint.log"))
140        .await?
141        .into_std()
142        .await;
143
144    fedimint_logging::TracingSetup::default()
145        .with_file(Some(log_file))
146        // jsonrpsee is expected to fail during startup
147        .with_directive("jsonrpsee-client=off")
148        .init()?;
149
150    let globals = vars::Global::new(
151        test_dir,
152        arg.num_feds,
153        arg.fed_size,
154        arg.offline_nodes,
155        arg.federations_base_port,
156    )
157    .await?;
158
159    if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
160        update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
161    }
162    info!(target: LOG_DEVIMINT, path=%globals.FM_DATA_DIR.display() , "Devimint data dir");
163
164    let mut env_string = String::new();
165    for (var, value) in globals.vars() {
166        debug!(var, value, "Env variable set");
167        writeln!(env_string, r#"export {var}="{value}""#)?; // hope that value doesn't contain a "
168        // TODO: Audit that the environment access only happens in single-threaded code.
169        unsafe { std::env::set_var(var, value) };
170    }
171    write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
172    let process_mgr = ProcessManager::new(globals);
173    let task_group = TaskGroup::new();
174    task_group.install_kill_handler();
175    Ok((process_mgr, task_group))
176}
177
178pub async fn update_test_dir_link(
179    link_test_dir: &Path,
180    test_dir: &Path,
181) -> Result<(), anyhow::Error> {
182    let make_link = match fs::read_link(link_test_dir).await {
183        Ok(existing) => {
184            if existing == test_dir {
185                false
186            } else {
187                debug!(
188                    old = %existing.display(),
189                    new = %test_dir.display(),
190                    link = %link_test_dir.display(),
191                    "Updating exinst test dir link"
192                );
193
194                fs::remove_file(link_test_dir).await?;
195                true
196            }
197        }
198        _ => true,
199    };
200    if make_link {
201        debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
202        fs::symlink(&test_dir, link_test_dir).await?;
203    }
204    Ok(())
205}
206
207pub async fn cleanup_on_exit<T>(
208    main_process: impl futures::Future<Output = Result<T>>,
209    task_group: TaskGroup,
210) -> Result<Option<T>> {
211    match task_group
212        .make_handle()
213        .cancel_on_shutdown(main_process)
214        .await
215    {
216        Err(_) => {
217            info!("Received shutdown signal before finishing main process, exiting early");
218            Ok(None)
219        }
220        Ok(Ok(v)) => {
221            debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
222            task_group
223                .shutdown_join_all(Duration::from_secs(30))
224                .await?;
225
226            // the caller can drop the v after shutdown
227            Ok(Some(v))
228        }
229        Ok(Err(err)) => {
230            warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
231            Err(err)
232        }
233    }
234}
235
236pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
237    let ready_file = &global.FM_READY_FILE;
238    match result {
239        Ok(_) => write_overwrite_async(ready_file, "READY").await?,
240        Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
241    }
242    result
243}
244
/// Runs the selected devimint subcommand to completion.
///
/// * `Cmd::ExternalDaemons` — runs `setup`, starts the external daemons,
///   writes the ready file, optionally runs the user's `exec` command, then
///   waits on the task group's shutdown receiver.
/// * `Cmd::DevFed` — runs `setup`, builds a `DevJitFed`, performs the initial
///   peg-ins (unless `skip_setup` or `pre_dkg` is set), exports the invite
///   code, writes the ready file, optionally runs the user's `exec` command,
///   and waits for shutdown under `cleanup_on_exit`.
/// * `Cmd::Rpc` — delegates to `rpc_command`.
///
/// # Errors
///
/// Propagates any error from setup, the daemons/federation, the peg-ins, or
/// the user's `exec` command.
pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
    match cmd {
        Cmd::ExternalDaemons { exec } => {
            let (process_mgr, task_group) = setup(common_args).await?;
            // The binding keeps the returned daemons value alive until the
            // end of this match arm.
            let _daemons =
                write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
                    .await?;
            if let Some(exec) = exec {
                exec_user_command(exec).await?;
                // The user command finished successfully: request shutdown.
                task_group.shutdown();
            }
            // Wait on the task group's shutdown receiver.
            task_group.make_handle().make_shutdown_rx().await;
        }
        Cmd::DevFed { exec } => {
            trace!(target: LOG_DEVIMINT, "Starting dev fed");
            let start_time = Instant::now();
            // Copy the flags out first: `common_args` is moved into `setup`.
            let skip_setup = common_args.skip_setup;
            let pre_dkg = common_args.pre_dkg;
            let (process_mgr, task_group) = setup(common_args).await?;
            let main = {
                // The future below owns its own clone; the original
                // `task_group` is handed to `cleanup_on_exit`.
                let task_group = task_group.clone();
                async move {
                    let dev_fed = DevJitFed::new(&process_mgr, skip_setup, pre_dkg)?;

                    let pegin_start_time = Instant::now();
                    // NOTE(review): this is logged even when the peg-in block
                    // below is skipped.
                    debug!(target: LOG_DEVIMINT, "Peging in client and gateways");

                    if !skip_setup && !pre_dkg {
                        const GW_PEGIN_AMOUNT: u64 = 1_000_000;
                        const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;

                        // Fund three deposit addresses concurrently; the first
                        // error aborts the whole join.
                        let (operation_id, (), ()) = tokio::try_join!(
                            // Internal client deposit; yields the operation id
                            // awaited after mining.
                            async {
                                let (address, operation_id) =
                                    dev_fed.internal_client().await?.get_deposit_addr().await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    %operation_id,
                                    "Sending funds to client deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, CLIENT_PEGIN_AMOUNT)
                                    .await?;
                                Ok(operation_id)
                            },
                            // LND gateway deposit.
                            async {
                                let address = dev_fed
                                    .gw_lnd_registered()
                                    .await?
                                    .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
                                    .await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    "Sending funds to LND deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, GW_PEGIN_AMOUNT)
                                    .await
                                    .map(|_| ())
                            },
                            // LDK gateway deposit, only when
                            // `supports_lnv2()` returns true.
                            async {
                                if crate::util::supports_lnv2() {
                                    let gw_ldk = dev_fed.gw_ldk_connected().await?;
                                    let address = gw_ldk
                                        .get_pegin_addr(
                                            &dev_fed.fed().await?.calculate_federation_id(),
                                        )
                                        .await?;
                                    debug!(
                                        target: LOG_DEVIMINT,
                                        %address,
                                        "Sending funds to LDK deposit addr"
                                    );
                                    dev_fed
                                        .bitcoind()
                                        .await?
                                        .send_to(address, GW_PEGIN_AMOUNT)
                                        .await
                                        .map(|_| ())
                                } else {
                                    Ok(())
                                }
                            },
                        )?;

                        // Mine 11 blocks — presumably enough for the deposits
                        // to confirm; TODO confirm — then wait for the client
                        // deposit.
                        dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
                        dev_fed
                            .internal_client()
                            .await?
                            .await_deposit(&operation_id)
                            .await?;

                        info!(
                            target: LOG_DEVIMINT,
                            elapsed_ms = %pegin_start_time.elapsed().as_millis(),
                            "Pegins completed"
                        );
                    }

                    if !pre_dkg {
                        // TODO: Audit that the environment access only happens in single-threaded
                        // code.
                        unsafe {
                            std::env::set_var(
                                FM_INVITE_CODE_ENV,
                                dev_fed.fed().await?.invite_code()?,
                            );
                        };
                    }

                    dev_fed.finalize(&process_mgr).await?;

                    // Writes READY to the ready file and hands `dev_fed` back.
                    let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;

                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed().as_millis(), "Devfed ready");
                    if let Some(exec) = exec {
                        debug!(target: LOG_DEVIMINT, "Starting exec command");
                        exec_user_command(exec).await?;
                        // The user command finished successfully: request shutdown.
                        task_group.shutdown();
                    }

                    debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
                    task_group.make_handle().make_shutdown_rx().await;

                    Ok::<_, anyhow::Error>(daemons)
                }
            };
            // `cleanup_on_exit` yields `Some` only when `main` ran to
            // completion; in that case the federation is terminated here.
            if let Some(fed) = cleanup_on_exit(main, task_group).await? {
                fed.fast_terminate().await;
            }
        }
        Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
    }
    Ok(())
}
386
387pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
388    let cmd_str = path
389        .join(ffi::OsStr::new(" "))
390        .to_string_lossy()
391        .to_string();
392
393    let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
394        let mut path = devimint_static_data_dir();
395        path.push("/aliases:");
396        path.push(existing_path);
397        path
398    } else {
399        let mut path = devimint_static_data_dir();
400        path.push("/aliases");
401        path
402    };
403    debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
404    if !tokio::process::Command::new(&path[0])
405        .args(&path[1..])
406        .env("PATH", path_with_aliases)
407        .kill_on_drop(true)
408        .status()
409        .await
410        .with_context(|| format!("Executing user command failed: {cmd_str}"))?
411        .success()
412    {
413        error!(cmd = %cmd_str, "User command failed");
414        return Err(anyhow!("User command failed: {cmd_str}"));
415    }
416    Ok(())
417}
418
419fn devimint_static_data_dir() -> ffi::OsString {
420    // If set, use the runtime, otherwise the compile time value
421    env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
422        env!(
423            // Note: consant expression, not allowed, so we can't use the constant :/
424            "FM_DEVIMINT_STATIC_DATA_DIR"
425        )
426        .into(),
427    )
428}
429
430pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
431    fedimint_logging::TracingSetup::default().init()?;
432    match rpc {
433        RpcCmd::Env => {
434            let env_file = common.test_dir().join("env");
435            poll("env file", || async {
436                if fs::try_exists(&env_file)
437                    .await
438                    .context("env file")
439                    .map_err(ControlFlow::Continue)?
440                {
441                    Ok(())
442                } else {
443                    Err(ControlFlow::Continue(anyhow!("env file not found")))
444                }
445            })
446            .await?;
447            let env = fs::read_to_string(&env_file).await?;
448            print!("{env}");
449            Ok(())
450        }
451        RpcCmd::Wait => {
452            let ready_file = common.test_dir().join("ready");
453            poll("ready file", || async {
454                if fs::try_exists(&ready_file)
455                    .await
456                    .context("ready file")
457                    .map_err(ControlFlow::Continue)?
458                {
459                    Ok(())
460                } else {
461                    Err(ControlFlow::Continue(anyhow!("ready file not found")))
462                }
463            })
464            .await?;
465            let env = fs::read_to_string(&ready_file).await?;
466            print!("{env}");
467
468            // Append invite code to devimint env
469            let test_dir = &common.test_dir();
470            let env_file = test_dir.join("env");
471            let invite_file = test_dir.join("cfg/invite-code");
472            if fs::try_exists(&env_file).await.ok().unwrap_or(false)
473                && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
474            {
475                let invite = fs::read_to_string(&invite_file).await?;
476                let mut env_string = fs::read_to_string(&env_file).await?;
477                writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
478                // TODO: Audit that the environment access only happens in single-threaded code.
479                unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
480                write_overwrite_async(env_file, env_string).await?;
481            }
482
483            Ok(())
484        }
485    }
486}