fedimint_core/task/
waiter.rs

1//! Wait for a task to finish.
2
3use tokio::sync::Semaphore;
4
5/// Helper to wait for actions to be [`Self::done`]
6#[derive(Debug)]
7pub struct Waiter {
8    done_semaphore: Semaphore,
9}
10
11impl Default for Waiter {
12    fn default() -> Self {
13        Self::new()
14    }
15}
16
17impl Waiter {
18    pub fn new() -> Self {
19        Self {
20            // semaphore never has permits.
21            done_semaphore: Semaphore::new(0),
22        }
23    }
24
25    /// Mark this waiter as done.
26    ///
27    /// NOTE: Calling this twice is ignored.
28    pub fn done(&self) {
29        // close the semaphore and notify all waiters.
30        self.done_semaphore.close();
31    }
32
33    /// Wait for [`Self::done`] call.
34    pub async fn wait(&self) {
35        // wait for semaphore to be closed.
36        self.done_semaphore
37            .acquire()
38            .await
39            .expect_err("done semaphore is only closed, never has permits");
40    }
41
42    /// Check if Waiter was marked as done.
43    pub fn is_done(&self) -> bool {
44        self.done_semaphore.is_closed()
45    }
46}
47
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// `done` flips `is_done` from `false` to `true`.
    #[tokio::test]
    async fn test_simple() {
        let gate = Waiter::new();
        assert!(!gate.is_done());

        gate.done();
        assert!(gate.is_done());
    }

    /// A `wait` joined with a `done` completes, and so does a later `wait`.
    #[tokio::test]
    async fn test_async() {
        let gate = Waiter::new();
        assert!(!gate.is_done());

        tokio::join!(
            async {
                gate.done();
            },
            gate.wait(),
        );
        assert!(gate.is_done());

        // Waiting on an already-done waiter must not hang.
        gate.wait().await;
        assert!(gate.is_done());
    }

    /// Several `done` calls are tolerated and leave the waiter done.
    #[tokio::test]
    async fn test_async_multi() {
        let gate = Waiter::new();
        assert!(!gate.is_done());

        tokio::join!(
            async {
                gate.done();
            },
            async {
                gate.done();
            },
            async {
                gate.done();
            },
        );
        assert!(gate.is_done());

        gate.wait().await;
        assert!(gate.is_done());
    }

    /// A `wait` that starts before `done` is released once `done` runs.
    #[tokio::test]
    async fn test_async_sleep() {
        let gate = Waiter::new();
        assert!(!gate.is_done());

        let delay = Duration::from_millis(10);
        tokio::join!(
            async {
                fedimint_core::runtime::sleep(delay).await;
                gate.done();
            },
            async {
                gate.wait().await;
            },
        );
        assert!(gate.is_done());

        gate.wait().await;
        assert!(gate.is_done());
    }
}