fedimint_core/task/
jit.rs

1use std::convert::Infallible;
2use std::fmt;
3#[cfg(not(target_family = "wasm"))]
4use std::panic;
5use std::sync::Arc;
6
7use fedimint_core::runtime::JoinHandle;
8use fedimint_logging::LOG_TASK;
9use futures::Future;
10use tokio::sync;
11use tracing::warn;
12
13use super::MaybeSend;
14use crate::util::FmtCompact;
15
16pub type Jit<T> = JitCore<T, Infallible>;
17pub type JitTry<T, E> = JitCore<T, E>;
18pub type JitTryAnyhow<T> = JitCore<T, anyhow::Error>;
19
/// Error returned when a jit value failed to initialize
///
/// The original error value can be handed out only once, so this type
/// distinguishes the caller that receives it from everyone else, while still
/// giving both a `Display`-able error (which mostly helps error conversion).
#[derive(Debug)]
pub enum OneTimeError<E> {
    /// The actual error value produced by the initialization future
    Original(E),
    /// An `anyhow::Error` rebuilt from the stored textual form of the failure
    Copy(anyhow::Error),
}
29
30impl<E> std::error::Error for OneTimeError<E>
31where
32    E: fmt::Debug + fmt::Display,
33{
34    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
35        // In neither case we can preserve the source information
36        None
37    }
38
39    fn cause(&self) -> Option<&dyn std::error::Error> {
40        self.source()
41    }
42}
43
44impl<E> fmt::Display for OneTimeError<E>
45where
46    E: fmt::Display,
47{
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49        match self {
50            Self::Original(o) => o.fmt(f),
51            Self::Copy(c) => c.fmt(f),
52        }
53    }
54}
55
/// A value that initializes eagerly in parallel in a fallible way
///
/// This is a handle around a reference-counted [`JitInner`], which holds the
/// spawned task and the cached result.
#[derive(Debug)]
pub struct JitCore<T, E> {
    /// Shared state behind the handle
    inner: Arc<JitInner<T, E>>,
}
61
/// State shared by all handles to the same jit value
#[derive(Debug)]
struct JitInner<T, E> {
    /// Join handle of the spawned initialization task, behind an async mutex
    /// so it can be awaited through a shared reference
    handle: sync::Mutex<JoinHandle<Result<T, E>>>,
    /// Cached outcome of the initialization; a failure is kept as its textual
    /// form (`String`) rather than as the original `E`
    val: sync::OnceCell<Result<T, String>>,
}
67
68impl<T, E> Clone for JitCore<T, E>
69where
70    T: Clone,
71{
72    fn clone(&self) -> Self {
73        Self {
74            inner: self.inner.clone(),
75        }
76    }
77}
78impl<T, E> Drop for JitInner<T, E> {
79    fn drop(&mut self) {
80        self.handle.get_mut().abort();
81    }
82}
83impl<T, E> JitCore<T, E>
84where
85    T: MaybeSend + 'static,
86    E: MaybeSend + 'static + fmt::Display,
87{
88    /// Create `JitTry` value, and spawn a future `f` that computes its value
89    ///
90    /// Unlike normal Rust futures, the `f` executes eagerly (is spawned as a
91    /// tokio task).
92    pub fn new_try<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
93    where
94        Fut: Future<Output = std::result::Result<T, E>> + 'static + MaybeSend,
95    {
96        let handle = crate::runtime::spawn("jit-value", async { f().await });
97
98        Self {
99            inner: JitInner {
100                handle: handle.into(),
101                val: sync::OnceCell::new(),
102            }
103            .into(),
104        }
105    }
106
107    /// Get the reference to the value, potentially blocking for the
108    /// initialization future to complete
109    pub async fn get_try(&self) -> Result<&T, OneTimeError<E>> {
110        let mut init_error = None;
111        let value = self
112            .inner
113            .val
114            .get_or_init(|| async {
115                let handle: &mut _ = &mut *self.inner.handle.lock().await;
116                match handle.await {
117                        Ok(Ok(o)) => Ok(o),
118                        Ok(Err(err)) => {
119                            let err_str = err.to_string();
120                            init_error = Some(err);
121                            Err(err_str)
122                        },
123                        Err(err) => {
124
125                            #[cfg(not(target_family = "wasm"))]
126                            if err.is_panic() {
127                                warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value panicked");
128                                // Resume the panic on the main task
129                                panic::resume_unwind(err.into_panic());
130                            }
131                            #[cfg(not(target_family = "wasm"))]
132                            if err.is_cancelled() {
133                                warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value task canceled:");
134                            }
135                            Err(format!("Jit value {} failed unexpectedly with: {}", std::any::type_name::<T>(), err))
136                        },
137                    }
138            })
139            .await;
140        if let Some(err) = init_error {
141            return Err(OneTimeError::Original(err));
142        }
143        value
144            .as_ref()
145            .map_err(|err_str| OneTimeError::Copy(anyhow::Error::msg(err_str.to_owned())))
146    }
147}
148impl<T> JitCore<T, Infallible>
149where
150    T: MaybeSend + 'static,
151{
152    pub fn new<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
153    where
154        Fut: Future<Output = T> + 'static + MaybeSend,
155        T: 'static,
156    {
157        Self::new_try(|| async { Ok(f().await) })
158    }
159
160    pub async fn get(&self) -> &T {
161        self.get_try().await.expect("can't fail")
162    }
163}
164#[cfg(test)]
165mod tests;