Skip to main content

devimint/
cli.rs

1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::time::Instant;
16use tracing::{debug, error, info, trace, warn};
17
18use crate::devfed::DevJitFed;
19use crate::envs::{
20    FM_DEVIMINT_STATIC_DATA_DIR_ENV, FM_FED_SIZE_ENV, FM_FEDERATIONS_BASE_PORT_ENV,
21    FM_GATEWAY_BASE_PORT_ENV, FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV, FM_NUM_FEDS_ENV,
22    FM_OFFLINE_NODES_ENV, FM_PRE_DKG_ENV, FM_TEST_DIR_ENV,
23};
24use crate::util::{ProcessManager, poll};
25use crate::vars::mkdir;
26use crate::{external_daemons, vars};
27
28fn random_test_dir_suffix() -> String {
29    rand::thread_rng()
30        .sample_iter(&Alphanumeric)
31        .filter(u8::is_ascii_digit)
32        .take(3)
33        .map(char::from)
34        .collect::<String>()
35}
36
// Command line arguments that are handed to `handle_command` together with
// the selected subcommand.
//
// NOTE: the additions below are plain `//` comments on purpose, so they are
// not turned into doc attributes on a `Parser`-derived type.
#[derive(Parser, Clone, Default)]
pub struct CommonArgs {
    // Explicit test directory. When unset, `CommonArgs::test_dir` falls back to
    // a per-process directory under the system temp dir.
    #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
    pub test_dir: Option<PathBuf>,

    /// Don't set up new Federation, start from the state in existing
    /// devimint data dir
    // `CommonArgs::mk_test_dir` rejects this flag unless `test_dir` is set.
    #[arg(long, env = "FM_SKIP_SETUP")]
    skip_setup: bool,

    /// Do not set up federation and stop at a pre-dkg stage
    #[arg(long, env = FM_PRE_DKG_ENV)]
    pre_dkg: bool,

    /// Number of peers to allocate in every federation
    #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
    pub fed_size: usize,

    /// Number of federations to allocate for the test/run
    #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
    pub num_feds: usize,

    #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
    /// Create a link to the test dir under this path
    pub link_test_dir: Option<PathBuf>,

    // Suffix appended to the fallback test dir name built in
    // `CommonArgs::test_dir`; defaults to three random digits.
    #[clap(long, default_value_t = random_test_dir_suffix())]
    pub link_test_dir_suffix: String,

    /// Run degraded federation with FM_OFFLINE_NODES shutdown
    #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
    pub offline_nodes: usize,

    /// Force a base federations port, e.g. for convenience during dev tasks
    #[clap(long, env = FM_FEDERATIONS_BASE_PORT_ENV)]
    pub federations_base_port: Option<u16>,

    /// Force a base gateway port, e.g. for convenience during dev tasks
    #[clap(long, env = FM_GATEWAY_BASE_PORT_ENV)]
    pub gateway_base_port: Option<u16>,
}
78
79impl CommonArgs {
80    pub fn mk_test_dir(&self) -> Result<PathBuf> {
81        if self.skip_setup {
82            ensure!(
83                self.test_dir.is_some(),
84                "When using `--skip-setup`, `--test-dir` must be set"
85            );
86        }
87        let path = self.test_dir();
88
89        std::fs::create_dir_all(&path)
90            .with_context(|| format!("Creating tmp directory {}", path.display()))?;
91
92        Ok(path)
93    }
94
95    pub fn test_dir(&self) -> PathBuf {
96        self.test_dir.clone().unwrap_or_else(|| {
97            std::env::temp_dir().join(format!(
98                "devimint-{}-{}",
99                std::process::id(),
100                self.link_test_dir_suffix
101            ))
102        })
103    }
104}
105
// Top-level devimint subcommands, dispatched by `handle_command`.
//
// NOTE: additions are plain `//` comments; the existing `///` comments are
// left untouched.
#[derive(Subcommand)]
pub enum Cmd {
    /// Spins up bitcoind and esplora.
    ExternalDaemons {
        // Optional command (program followed by its arguments) that
        // `handle_command` runs once the daemons are up; the task group is
        // shut down after the command succeeds.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Spins up bitcoind, LDK Gateway, lnd w/ gateway, a faucet,
    /// esplora, and a federation sized from FM_FED_SIZE it opens LN channel
    /// between the two nodes. it connects the gateways to the federation.
    /// it finally switches to use the LND gateway for LNv1
    DevFed {
        // Optional command (program followed by its arguments) that
        // `handle_command` runs once the dev federation is ready; the task
        // group is shut down after the command succeeds.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Rpc commands to the long running devimint instance. Could be entry point
    /// for devimint as a cli
    // Flattened: the `RpcCmd` variants are handled by `rpc_command`.
    #[clap(flatten)]
    Rpc(RpcCmd),
}
126
// Commands handled by `rpc_command`; both operate on files inside the test
// dir returned by `CommonArgs::test_dir`.
#[derive(Subcommand)]
pub enum RpcCmd {
    // Polls until the `ready` file exists, prints its contents and, when both
    // the `env` and `cfg/invite-code` files exist, appends the invite code to
    // the `env` file.
    Wait,
    // Polls until the `env` file exists and prints its contents.
    Env,
}
132
133pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
134    let test_dir = &arg.mk_test_dir()?;
135    mkdir(test_dir.clone()).await?;
136    let logs_dir: PathBuf = test_dir.join("logs");
137    mkdir(logs_dir.clone()).await?;
138
139    let log_file = fs::OpenOptions::new()
140        .write(true)
141        .create(true)
142        .append(true)
143        .open(logs_dir.join("devimint.log"))
144        .await?
145        .into_std()
146        .await;
147
148    fedimint_logging::TracingSetup::default()
149        .with_file(Some(log_file))
150        // jsonrpsee is expected to fail during startup
151        .with_directive("jsonrpsee-client=off")
152        .init()?;
153
154    let globals = vars::Global::new(
155        test_dir,
156        arg.num_feds,
157        arg.fed_size,
158        arg.offline_nodes,
159        arg.federations_base_port,
160        arg.gateway_base_port,
161    )
162    .await?;
163
164    if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
165        update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
166    }
167    info!(target: LOG_DEVIMINT, path = %globals.FM_DATA_DIR.display(), "Devimint data dir");
168
169    let mut env_string = String::new();
170    for (var, value) in globals.vars() {
171        debug!(var, value, "Env variable set");
172        writeln!(env_string, r#"export {var}="{value}""#)?; // hope that value doesn't contain a "
173        // TODO: Audit that the environment access only happens in single-threaded code.
174        unsafe { std::env::set_var(var, value) };
175    }
176    write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
177    let process_mgr = ProcessManager::new(globals);
178    let task_group = TaskGroup::new();
179    task_group.install_kill_handler();
180    Ok((process_mgr, task_group))
181}
182
183pub async fn update_test_dir_link(
184    link_test_dir: &Path,
185    test_dir: &Path,
186) -> Result<(), anyhow::Error> {
187    let make_link = match fs::read_link(link_test_dir).await {
188        Ok(existing) => {
189            if existing == test_dir {
190                false
191            } else {
192                debug!(
193                    old = %existing.display(),
194                    new = %test_dir.display(),
195                    link = %link_test_dir.display(),
196                    "Updating exinst test dir link"
197                );
198
199                fs::remove_file(link_test_dir).await?;
200                true
201            }
202        }
203        _ => true,
204    };
205    if make_link {
206        debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
207        fs::symlink(&test_dir, link_test_dir).await?;
208    }
209    Ok(())
210}
211
212pub async fn cleanup_on_exit<T>(
213    main_process: impl futures::Future<Output = Result<T>>,
214    task_group: TaskGroup,
215) -> Result<Option<T>> {
216    match task_group
217        .make_handle()
218        .cancel_on_shutdown(main_process)
219        .await
220    {
221        Err(_) => {
222            info!("Received shutdown signal before finishing main process, exiting early");
223            Ok(None)
224        }
225        Ok(Ok(v)) => {
226            debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
227            task_group
228                .shutdown_join_all(Duration::from_secs(30))
229                .await?;
230
231            // the caller can drop the v after shutdown
232            Ok(Some(v))
233        }
234        Ok(Err(err)) => {
235            warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
236            Err(err)
237        }
238    }
239}
240
241pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
242    let ready_file = &global.FM_READY_FILE;
243    match result {
244        Ok(_) => write_overwrite_async(ready_file, "READY").await?,
245        Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
246    }
247    result
248}
249
/// Executes the given devimint subcommand.
///
/// * `Cmd::ExternalDaemons` runs [`setup`], starts the external daemons,
///   records the outcome in the ready file, optionally runs the `exec`
///   command and then waits for the task group shutdown.
/// * `Cmd::DevFed` runs [`setup`], builds a [`DevJitFed`], performs the
///   initial peg-ins (unless `skip_setup` or `pre_dkg` is set), finalizes the
///   federation, writes the ready file, optionally runs the `exec` command
///   and waits for shutdown. The whole sequence is wrapped in
///   [`cleanup_on_exit`].
/// * `Cmd::Rpc` is forwarded to [`rpc_command`].
///
/// # Errors
///
/// Propagates any error from setup, daemon startup, the peg-in steps, the
/// user command or the RPC command.
pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
    match cmd {
        Cmd::ExternalDaemons { exec } => {
            let (process_mgr, task_group) = setup(common_args).await?;
            // Bound to a named variable (not `_`) so the returned value stays
            // alive until the end of this arm.
            let _daemons =
                write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
                    .await?;
            if let Some(exec) = exec {
                exec_user_command(exec).await?;
                // The user command finished successfully: request shutdown so
                // the wait below returns.
                task_group.shutdown();
            }
            // Wait until the task group is shut down (by the line above or by
            // the kill handler installed in `setup`).
            task_group.make_handle().make_shutdown_rx().await;
        }
        Cmd::DevFed { exec } => {
            trace!(target: LOG_DEVIMINT, "Starting dev fed");
            let start_time = Instant::now();
            // Copied out before `common_args` is moved into `setup`.
            let skip_setup = common_args.skip_setup;
            let pre_dkg = common_args.pre_dkg;
            let (process_mgr, task_group) = setup(common_args).await?;
            let main = {
                // The async block takes its own clone; the original
                // `task_group` is handed to `cleanup_on_exit` below.
                let task_group = task_group.clone();
                async move {
                    let dev_fed = DevJitFed::new(&process_mgr, skip_setup, pre_dkg)?;

                    let pegin_start_time = Instant::now();
                    debug!(target: LOG_DEVIMINT, "Peging in client and gateways");

                    // Peg-ins only happen for a freshly set up, fully
                    // initialized federation.
                    if !skip_setup && !pre_dkg {
                        // NOTE(review): amounts are presumably in sats — confirm
                        // against `send_to`.
                        const GW_PEGIN_AMOUNT: u64 = 1_000_000;
                        const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;

                        // The three funding futures run concurrently and the
                        // first error aborts all of them. Only the client
                        // deposit yields a value (its operation id).
                        let (operation_id, (), ()) = tokio::try_join!(
                            // Fund the internal client's deposit address.
                            async {
                                let (address, operation_id) =
                                    dev_fed.internal_client().await?.get_deposit_addr().await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    %operation_id,
                                    "Sending funds to client deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, CLIENT_PEGIN_AMOUNT)
                                    .await?;
                                Ok(operation_id)
                            },
                            // Fund the LND gateway's peg-in address.
                            async {
                                let address = dev_fed
                                    .gw_lnd_registered()
                                    .await?
                                    .client()
                                    .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
                                    .await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    "Sending funds to LND deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, GW_PEGIN_AMOUNT)
                                    .await
                                    .map(|_| ())
                            },
                            // Fund the LDK gateway's peg-in address, but only
                            // when LNv2 is supported.
                            async {
                                if crate::util::supports_lnv2() {
                                    let gw_ldk = dev_fed.gw_ldk_connected().await?;
                                    let address = gw_ldk
                                        .client()
                                        .get_pegin_addr(
                                            &dev_fed.fed().await?.calculate_federation_id(),
                                        )
                                        .await?;
                                    debug!(
                                        target: LOG_DEVIMINT,
                                        %address,
                                        "Sending funds to LDK deposit addr"
                                    );
                                    dev_fed
                                        .bitcoind()
                                        .await?
                                        .send_to(address, GW_PEGIN_AMOUNT)
                                        .await
                                        .map(|_| ())
                                } else {
                                    Ok(())
                                }
                            },
                        )?;

                        // Mine blocks on top of the funding transactions
                        // before waiting for the client deposit.
                        dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
                        if crate::util::supports_wallet_v2() {
                            // NOTE(review): `* 1000 * 9 / 10` looks like a
                            // sats -> msats conversion with a 10% allowance
                            // (presumably for fees) — confirm.
                            dev_fed
                                .internal_client()
                                .await?
                                .await_balance(CLIENT_PEGIN_AMOUNT * 1000 * 9 / 10)
                                .await?;
                        } else {
                            dev_fed
                                .internal_client()
                                .await?
                                .await_deposit(&operation_id)
                                .await?;
                        }

                        info!(
                            target: LOG_DEVIMINT,
                            elapsed_ms = %pegin_start_time.elapsed().as_millis(),
                            "Pegins completed"
                        );
                    }

                    // Expose the invite code through the process environment;
                    // skipped when stopping at the pre-dkg stage.
                    if !pre_dkg {
                        // TODO: Audit that the environment access only happens in single-threaded
                        // code.
                        unsafe {
                            std::env::set_var(
                                FM_INVITE_CODE_ENV,
                                dev_fed.fed().await?.invite_code()?,
                            );
                        };
                    }

                    dev_fed.finalize(&process_mgr).await?;

                    // Writes "READY" to the ready file and hands `dev_fed`
                    // back unchanged.
                    let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;

                    info!(
                        target: LOG_DEVIMINT,
                        elapsed_ms = %start_time.elapsed().as_millis(),
                        path = %process_mgr.globals.FM_DATA_DIR.display(),
                        "Devfed ready"
                    );
                    if let Some(exec) = exec {
                        debug!(target: LOG_DEVIMINT, "Starting exec command");
                        exec_user_command(exec).await?;
                        // The user command finished successfully: request
                        // shutdown so the wait below returns.
                        task_group.shutdown();
                    }

                    debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
                    task_group.make_handle().make_shutdown_rx().await;

                    Ok::<_, anyhow::Error>(daemons)
                }
            };
            // `cleanup_on_exit` only yields `Some` when `main` completed
            // successfully; in that case the federation is terminated here.
            if let Some(fed) = cleanup_on_exit(main, task_group).await? {
                fed.fast_terminate().await;
            }
        }
        Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
    }
    Ok(())
}
406
407pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
408    let cmd_str = path
409        .join(ffi::OsStr::new(" "))
410        .to_string_lossy()
411        .to_string();
412
413    let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
414        let mut path = devimint_static_data_dir();
415        path.push("/aliases:");
416        path.push(existing_path);
417        path
418    } else {
419        let mut path = devimint_static_data_dir();
420        path.push("/aliases");
421        path
422    };
423    debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
424    if !tokio::process::Command::new(&path[0])
425        .args(&path[1..])
426        .env("PATH", path_with_aliases)
427        .kill_on_drop(true)
428        .status()
429        .await
430        .with_context(|| format!("Executing user command failed: {cmd_str}"))?
431        .success()
432    {
433        error!(cmd = %cmd_str, "User command failed");
434        return Err(anyhow!("User command failed: {cmd_str}"));
435    }
436    Ok(())
437}
438
439fn devimint_static_data_dir() -> ffi::OsString {
440    // If set, use the runtime, otherwise the compile time value
441    env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
442        env!(
443            // Note: constant expression, not allowed, so we can't use the constant :/
444            "FM_DEVIMINT_STATIC_DATA_DIR"
445        )
446        .into(),
447    )
448}
449
450pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
451    fedimint_logging::TracingSetup::default().init()?;
452    match rpc {
453        RpcCmd::Env => {
454            let env_file = common.test_dir().join("env");
455            poll("env file", || async {
456                if fs::try_exists(&env_file)
457                    .await
458                    .context("env file")
459                    .map_err(ControlFlow::Continue)?
460                {
461                    Ok(())
462                } else {
463                    Err(ControlFlow::Continue(anyhow!("env file not found")))
464                }
465            })
466            .await?;
467            let env = fs::read_to_string(&env_file).await?;
468            print!("{env}");
469            Ok(())
470        }
471        RpcCmd::Wait => {
472            let ready_file = common.test_dir().join("ready");
473            poll("ready file", || async {
474                if fs::try_exists(&ready_file)
475                    .await
476                    .context("ready file")
477                    .map_err(ControlFlow::Continue)?
478                {
479                    Ok(())
480                } else {
481                    Err(ControlFlow::Continue(anyhow!("ready file not found")))
482                }
483            })
484            .await?;
485            let env = fs::read_to_string(&ready_file).await?;
486            print!("{env}");
487
488            // Append invite code to devimint env
489            let test_dir = &common.test_dir();
490            let env_file = test_dir.join("env");
491            let invite_file = test_dir.join("cfg/invite-code");
492            if fs::try_exists(&env_file).await.ok().unwrap_or(false)
493                && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
494            {
495                let invite = fs::read_to_string(&invite_file).await?;
496                let mut env_string = fs::read_to_string(&env_file).await?;
497                writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
498                // TODO: Audit that the environment access only happens in single-threaded code.
499                unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
500                write_overwrite_async(env_file, env_string).await?;
501            }
502
503            Ok(())
504        }
505    }
506}