fedimint_core/task/
jit.rs

1use std::convert::Infallible;
2use std::sync::Arc;
3use std::{fmt, panic};
4
5use fedimint_core::runtime::JoinHandle;
6use fedimint_logging::LOG_TASK;
7use futures::Future;
8use tokio::sync;
9use tracing::warn;
10
11use super::MaybeSend;
12use crate::util::FmtCompact;
13
14pub type Jit<T> = JitCore<T, Infallible>;
15pub type JitTry<T, E> = JitCore<T, E>;
16pub type JitTryAnyhow<T> = JitCore<T, anyhow::Error>;
17
18/// Error that could have been returned before
19///
20/// Newtype over `Option<E>` that allows better user (error conversion mostly)
21/// experience
22#[derive(Debug)]
23pub enum OneTimeError<E> {
24    Original(E),
25    Copy(anyhow::Error),
26}
27
28impl<E> std::error::Error for OneTimeError<E>
29where
30    E: fmt::Debug + fmt::Display,
31{
32    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
33        // In neither case we can preserve the source information
34        None
35    }
36
37    fn cause(&self) -> Option<&dyn std::error::Error> {
38        self.source()
39    }
40}
41
42impl<E> fmt::Display for OneTimeError<E>
43where
44    E: fmt::Display,
45{
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            Self::Original(o) => o.fmt(f),
49            Self::Copy(c) => c.fmt(f),
50        }
51    }
52}
53
54/// A value that initializes eagerly in parallel in a falliable way
55#[derive(Debug)]
56pub struct JitCore<T, E> {
57    inner: Arc<JitInner<T, E>>,
58}
59
60#[derive(Debug)]
61struct JitInner<T, E> {
62    handle: sync::Mutex<JoinHandle<Result<T, E>>>,
63    val: sync::OnceCell<Result<T, String>>,
64}
65
66impl<T, E> Clone for JitCore<T, E>
67where
68    T: Clone,
69{
70    fn clone(&self) -> Self {
71        Self {
72            inner: self.inner.clone(),
73        }
74    }
75}
76impl<T, E> Drop for JitInner<T, E> {
77    fn drop(&mut self) {
78        self.handle.get_mut().abort();
79    }
80}
81impl<T, E> JitCore<T, E>
82where
83    T: MaybeSend + 'static,
84    E: MaybeSend + 'static + fmt::Display,
85{
86    /// Create `JitTry` value, and spawn a future `f` that computes its value
87    ///
88    /// Unlike normal Rust futures, the `f` executes eagerly (is spawned as a
89    /// tokio task).
90    pub fn new_try<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
91    where
92        Fut: Future<Output = std::result::Result<T, E>> + 'static + MaybeSend,
93    {
94        let handle = crate::runtime::spawn("jit-value", async { f().await });
95
96        Self {
97            inner: JitInner {
98                handle: handle.into(),
99                val: sync::OnceCell::new(),
100            }
101            .into(),
102        }
103    }
104
105    /// Get the reference to the value, potentially blocking for the
106    /// initialization future to complete
107    pub async fn get_try(&self) -> Result<&T, OneTimeError<E>> {
108        let mut init_error = None;
109        let value = self
110            .inner
111            .val
112            .get_or_init(|| async {
113                let handle: &mut _ = &mut *self.inner.handle.lock().await;
114                match handle.await {
115                        Ok(Ok(o)) => Ok(o),
116                        Ok(Err(err)) => {
117                            let err_str = err.to_string();
118                            init_error = Some(err);
119                            Err(err_str)
120                        },
121                        Err(err) => {
122
123                            #[cfg(not(target_family = "wasm"))]
124                            if err.is_panic() {
125                                warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value panicked");
126                                // Resume the panic on the main task
127                                panic::resume_unwind(err.into_panic());
128                            }
129                            #[cfg(not(target_family = "wasm"))]
130                            if err.is_cancelled() {
131                                warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value task canceled:");
132                            }
133                            Err(format!("Jit value {} failed unexpectedly with: {}", std::any::type_name::<T>(), err))
134                        },
135                    }
136            })
137            .await;
138        if let Some(err) = init_error {
139            return Err(OneTimeError::Original(err));
140        }
141        value
142            .as_ref()
143            .map_err(|err_str| OneTimeError::Copy(anyhow::Error::msg(err_str.to_owned())))
144    }
145}
146impl<T> JitCore<T, Infallible>
147where
148    T: MaybeSend + 'static,
149{
150    pub fn new<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
151    where
152        Fut: Future<Output = T> + 'static + MaybeSend,
153        T: 'static,
154    {
155        Self::new_try(|| async { Ok(f().await) })
156    }
157
158    pub async fn get(&self) -> &T {
159        self.get_try().await.expect("can't fail")
160    }
161}
162#[cfg(test)]
163mod tests;