// fedimint_core/util/update_merge.rs

1use futures::Future;
2use tokio::sync::Mutex;
3
/// Coalesces overlapping invocations of a fallible update.
///
/// The single mutex plays two roles for [`UpdateMerge::merge`]: while it is
/// held an update is in progress, and the boolean it guards records whether
/// the most recent update failed.
///
/// The default value starts with no failure recorded.
#[derive(Debug, Default)]
pub struct UpdateMerge {
    /// `true` when the most recent update did not finish successfully.
    last_failed: Mutex<bool>,
}
16impl UpdateMerge {
17    /// Merges concurrent futures execution.
18    ///
19    /// If two `merge` are called concurrently, the calls are merged.
20    /// But if the first call fails, the second call is still run again.
21    ///
22    /// The future `fut` is never executed concurrently.
23    pub async fn merge<E>(&self, fut: impl Future<Output = Result<(), E>>) -> Result<(), E> {
24        let mut guard = if let Ok(guard) = self.last_failed.try_lock() {
25            // not running => run now
26            guard
27        } else {
28            // already running concurrently
29            // wait for other call to return
30            let guard = self.last_failed.lock().await;
31            if *guard {
32                // Last call failed. Run again.
33                guard
34            } else {
35                // Last call completed successfully. Merge the call.
36                return Ok(());
37            }
38        };
39        // future may panic, use mark as failed initially
40        *guard = true;
41        // run the future and save last call status
42        let result = fut.await;
43        *guard = result.is_err();
44        result
45    }
46}
47
#[cfg(test)]
mod tests {
    use std::cell::Cell;

    use super::*;

    /// Test update that records each execution in `runs`.
    ///
    /// It yields to the scheduler once after counting, so that a second
    /// `merge` call joined with this one is polled while this one is still
    /// holding the lock. It fails only when `fail_first` is set and this is
    /// the first execution counted in `runs`.
    async fn counted_update(runs: &Cell<usize>, fail_first: bool) -> Result<(), ()> {
        let nth = runs.get() + 1;
        runs.set(nth);
        tokio::task::yield_now().await;
        if fail_first && nth == 1 {
            Err(())
        } else {
            Ok(())
        }
    }

    /// A lone successful update is reported as `Ok`.
    #[tokio::test]
    async fn test_merge_successful() {
        let update_merge = UpdateMerge::default();

        let result = update_merge.merge(async { Ok::<(), ()>(()) }).await;

        assert!(result.is_ok(), "Merge should be successful");
    }

    /// A lone failing update is reported as `Err`.
    #[tokio::test]
    async fn test_merge_failed() {
        let update_merge = UpdateMerge::default();

        let result = update_merge.merge(async { Err::<(), ()>(()) }).await;

        assert!(result.is_err(), "Merge should fail");
    }

    /// Two overlapping calls: the one that arrives while the other is running
    /// must reuse its successful result instead of running its own future.
    #[tokio::test]
    async fn test_concurrent_merge() {
        let update_merge = UpdateMerge::default();
        let runs = Cell::new(0);

        let fut1 = update_merge.merge(counted_update(&runs, false));
        let fut2 = update_merge.merge(counted_update(&runs, false));

        let (result1, result2) = tokio::join!(fut1, fut2);

        assert!(
            result1.is_ok() && result2.is_ok(),
            "Both merges should be successful"
        );
        assert_eq!(runs.get(), 1, "Only one of the two futures should run");
    }

    /// Two overlapping calls where the running one fails: the waiting call
    /// must run its own future rather than being merged.
    #[tokio::test]
    async fn test_concurrent_merge_reruns_after_failure() {
        let update_merge = UpdateMerge::default();
        let runs = Cell::new(0);

        let fut1 = update_merge.merge(counted_update(&runs, true));
        let fut2 = update_merge.merge(counted_update(&runs, true));

        let (result1, result2) = tokio::join!(fut1, fut2);

        assert_eq!(runs.get(), 2, "The waiting call should run again");
        assert!(
            result1.is_err() != result2.is_err(),
            "Exactly one merge should fail"
        );
    }
}