fedimint_core/task/
jit.rs1use std::convert::Infallible;
2use std::sync::Arc;
3use std::{fmt, panic};
4
5use fedimint_core::runtime::JoinHandle;
6use fedimint_logging::LOG_TASK;
7use futures::Future;
8use tokio::sync;
9use tracing::warn;
10
11use super::MaybeSend;
12use crate::util::FmtCompact;
13
/// A [`JitCore`] whose task cannot fail: the error type is [`Infallible`].
pub type Jit<T> = JitCore<T, Infallible>;
/// A [`JitCore`] whose task may fail with a caller-chosen error type `E`.
pub type JitTry<T, E> = JitCore<T, E>;
/// A [`JitCore`] whose task may fail with an [`anyhow::Error`].
pub type JitTryAnyhow<T> = JitCore<T, anyhow::Error>;
17
/// Error returned by `JitCore::get_try`.
///
/// The error value produced by the task is handed out only once; afterwards
/// only its textual form is available.
#[derive(Debug)]
pub enum OneTimeError<E> {
    /// The error value returned by the task itself. `get_try` yields this
    /// from the call that ran the initializer and observed the failure.
    Original(E),
    /// An [`anyhow::Error`] built from the stored string form of the failure.
    /// `get_try` yields this when the original error value is not available
    /// to the call, or when the task did not finish normally.
    Copy(anyhow::Error),
}
27
28impl<E> std::error::Error for OneTimeError<E>
29where
30 E: fmt::Debug + fmt::Display,
31{
32 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
33 None
35 }
36
37 fn cause(&self) -> Option<&dyn std::error::Error> {
38 self.source()
39 }
40}
41
42impl<E> fmt::Display for OneTimeError<E>
43where
44 E: fmt::Display,
45{
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 Self::Original(o) => o.fmt(f),
49 Self::Copy(c) => c.fmt(f),
50 }
51 }
52}
53
/// Handle to a value computed by a task that is spawned when the handle is
/// created (see `new_try`).
///
/// The state lives behind an [`Arc`], so every clone of a handle refers to
/// the same task and the same cached result.
#[derive(Debug)]
pub struct JitCore<T, E> {
    /// Shared state: the task's join handle plus the cached outcome.
    inner: Arc<JitInner<T, E>>,
}
59
/// State shared by all clones of a `JitCore`.
#[derive(Debug)]
struct JitInner<T, E> {
    /// Join handle of the spawned task, behind an async mutex because
    /// awaiting it requires mutable access.
    handle: sync::Mutex<JoinHandle<Result<T, E>>>,
    /// Outcome of the task, written at most once. A failure is stored as a
    /// `String` rather than as the original `E`.
    val: sync::OnceCell<Result<T, String>>,
}
65
66impl<T, E> Clone for JitCore<T, E>
67where
68 T: Clone,
69{
70 fn clone(&self) -> Self {
71 Self {
72 inner: self.inner.clone(),
73 }
74 }
75}
76impl<T, E> Drop for JitInner<T, E> {
77 fn drop(&mut self) {
78 self.handle.get_mut().abort();
79 }
80}
81impl<T, E> JitCore<T, E>
82where
83 T: MaybeSend + 'static,
84 E: MaybeSend + 'static + fmt::Display,
85{
86 pub fn new_try<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
91 where
92 Fut: Future<Output = std::result::Result<T, E>> + 'static + MaybeSend,
93 {
94 let handle = crate::runtime::spawn("jit-value", async { f().await });
95
96 Self {
97 inner: JitInner {
98 handle: handle.into(),
99 val: sync::OnceCell::new(),
100 }
101 .into(),
102 }
103 }
104
105 pub async fn get_try(&self) -> Result<&T, OneTimeError<E>> {
108 let mut init_error = None;
109 let value = self
110 .inner
111 .val
112 .get_or_init(|| async {
113 let handle: &mut _ = &mut *self.inner.handle.lock().await;
114 match handle.await {
115 Ok(Ok(o)) => Ok(o),
116 Ok(Err(err)) => {
117 let err_str = err.to_string();
118 init_error = Some(err);
119 Err(err_str)
120 },
121 Err(err) => {
122
123 #[cfg(not(target_family = "wasm"))]
124 if err.is_panic() {
125 warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value panicked");
126 panic::resume_unwind(err.into_panic());
128 }
129 #[cfg(not(target_family = "wasm"))]
130 if err.is_cancelled() {
131 warn!(target: LOG_TASK, err = %err.fmt_compact(), type_name = %std::any::type_name::<T>(), "Jit value task canceled:");
132 }
133 Err(format!("Jit value {} failed unexpectedly with: {}", std::any::type_name::<T>(), err))
134 },
135 }
136 })
137 .await;
138 if let Some(err) = init_error {
139 return Err(OneTimeError::Original(err));
140 }
141 value
142 .as_ref()
143 .map_err(|err_str| OneTimeError::Copy(anyhow::Error::msg(err_str.to_owned())))
144 }
145}
146impl<T> JitCore<T, Infallible>
147where
148 T: MaybeSend + 'static,
149{
150 pub fn new<Fut>(f: impl FnOnce() -> Fut + 'static + MaybeSend) -> Self
151 where
152 Fut: Future<Output = T> + 'static + MaybeSend,
153 T: 'static,
154 {
155 Self::new_try(|| async { Ok(f().await) })
156 }
157
158 pub async fn get(&self) -> &T {
159 self.get_try().await.expect("can't fail")
160 }
161}
// Unit tests live in the sibling `tests` module, which is compiled only for
// test builds.
#[cfg(test)]
mod tests;