fedimint_core/task/
jit.rs1use std::convert::Infallible;
2use std::fmt;
3#[cfg(not(target_family = "wasm"))]
4use std::panic;
5use std::sync::Arc;
6
7use fedimint_core::runtime::JoinHandle;
8use fedimint_logging::LOG_TASK;
9use futures::Future;
10use tokio::sync;
11use tracing::warn;
12
13use super::MaybeSend;
14use crate::util::FmtCompact;
15
16pub type Jit<T> = JitCore<T, Infallible>;
17pub type JitTry<T, E> = JitCore<T, E>;
18pub type JitTryAnyhow<T> = JitCore<T, anyhow::Error>;
19
20#[derive(Debug)]
25pub enum OneTimeError<E> {
26 Original(E),
27 Copy(anyhow::Error),
28}
29
30impl<E> std::error::Error for OneTimeError<E>
31where
32 E: fmt::Debug + fmt::Display,
33{
34 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
35 None
37 }
38
39 fn cause(&self) -> Option<&dyn std::error::Error> {
40 self.source()
41 }
42}
43
44impl<E> fmt::Display for OneTimeError<E>
45where
46 E: fmt::Display,
47{
48 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
49 match self {
50 Self::Original(o) => o.fmt(f),
51 Self::Copy(c) => c.fmt(f),
52 }
53 }
54}
55
56#[derive(Debug)]
58pub struct JitCore<T, E> {
59 inner: Arc<JitInner<T, E>>,
60}
61
62#[derive(Debug)]
63struct JitInner<T, E> {
64 handle: sync::Mutex<JoinHandle<Result<T, E>>>,
65 val: sync::OnceCell<Result<T, String>>,
66}
67
68impl<T, E> Clone for JitCore<T, E>
69where
70 T: Clone,
71{
72 fn clone(&self) -> Self {
73 Self {
74 inner: self.inner.clone(),
75 }
76 }
77}
78impl<T, E> Drop for JitInner<T, E> {
79 fn drop(&mut self) {
80 self.handle.get_mut().abort();
81 }
82}
83impl<T, E> JitCore<T, E>
84where
85 T: MaybeSend + 'static,
86 E: MaybeSend + 'static + fmt::Display,
87{
88 pub fn new_try<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
93 where
94 Fut: Future<Output = std::result::Result<T, E>> + 'static + MaybeSend,
95 {
96 let handle = crate::runtime::spawn("jit-value", async { f().await });
97
98 Self {
99 inner: JitInner {
100 handle: handle.into(),
101 val: sync::OnceCell::new(),
102 }
103 .into(),
104 }
105 }
106
107 pub async fn get_try(&self) -> Result<&T, OneTimeError<E>> {
110 let mut init_error = None;
111 let value = self
112 .inner
113 .val
114 .get_or_init(|| async {
115 let handle: &mut _ = &mut *self.inner.handle.lock().await;
116 match handle.await {
117 Ok(Ok(o)) => Ok(o),
118 Ok(Err(err)) => {
119 let err_str = err.to_string();
120 init_error = Some(err);
121 Err(err_str)
122 },
123 Err(err) => {
124
125 #[cfg(not(target_family = "wasm"))]
126 if err.is_panic() {
127 warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value panicked");
128 panic::resume_unwind(err.into_panic());
130 }
131 #[cfg(not(target_family = "wasm"))]
132 if err.is_cancelled() {
133 warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value task canceled:");
134 }
135 Err(format!("Jit value {} failed unexpectedly with: {}", std::any::type_name::<T>(), err))
136 },
137 }
138 })
139 .await;
140 if let Some(err) = init_error {
141 return Err(OneTimeError::Original(err));
142 }
143 value
144 .as_ref()
145 .map_err(|err_str| OneTimeError::Copy(anyhow::Error::msg(err_str.to_owned())))
146 }
147}
148impl<T> JitCore<T, Infallible>
149where
150 T: MaybeSend + 'static,
151{
152 pub fn new<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
153 where
154 Fut: Future<Output = T> + 'static + MaybeSend,
155 T: 'static,
156 {
157 Self::new_try(|| async { Ok(f().await) })
158 }
159
160 pub async fn get(&self) -> &T {
161 self.get_try().await.expect("can't fail")
162 }
163}
164#[cfg(test)]
165mod tests;