devimint/
cli.rs

1use std::fmt::Write;
2use std::ops::ControlFlow;
3use std::path::{Path, PathBuf};
4use std::time::Duration;
5use std::{env, ffi};
6
7use anyhow::{Context, Result, anyhow, ensure};
8use clap::{Args, Parser, Subcommand};
9use fedimint_core::task::TaskGroup;
10use fedimint_core::util::{FmtCompactAnyhow as _, write_overwrite_async};
11use fedimint_logging::LOG_DEVIMINT;
12use rand::Rng as _;
13use rand::distributions::Alphanumeric;
14use tokio::fs;
15use tokio::net::TcpStream;
16use tokio::time::Instant;
17use tracing::{debug, error, info, trace, warn};
18
19use crate::devfed::DevJitFed;
20use crate::envs::{
21    FM_BITCOIN_RPC_URL_ENV, FM_CLN_SOCKET_ENV, FM_DEVIMINT_STATIC_DATA_DIR_ENV,
22    FM_FAUCET_BIND_ADDR_ENV, FM_FED_SIZE_ENV, FM_INVITE_CODE_ENV, FM_LINK_TEST_DIR_ENV,
23    FM_NUM_FEDS_ENV, FM_OFFLINE_NODES_ENV, FM_PORT_GW_LND_ENV, FM_TEST_DIR_ENV,
24};
25use crate::federation::Fedimintd;
26use crate::util::{ProcessManager, poll};
27use crate::vars::mkdir;
28use crate::{ExternalDaemons, external_daemons, vars};
29
30fn random_test_dir_suffix() -> String {
31    rand::thread_rng()
32        .sample_iter(&Alphanumeric)
33        .filter(u8::is_ascii_digit)
34        .take(3)
35        .map(char::from)
36        .collect::<String>()
37}
38
// Options that `handle_command` receives alongside every `Cmd`.
//
// NOTE: clap turns `///` doc comments on fields into `--help` text, so
// maintainer-only notes in this struct are written as plain `//` comments
// to keep the CLI output unchanged.
#[derive(Parser, Clone, Default)]
pub struct CommonArgs {
    // Directory holding the state of this run. When unset, `test_dir()` falls
    // back to `devimint-<pid>-<suffix>` under the system temp dir.
    #[clap(short = 'd', long, env = FM_TEST_DIR_ENV)]
    pub test_dir: Option<PathBuf>,

    /// Don't set up new Federation, start from the state in existing
    /// devimint data dir
    // `mk_test_dir()` rejects this flag unless `test_dir` is set as well.
    #[arg(long, env = "FM_SKIP_SETUP")]
    skip_setup: bool,

    /// Number of peers to allocate in every federation
    #[clap(short = 'n', long, env = FM_FED_SIZE_ENV, default_value = "4")]
    pub fed_size: usize,

    /// Number of federations to allocate for the test/run
    #[clap(long, env = FM_NUM_FEDS_ENV, default_value = "1")]
    pub num_feds: usize,

    #[clap(long, env = FM_LINK_TEST_DIR_ENV)]
    /// Create a link to the test dir under this path
    pub link_test_dir: Option<PathBuf>,

    // Despite the name, this suffix is what `test_dir()` appends to the
    // fallback directory name; it defaults to three random digits.
    #[clap(long, default_value_t = random_test_dir_suffix())]
    pub link_test_dir_suffix: String,

    /// Run degraded federation with FM_OFFLINE_NODES shutdown
    #[clap(long, env = FM_OFFLINE_NODES_ENV, default_value = "0")]
    pub offline_nodes: usize,
}
68
69impl CommonArgs {
70    pub fn mk_test_dir(&self) -> Result<PathBuf> {
71        if self.skip_setup {
72            ensure!(
73                self.test_dir.is_some(),
74                "When using `--skip-setup`, `--test-dir` must be set"
75            );
76        }
77        let path = self.test_dir();
78
79        std::fs::create_dir_all(&path)
80            .with_context(|| format!("Creating tmp directory {}", path.display()))?;
81
82        Ok(path)
83    }
84
85    pub fn test_dir(&self) -> PathBuf {
86        self.test_dir.clone().unwrap_or_else(|| {
87            std::env::temp_dir().join(format!(
88                "devimint-{}-{}",
89                std::process::id(),
90                self.link_test_dir_suffix
91            ))
92        })
93    }
94}
95
// Top-level devimint subcommands, dispatched by `handle_command`.
//
// NOTE: the `///` comments below double as clap `--help` text, so additional
// maintainer notes are written as plain `//` comments.
#[derive(Subcommand)]
pub enum Cmd {
    /// Spins up bitcoind, cln, lnd, electrs, esplora, and opens a channel
    /// between the two lightning nodes
    ExternalDaemons {
        // Optional command line (program followed by its arguments) that
        // `handle_command` passes to `exec_user_command` once the daemons are
        // up; the task group is shut down after the command succeeds.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Spins up bitcoind, cln, lnd w/ gateway, a faucet, electrs,
    /// esplora, and a federation sized from FM_FED_SIZE it opens LN channel
    /// between the two nodes. it connects the gateways to the federation.
    /// it finally switches to use the LND gateway for LNv1
    DevFed {
        // Same meaning as `ExternalDaemons::exec`, but run after the dev
        // federation has been reported ready.
        #[arg(long, trailing_var_arg = true, allow_hyphen_values = true, num_args=1..)]
        exec: Option<Vec<ffi::OsString>>,
    },
    /// Runs bitcoind, spins up FM_FED_SIZE worth of fedimints
    RunUi,
    /// Rpc commands to the long running devimint instance. Could be entry point
    /// for devimint as a cli
    // Handled by `rpc_command`; unlike the other variants this one does not go
    // through `setup`.
    #[clap(flatten)]
    Rpc(RpcCmd),

    /// Start built-in faucet (in the past called `devimint-faucet`)
    // Handled by `crate::faucet::run`.
    Faucet(FaucetOpts),
}
122
// Options of the `Cmd::Faucet` subcommand, forwarded to `crate::faucet::run`.
// Every option can be given as a `--long` flag or through the environment
// variable named by the corresponding `*_ENV` constant.
//
// NOTE: plain `//` comments are used on purpose; `///` comments would become
// clap `--help` text.
#[derive(Args, Debug, Clone)]
pub struct FaucetOpts {
    // Address the faucet binds to (format is defined by the faucet module).
    #[clap(long, env = FM_FAUCET_BIND_ADDR_ENV)]
    pub bind_addr: String,
    // Bitcoin RPC URL handed to the faucet.
    #[clap(long, env = FM_BITCOIN_RPC_URL_ENV)]
    pub bitcoind_rpc: String,
    // CLN socket handed to the faucet.
    #[clap(long, env = FM_CLN_SOCKET_ENV)]
    pub cln_socket: String,
    // Port of the LND gateway.
    #[clap(long, env = FM_PORT_GW_LND_ENV)]
    pub gw_lnd_port: u16,
    // Optional federation invite code; `handle_command` and `rpc_command`
    // export it through the same environment variable.
    #[clap(long, env = FM_INVITE_CODE_ENV)]
    pub invite_code: Option<String>,
}
136
// Subcommands handled by `rpc_command`; they inspect the test dir of a
// devimint instance instead of starting one.
//
// NOTE: plain `//` comments are used on purpose; `///` comments would become
// clap `--help` text.
#[derive(Subcommand)]
pub enum RpcCmd {
    // Polls until `<test_dir>/ready` exists, prints its content, and appends
    // the invite code to `<test_dir>/env` when both files are present.
    Wait,
    // Polls until `<test_dir>/env` exists and prints its content.
    Env,
}
142
143pub async fn setup(arg: CommonArgs) -> Result<(ProcessManager, TaskGroup)> {
144    let test_dir = &arg.mk_test_dir()?;
145    mkdir(test_dir.clone()).await?;
146    let logs_dir: PathBuf = test_dir.join("logs");
147    mkdir(logs_dir.clone()).await?;
148
149    let log_file = fs::OpenOptions::new()
150        .write(true)
151        .create(true)
152        .append(true)
153        .open(logs_dir.join("devimint.log"))
154        .await?
155        .into_std()
156        .await;
157
158    fedimint_logging::TracingSetup::default()
159        .with_file(Some(log_file))
160        // jsonrpsee is expected to fail during startup
161        .with_directive("jsonrpsee-client=off")
162        .init()?;
163
164    let globals =
165        vars::Global::new(test_dir, arg.num_feds, arg.fed_size, arg.offline_nodes).await?;
166
167    if let Some(link_test_dir) = arg.link_test_dir.as_ref() {
168        update_test_dir_link(link_test_dir, &arg.test_dir()).await?;
169    }
170    info!(target: LOG_DEVIMINT, path=%globals.FM_DATA_DIR.display() , "Devimint data dir");
171
172    let mut env_string = String::new();
173    for (var, value) in globals.vars() {
174        debug!(var, value, "Env variable set");
175        writeln!(env_string, r#"export {var}="{value}""#)?; // hope that value doesn't contain a "
176        // TODO: Audit that the environment access only happens in single-threaded code.
177        unsafe { std::env::set_var(var, value) };
178    }
179    write_overwrite_async(globals.FM_TEST_DIR.join("env"), env_string).await?;
180    let process_mgr = ProcessManager::new(globals);
181    let task_group = TaskGroup::new();
182    task_group.install_kill_handler();
183    Ok((process_mgr, task_group))
184}
185
186pub async fn update_test_dir_link(
187    link_test_dir: &Path,
188    test_dir: &Path,
189) -> Result<(), anyhow::Error> {
190    let make_link = match fs::read_link(link_test_dir).await {
191        Ok(existing) => {
192            if existing == test_dir {
193                false
194            } else {
195                debug!(
196                    old = %existing.display(),
197                    new = %test_dir.display(),
198                    link = %link_test_dir.display(),
199                    "Updating exinst test dir link"
200                );
201
202                fs::remove_file(link_test_dir).await?;
203                true
204            }
205        }
206        _ => true,
207    };
208    if make_link {
209        debug!(src = %test_dir.display(), dst = %link_test_dir.display(), "Linking test dir");
210        fs::symlink(&test_dir, link_test_dir).await?;
211    }
212    Ok(())
213}
214
215pub async fn cleanup_on_exit<T>(
216    main_process: impl futures::Future<Output = Result<T>>,
217    task_group: TaskGroup,
218) -> Result<Option<T>> {
219    match task_group
220        .make_handle()
221        .cancel_on_shutdown(main_process)
222        .await
223    {
224        Err(_) => {
225            info!("Received shutdown signal before finishing main process, exiting early");
226            Ok(None)
227        }
228        Ok(Ok(v)) => {
229            debug!(target: LOG_DEVIMINT, "Main process finished successfully, shutting down task group");
230            task_group
231                .shutdown_join_all(Duration::from_secs(30))
232                .await?;
233
234            // the caller can drop the v after shutdown
235            Ok(Some(v))
236        }
237        Ok(Err(err)) => {
238            warn!(target: LOG_DEVIMINT, err = %err.fmt_compact_anyhow(), "Main process failed, will shutdown");
239            Err(err)
240        }
241    }
242}
243
244pub async fn write_ready_file<T>(global: &vars::Global, result: Result<T>) -> Result<T> {
245    let ready_file = &global.FM_READY_FILE;
246    match result {
247        Ok(_) => write_overwrite_async(ready_file, "READY").await?,
248        Err(_) => write_overwrite_async(ready_file, "ERROR").await?,
249    }
250    result
251}
252
/// Executes one devimint subcommand.
///
/// * `ExternalDaemons` / `DevFed` / `RunUi` call [`setup`], start their
///   daemons, report the outcome through the ready file and then wait on the
///   task group's shutdown receiver.
/// * `Rpc` is delegated to [`rpc_command`] and `Faucet` to `crate::faucet::run`;
///   neither goes through [`setup`].
///
/// # Errors
///
/// Propagates any error from setup, daemon startup, the optional user command
/// or the delegated subcommand.
pub async fn handle_command(cmd: Cmd, common_args: CommonArgs) -> Result<()> {
    match cmd {
        Cmd::ExternalDaemons { exec } => {
            let (process_mgr, task_group) = setup(common_args).await?;
            // The daemons are kept alive by this binding until the branch ends.
            let _daemons =
                write_ready_file(&process_mgr.globals, external_daemons(&process_mgr).await)
                    .await?;
            if let Some(exec) = exec {
                // Run the user command, then request shutdown so the wait
                // below returns.
                exec_user_command(exec).await?;
                task_group.shutdown();
            }
            task_group.make_handle().make_shutdown_rx().await;
        }
        Cmd::DevFed { exec } => {
            trace!(target: LOG_DEVIMINT, "Starting dev fed");
            let start_time = Instant::now();
            // Read before `common_args` is moved into `setup`.
            let skip_setup = common_args.skip_setup;
            let (process_mgr, task_group) = setup(common_args).await?;
            let main = {
                let task_group = task_group.clone();
                async move {
                    let dev_fed = DevJitFed::new(&process_mgr, skip_setup)?;

                    let pegin_start_time = Instant::now();
                    debug!(target: LOG_DEVIMINT, "Peging in client and gateways");

                    // Pegins are only performed on a fresh setup.
                    if !skip_setup {
                        const GW_PEGIN_AMOUNT: u64 = 1_000_000;
                        const CLIENT_PEGIN_AMOUNT: u64 = 1_000_000;

                        // Fund the internal client, the LND gateway and (when
                        // LNv2 is supported) the LDK gateway concurrently; the
                        // first failure aborts all three.
                        let (operation_id, (), ()) = tokio::try_join!(
                            async {
                                let (address, operation_id) =
                                    dev_fed.internal_client().await?.get_deposit_addr().await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    %operation_id,
                                    "Sending funds to client deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, CLIENT_PEGIN_AMOUNT)
                                    .await?;
                                Ok(operation_id)
                            },
                            async {
                                let address = dev_fed
                                    .gw_lnd_registered()
                                    .await?
                                    .get_pegin_addr(&dev_fed.fed().await?.calculate_federation_id())
                                    .await?;
                                debug!(
                                    target: LOG_DEVIMINT,
                                    %address,
                                    "Sending funds to LND deposit addr"
                                );
                                dev_fed
                                    .bitcoind()
                                    .await?
                                    .send_to(address, GW_PEGIN_AMOUNT)
                                    .await
                                    .map(|_| ())
                            },
                            async {
                                if crate::util::supports_lnv2() {
                                    let gw_ldk = dev_fed.gw_ldk_connected().await?;
                                    let address = gw_ldk
                                        .get_pegin_addr(
                                            &dev_fed.fed().await?.calculate_federation_id(),
                                        )
                                        .await?;
                                    debug!(
                                        target: LOG_DEVIMINT,
                                        %address,
                                        "Sending funds to LDK deposit addr"
                                    );
                                    dev_fed
                                        .bitcoind()
                                        .await?
                                        .send_to(address, GW_PEGIN_AMOUNT)
                                        .await
                                        .map(|_| ())
                                } else {
                                    Ok(())
                                }
                            },
                        )?;

                        // Mine blocks on top of the deposits, then wait for the
                        // client deposit to be claimed.
                        // NOTE(review): 11 is presumably the confirmation
                        // depth the deposits need; confirm.
                        dev_fed.bitcoind().await?.mine_blocks_no_wait(11).await?;
                        dev_fed
                            .internal_client()
                            .await?
                            .await_deposit(&operation_id)
                            .await?;

                        info!(
                            target: LOG_DEVIMINT,
                            elapsed_ms = %pegin_start_time.elapsed().as_millis(),
                            "Pegins completed"
                        );
                    }

                    // TODO: Audit that the environment access only happens in single-threaded code.
                    unsafe {
                        std::env::set_var(FM_INVITE_CODE_ENV, dev_fed.fed().await?.invite_code()?);
                    };

                    dev_fed.finalize(&process_mgr).await?;

                    // NOTE(review): the result passed here is always `Ok`, so
                    // this branch only ever writes "READY"; failures above
                    // return early without touching the ready file.
                    let daemons = write_ready_file(&process_mgr.globals, Ok(dev_fed)).await?;

                    info!(target: LOG_DEVIMINT, elapsed_ms = %start_time.elapsed().as_millis(), "Devfed ready");
                    if let Some(exec) = exec {
                        debug!(target: LOG_DEVIMINT, "Starting exec command");
                        exec_user_command(exec).await?;
                        task_group.shutdown();
                    }

                    debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
                    task_group.make_handle().make_shutdown_rx().await;

                    Ok::<_, anyhow::Error>(daemons)
                }
            };
            // `Some` is only returned when `main` finished successfully.
            if let Some(fed) = cleanup_on_exit(main, task_group).await? {
                fed.fast_terminate().await;
            }
        }
        Cmd::Rpc(rpc_cmd) => rpc_command(rpc_cmd, common_args).await?,
        Cmd::RunUi => {
            let (process_mgr, task_group) = setup(common_args).await?;
            let task_group_clone = task_group.clone();
            let main = async {
                // Unlike `DevFed`, a startup failure is recorded as "ERROR" in
                // the ready file before being propagated.
                let result = run_ui(&process_mgr).await;
                let daemons = write_ready_file(&process_mgr.globals, result).await?;

                debug!(target: LOG_DEVIMINT, "Waiting for group task shutdown");
                task_group_clone.make_handle().make_shutdown_rx().await;

                Ok::<_, anyhow::Error>(daemons)
            };
            // NOTE(review): the future is boxed, presumably to keep this
            // function's own future small; confirm.
            Box::pin(cleanup_on_exit(main, task_group)).await?;
        }
        Cmd::Faucet(opts) => crate::faucet::run(opts).await?,
    }
    Ok(())
}
402
403pub async fn exec_user_command(path: Vec<ffi::OsString>) -> Result<(), anyhow::Error> {
404    let cmd_str = path
405        .join(ffi::OsStr::new(" "))
406        .to_string_lossy()
407        .to_string();
408
409    let path_with_aliases = if let Some(existing_path) = env::var_os("PATH") {
410        let mut path = devimint_static_data_dir();
411        path.push("/aliases:");
412        path.push(existing_path);
413        path
414    } else {
415        let mut path = devimint_static_data_dir();
416        path.push("/aliases");
417        path
418    };
419    debug!(target: LOG_DEVIMINT, cmd = %cmd_str, "Executing user command");
420    if !tokio::process::Command::new(&path[0])
421        .args(&path[1..])
422        .env("PATH", path_with_aliases)
423        .kill_on_drop(true)
424        .status()
425        .await
426        .with_context(|| format!("Executing user command failed: {cmd_str}"))?
427        .success()
428    {
429        error!(cmd = %cmd_str, "User command failed");
430        return Err(anyhow!("User command failed: {cmd_str}"));
431    }
432    Ok(())
433}
434
435fn devimint_static_data_dir() -> ffi::OsString {
436    // If set, use the runtime, otherwise the compile time value
437    env::var_os(FM_DEVIMINT_STATIC_DATA_DIR_ENV).unwrap_or(
438        env!(
439            // Note: consant expression, not allowed, so we can't use the constant :/
440            "FM_DEVIMINT_STATIC_DATA_DIR"
441        )
442        .into(),
443    )
444}
445
446pub async fn rpc_command(rpc: RpcCmd, common: CommonArgs) -> Result<()> {
447    fedimint_logging::TracingSetup::default().init()?;
448    match rpc {
449        RpcCmd::Env => {
450            let env_file = common.test_dir().join("env");
451            poll("env file", || async {
452                if fs::try_exists(&env_file)
453                    .await
454                    .context("env file")
455                    .map_err(ControlFlow::Continue)?
456                {
457                    Ok(())
458                } else {
459                    Err(ControlFlow::Continue(anyhow!("env file not found")))
460                }
461            })
462            .await?;
463            let env = fs::read_to_string(&env_file).await?;
464            print!("{env}");
465            Ok(())
466        }
467        RpcCmd::Wait => {
468            let ready_file = common.test_dir().join("ready");
469            poll("ready file", || async {
470                if fs::try_exists(&ready_file)
471                    .await
472                    .context("ready file")
473                    .map_err(ControlFlow::Continue)?
474                {
475                    Ok(())
476                } else {
477                    Err(ControlFlow::Continue(anyhow!("ready file not found")))
478                }
479            })
480            .await?;
481            let env = fs::read_to_string(&ready_file).await?;
482            print!("{env}");
483
484            // Append invite code to devimint env
485            let test_dir = &common.test_dir();
486            let env_file = test_dir.join("env");
487            let invite_file = test_dir.join("cfg/invite-code");
488            if fs::try_exists(&env_file).await.ok().unwrap_or(false)
489                && fs::try_exists(&invite_file).await.ok().unwrap_or(false)
490            {
491                let invite = fs::read_to_string(&invite_file).await?;
492                let mut env_string = fs::read_to_string(&env_file).await?;
493                writeln!(env_string, r#"export FM_INVITE_CODE="{invite}""#)?;
494                // TODO: Audit that the environment access only happens in single-threaded code.
495                unsafe { std::env::set_var(FM_INVITE_CODE_ENV, invite) };
496                write_overwrite_async(env_file, env_string).await?;
497            }
498
499            Ok(())
500        }
501    }
502}
503
504async fn run_ui(process_mgr: &ProcessManager) -> Result<(Vec<Fedimintd>, ExternalDaemons)> {
505    let externals = external_daemons(process_mgr).await?;
506    let fed_size = process_mgr.globals.FM_FED_SIZE;
507    let fedimintds = futures::future::try_join_all((0..fed_size).map(|peer| {
508        let bitcoind = externals.bitcoind.clone();
509        async move {
510            let peer_port = 10000 + 8137 + peer * 2;
511            let api_port = peer_port + 1;
512            let metrics_port = 3510 + peer;
513
514            // TODO: we should use this, and override ports that need to be fixed by an env
515            // var and/or just scap `run-ui` altogether
516            // let peer_id = PeerId::new(peer as u16);
517            // let peer_env_vars = vars::Fedimintd::init(
518            //     &process_mgr.globals,
519            //     "ui-test-federation".to_string(),
520            //     peer_id,
521            //     process_mgr.globals.fedimintd_overrides.peer_expect(peer_id),
522            // )
523            // .await?;
524            let vars = vars::Fedimintd {
525                FM_BIND_P2P: format!("127.0.0.1:{peer_port}"),
526                FM_P2P_URL: format!("fedimint://127.0.0.1:{peer_port}"),
527                FM_BIND_API: format!("127.0.0.1:{api_port}"),
528                FM_API_URL: format!("ws://127.0.0.1:{api_port}"),
529                FM_DATA_DIR: process_mgr
530                    .globals
531                    .FM_DATA_DIR
532                    .join(format!("fedimintd-{peer}")),
533                FM_BIND_METRICS_API: format!("127.0.0.1:{metrics_port}"),
534                FM_FORCE_BITCOIN_RPC_URL: format!(
535                    "http://bitcoin:bitcoin@127.0.0.1:{}",
536                    process_mgr.globals.FM_PORT_BTC_RPC
537                ),
538                FM_FORCE_BITCOIN_RPC_KIND: "bitcoind".into(),
539                FM_IROH_P2P_SECRET_KEY_OVERRIDE: String::new(),
540                FM_IROH_API_SECRET_KEY_OVERRIDE: String::new(),
541            };
542            let fm = Fedimintd::new(
543                process_mgr,
544                bitcoind.clone(),
545                peer,
546                &vars,
547                "default".to_string(),
548            )
549            .await?;
550            let server_addr = &vars.FM_BIND_API;
551
552            poll("waiting for api startup", || async {
553                TcpStream::connect(server_addr)
554                    .await
555                    .context("connect to api")
556                    .map_err(ControlFlow::Continue)
557            })
558            .await?;
559
560            anyhow::Ok(fm)
561        }
562    }))
563    .await?;
564
565    Ok((fedimintds, externals))
566}