devimint/process_reaper.rs
1use std::sync::{Condvar, LazyLock, Mutex};
2use std::thread;
3use std::time::{Duration, Instant};
4
5use nix::sys::signal::{self, Signal};
6use nix::sys::wait::waitpid;
7use nix::unistd::Pid;
8use tokio::process::Child;
9
/// Delay between the initial SIGTERM and the follow-up SIGKILL.
///
/// The value is a trade-off: it has to be long enough for iroh-quinn to
/// flush its close frames (otherwise the `untracked_bytes <= segment_size`
/// debug assertion in iroh-quinn-proto 0.13 can fire), yet short enough
/// that tearing down a test does not feel sluggish.
const GRACE_PERIOD: Duration = Duration::from_millis(250);
16
17/// Deferred process reaper.
18///
19/// [`kill_process`] sends SIGTERM and enqueues the pid. A dedicated
20/// background thread owns the escalation to SIGKILL after [`GRACE_PERIOD`]
21/// and the subsequent `waitpid`. [`reap_killed_processes`] blocks until
22/// every enqueued pid has been reaped, so ports and file locks are
23/// guaranteed released before we spawn a replacement.
24///
25/// Doing the graceful SIGTERM→wait→SIGKILL dance off the tokio runtime
26/// lets `Drop` stay fully synchronous while still giving peers a chance
27/// to close QUIC connections cleanly.
28struct Reaper {
29 state: Mutex<Vec<PendingKill>>,
30 cv: Condvar,
31}
32
33struct PendingKill {
34 pid: Pid,
35 enqueued_at: Instant,
36}
37
38static REAPER: LazyLock<&'static Reaper> = LazyLock::new(|| {
39 let reaper: &'static Reaper = Box::leak(Box::new(Reaper {
40 state: Mutex::new(Vec::new()),
41 cv: Condvar::new(),
42 }));
43 thread::Builder::new()
44 .name("devimint-reaper".into())
45 .spawn(move || reaper_loop(reaper))
46 .expect("failed to spawn devimint reaper thread");
47 reaper
48});
49
50fn reaper_loop(reaper: &'static Reaper) -> ! {
51 let mut state = reaper.state.lock().expect("reaper lock");
52 loop {
53 if state.is_empty() {
54 state = reaper.cv.wait(state).expect("reaper cv");
55 continue;
56 }
57
58 let now = Instant::now();
59 let next_deadline = state
60 .iter()
61 .map(|e| e.enqueued_at + GRACE_PERIOD)
62 .min()
63 .expect("non-empty checked above");
64
65 if now < next_deadline {
66 let (s, _) = reaper
67 .cv
68 .wait_timeout(state, next_deadline - now)
69 .expect("reaper cv");
70 state = s;
71 continue;
72 }
73
74 // SIGKILL + reap any entries past their grace deadline.
75 // waitpid after SIGKILL returns in microseconds, so holding
76 // the lock across it is fine.
77 //
78 // Errors are ignored on purpose: SIGTERM may have already killed
79 // the process (ESRCH), or tokio's internal pidfd/SIGCHLD machinery
80 // may have raced us to reap (ECHILD). Both outcomes are fine — we
81 // only care that the process is gone and its zombie is cleared.
82 state.retain(|entry| {
83 if entry.enqueued_at + GRACE_PERIOD <= now {
84 let _ = signal::kill(entry.pid, Signal::SIGKILL);
85 let _ = waitpid(entry.pid, None);
86 false
87 } else {
88 true
89 }
90 });
91
92 if state.is_empty() {
93 reaper.cv.notify_all();
94 }
95 }
96}
97
98pub fn kill_process(child: &Child) {
99 let Some(id) = child.id() else {
100 return;
101 };
102 let pid = Pid::from_raw(id as _);
103 // Send SIGTERM now so the grace period starts immediately, without
104 // waiting for the reaper thread to wake up. `kill()` is non-blocking
105 // (the kernel just queues the signal), so this is safe in Drop.
106 let _ = signal::kill(pid, Signal::SIGTERM);
107
108 let reaper = *REAPER;
109 reaper.state.lock().expect("reaper lock").push(PendingKill {
110 pid,
111 enqueued_at: Instant::now(),
112 });
113 reaper.cv.notify_all();
114}
115
116/// Block until the reaper queue is fully drained.
117///
118/// Note: this waits for *all* currently-enqueued kills, not just the
119/// caller's. That's fine in devimint, where process lifecycle is
120/// single-threaded per test — callers don't enqueue concurrently.
121pub fn reap_killed_processes() {
122 let reaper = *REAPER;
123 let wait = || {
124 let mut state = reaper.state.lock().expect("reaper lock");
125 while !state.is_empty() {
126 state = reaper.cv.wait(state).expect("reaper cv");
127 }
128 };
129
130 // Use `block_in_place` only when we're inside a multi-thread tokio
131 // runtime — it panics on `current_thread` runtimes and is pointless
132 // outside any runtime (Drop at program exit, sync tests).
133 match tokio::runtime::Handle::try_current().map(|h| h.runtime_flavor()) {
134 Ok(tokio::runtime::RuntimeFlavor::MultiThread) => {
135 fedimint_core::runtime::block_in_place(wait);
136 }
137 _ => wait(),
138 }
139}