Skip to main content

devimint/
process_reaper.rs

1use std::sync::{Condvar, LazyLock, Mutex};
2use std::thread;
3use std::time::{Duration, Instant};
4
5use nix::sys::signal::{self, Signal};
6use nix::sys::wait::waitpid;
7use nix::unistd::Pid;
8use tokio::process::Child;
9
/// Delay between the initial SIGTERM and the SIGKILL that follows it.
///
/// It has to be long enough for iroh-quinn to flush its connection-close
/// frames; otherwise iroh-quinn-proto 0.13 can trip its
/// `untracked_bytes <= segment_size` debug assertion. It also has to be short
/// enough that test teardown stays fast.
const GRACE_PERIOD: Duration = Duration::from_millis(250);
16
17/// Deferred process reaper.
18///
19/// [`kill_process`] sends SIGTERM and enqueues the pid. A dedicated
20/// background thread owns the escalation to SIGKILL after [`GRACE_PERIOD`]
21/// and the subsequent `waitpid`. [`reap_killed_processes`] blocks until
22/// every enqueued pid has been reaped, so ports and file locks are
23/// guaranteed released before we spawn a replacement.
24///
25/// Doing the graceful SIGTERM→wait→SIGKILL dance off the tokio runtime
26/// lets `Drop` stay fully synchronous while still giving peers a chance
27/// to close QUIC connections cleanly.
28struct Reaper {
29    state: Mutex<Vec<PendingKill>>,
30    cv: Condvar,
31}
32
33struct PendingKill {
34    pid: Pid,
35    enqueued_at: Instant,
36}
37
38static REAPER: LazyLock<&'static Reaper> = LazyLock::new(|| {
39    let reaper: &'static Reaper = Box::leak(Box::new(Reaper {
40        state: Mutex::new(Vec::new()),
41        cv: Condvar::new(),
42    }));
43    thread::Builder::new()
44        .name("devimint-reaper".into())
45        .spawn(move || reaper_loop(reaper))
46        .expect("failed to spawn devimint reaper thread");
47    reaper
48});
49
50fn reaper_loop(reaper: &'static Reaper) -> ! {
51    let mut state = reaper.state.lock().expect("reaper lock");
52    loop {
53        if state.is_empty() {
54            state = reaper.cv.wait(state).expect("reaper cv");
55            continue;
56        }
57
58        let now = Instant::now();
59        let next_deadline = state
60            .iter()
61            .map(|e| e.enqueued_at + GRACE_PERIOD)
62            .min()
63            .expect("non-empty checked above");
64
65        if now < next_deadline {
66            let (s, _) = reaper
67                .cv
68                .wait_timeout(state, next_deadline - now)
69                .expect("reaper cv");
70            state = s;
71            continue;
72        }
73
74        // SIGKILL + reap any entries past their grace deadline.
75        // waitpid after SIGKILL returns in microseconds, so holding
76        // the lock across it is fine.
77        //
78        // Errors are ignored on purpose: SIGTERM may have already killed
79        // the process (ESRCH), or tokio's internal pidfd/SIGCHLD machinery
80        // may have raced us to reap (ECHILD). Both outcomes are fine — we
81        // only care that the process is gone and its zombie is cleared.
82        state.retain(|entry| {
83            if entry.enqueued_at + GRACE_PERIOD <= now {
84                let _ = signal::kill(entry.pid, Signal::SIGKILL);
85                let _ = waitpid(entry.pid, None);
86                false
87            } else {
88                true
89            }
90        });
91
92        if state.is_empty() {
93            reaper.cv.notify_all();
94        }
95    }
96}
97
98pub fn kill_process(child: &Child) {
99    let Some(id) = child.id() else {
100        return;
101    };
102    let pid = Pid::from_raw(id as _);
103    // Send SIGTERM now so the grace period starts immediately, without
104    // waiting for the reaper thread to wake up. `kill()` is non-blocking
105    // (the kernel just queues the signal), so this is safe in Drop.
106    let _ = signal::kill(pid, Signal::SIGTERM);
107
108    let reaper = *REAPER;
109    reaper.state.lock().expect("reaper lock").push(PendingKill {
110        pid,
111        enqueued_at: Instant::now(),
112    });
113    reaper.cv.notify_all();
114}
115
116/// Block until the reaper queue is fully drained.
117///
118/// Note: this waits for *all* currently-enqueued kills, not just the
119/// caller's. That's fine in devimint, where process lifecycle is
120/// single-threaded per test — callers don't enqueue concurrently.
121pub fn reap_killed_processes() {
122    let reaper = *REAPER;
123    let wait = || {
124        let mut state = reaper.state.lock().expect("reaper lock");
125        while !state.is_empty() {
126            state = reaper.cv.wait(state).expect("reaper cv");
127        }
128    };
129
130    // Use `block_in_place` only when we're inside a multi-thread tokio
131    // runtime — it panics on `current_thread` runtimes and is pointless
132    // outside any runtime (Drop at program exit, sync tests).
133    match tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()) {
134        Ok(tokio::runtime::RuntimeFlavor::MultiThread) => {
135            fedimint_core::runtime::block_in_place(wait);
136        }
137        _ => wait(),
138    }
139}