fedimint_core/util/
mod.rs

1pub mod backoff_util;
2/// Copied from `tokio_stream` 0.1.12 to use our optional Send bounds
3pub mod broadcaststream;
4pub mod update_merge;
5
6use std::convert::Infallible;
7use std::fmt::{Debug, Display, Formatter};
8use std::future::Future;
9use std::hash::Hash;
10use std::io::Write;
11use std::path::Path;
12use std::pin::Pin;
13use std::str::FromStr;
14use std::sync::LazyLock;
15use std::{fs, io};
16
17use anyhow::format_err;
18use fedimint_logging::LOG_CORE;
19pub use fedimint_util_error::*;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use tokio::io::AsyncWriteExt;
23use tracing::{Instrument, Span, debug, warn};
24use url::{Host, ParseError, Url};
25
26use crate::envs::{FM_DEBUG_SHOW_SECRETS_ENV, is_env_var_set};
27use crate::task::MaybeSend;
28use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
29
30/// Future that is `Send` unless targeting WASM
31pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
32
33/// Stream that is `Send` unless targeting WASM
34pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
35
36#[apply(async_trait_maybe_send!)]
37pub trait NextOrPending {
38    type Output;
39
40    async fn next_or_pending(&mut self) -> Self::Output;
41
42    async fn ok(&mut self) -> anyhow::Result<Self::Output>;
43}
44
45#[apply(async_trait_maybe_send!)]
46impl<S> NextOrPending for S
47where
48    S: futures::Stream + Unpin + MaybeSend,
49    S::Item: MaybeSend,
50{
51    type Output = S::Item;
52
53    /// Waits for the next item in a stream. If the stream is closed while
54    /// waiting, returns an error.  Useful when expecting a stream to progress.
55    async fn ok(&mut self) -> anyhow::Result<Self::Output> {
56        self.next()
57            .await
58            .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
59    }
60
61    /// Waits for the next item in a stream. If the stream is closed while
62    /// waiting the future will be pending forever. This is useful in cases
63    /// where the future will be cancelled by shutdown logic anyway and handling
64    /// each place where a stream may terminate would be too much trouble.
65    async fn next_or_pending(&mut self) -> Self::Output {
66        if let Some(item) = self.next().await {
67            item
68        } else {
69            debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
70            std::future::pending().await
71        }
72    }
73}
74
75// TODO: make fully RFC1738 conformant
76/// Wrapper for `Url` that only prints the scheme, domain, port and path portion
77/// of a `Url` in its `Display` implementation.
78///
79/// This is useful to hide private
80/// information like user names and passwords in logs or UIs.
81///
82/// The output is not fully RFC1738 conformant but good enough for our current
83/// purposes.
84#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
85// nosemgrep: ban-raw-url
86pub struct SafeUrl(Url);
87
88impl SafeUrl {
89    pub fn parse(url_str: &str) -> Result<Self, ParseError> {
90        Url::parse(url_str).map(SafeUrl)
91    }
92
93    /// Warning: This removes the safety.
94    // nosemgrep: ban-raw-url
95    pub fn to_unsafe(self) -> Url {
96        self.0
97    }
98
99    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
100    pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
101        self.0.set_username(username)
102    }
103
104    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
105    pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
106        self.0.set_password(password)
107    }
108
109    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
110    pub fn without_auth(&self) -> Result<Self, ()> {
111        let mut url = self.clone();
112
113        url.set_username("").and_then(|()| url.set_password(None))?;
114
115        Ok(url)
116    }
117
118    pub fn host(&self) -> Option<Host<&str>> {
119        self.0.host()
120    }
121    pub fn host_str(&self) -> Option<&str> {
122        self.0.host_str()
123    }
124    pub fn scheme(&self) -> &str {
125        self.0.scheme()
126    }
127    pub fn port(&self) -> Option<u16> {
128        self.0.port()
129    }
130    pub fn port_or_known_default(&self) -> Option<u16> {
131        self.0.port_or_known_default()
132    }
133    pub fn path(&self) -> &str {
134        self.0.path()
135    }
136    /// Warning: This will expose username & password if present.
137    pub fn as_str(&self) -> &str {
138        self.0.as_str()
139    }
140    pub fn username(&self) -> &str {
141        self.0.username()
142    }
143    pub fn password(&self) -> Option<&str> {
144        self.0.password()
145    }
146    pub fn join(&self, input: &str) -> Result<Self, ParseError> {
147        self.0.join(input).map(SafeUrl)
148    }
149
150    // It can be removed to use `is_onion_address()` implementation,
151    // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
152    #[allow(clippy::case_sensitive_file_extension_comparisons)]
153    pub fn is_onion_address(&self) -> bool {
154        let host = self.host_str().unwrap_or_default();
155
156        host.ends_with(".onion")
157    }
158
159    pub fn fragment(&self) -> Option<&str> {
160        self.0.fragment()
161    }
162
163    pub fn set_fragment(&mut self, arg: Option<&str>) {
164        self.0.set_fragment(arg);
165    }
166}
167
168static SHOW_SECRETS: LazyLock<bool> = LazyLock::new(|| {
169    let enable = is_env_var_set(FM_DEBUG_SHOW_SECRETS_ENV);
170
171    if enable {
172        warn!(target: LOG_CORE, "{} enabled. Please don't use in production.", FM_DEBUG_SHOW_SECRETS_ENV);
173    }
174
175    enable
176});
177
178impl Display for SafeUrl {
179    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
180        write!(f, "{}://", self.0.scheme())?;
181
182        if !self.0.username().is_empty() {
183            let show_secrets = *SHOW_SECRETS;
184            if show_secrets {
185                write!(f, "{}", self.0.username())?;
186            } else {
187                write!(f, "REDACTEDUSER")?;
188            }
189
190            if self.0.password().is_some() {
191                if show_secrets {
192                    write!(
193                        f,
194                        ":{}",
195                        self.0.password().expect("Just checked it's checked")
196                    )?;
197                } else {
198                    write!(f, ":REDACTEDPASS")?;
199                }
200            }
201
202            write!(f, "@")?;
203        }
204
205        if let Some(host) = self.0.host_str() {
206            write!(f, "{host}")?;
207        }
208
209        if let Some(port) = self.0.port() {
210            write!(f, ":{port}")?;
211        }
212
213        write!(f, "{}", self.0.path())?;
214
215        Ok(())
216    }
217}
218
219impl Debug for SafeUrl {
220    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221        write!(f, "SafeUrl(")?;
222        Display::fmt(self, f)?;
223        write!(f, ")")?;
224        Ok(())
225    }
226}
227
228impl From<Url> for SafeUrl {
229    fn from(u: Url) -> Self {
230        Self(u)
231    }
232}
233
234impl FromStr for SafeUrl {
235    type Err = ParseError;
236
237    #[inline]
238    fn from_str(input: &str) -> Result<Self, ParseError> {
239        Self::parse(input)
240    }
241}
242
243/// Write out a new file (like [`std::fs::write`] but fails if file already
244/// exists)
245#[cfg(not(target_family = "wasm"))]
246pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
247    let mut file = fs::File::options()
248        .write(true)
249        .create_new(true)
250        .open(path)?;
251    file.write_all(contents.as_ref())?;
252    file.sync_all()?;
253    Ok(())
254}
255
256#[cfg(not(target_family = "wasm"))]
257pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
258    fs::File::options()
259        .write(true)
260        .create(true)
261        .truncate(true)
262        .open(path)?
263        .write_all(contents.as_ref())
264}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
268    path: P,
269    contents: C,
270) -> io::Result<()> {
271    tokio::fs::OpenOptions::new()
272        .write(true)
273        .create(true)
274        .truncate(true)
275        .open(path)
276        .await?
277        .write_all(contents.as_ref())
278        .await
279}
280
281#[cfg(not(target_family = "wasm"))]
282pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
283    path: P,
284    contents: C,
285) -> io::Result<()> {
286    tokio::fs::OpenOptions::new()
287        .write(true)
288        .create_new(true)
289        .open(path)
290        .await?
291        .write_all(contents.as_ref())
292        .await
293}
294
295#[derive(Debug, Clone)]
296pub struct Spanned<T> {
297    value: T,
298    span: Span,
299}
300
301impl<T> Spanned<T> {
302    pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
303        Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
304            .await
305            .unwrap()
306    }
307
308    pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
309        span: Span,
310        make: F,
311    ) -> Result<Self, E> {
312        let span2 = span.clone();
313        async {
314            Ok(Self {
315                value: make.await?,
316                span: span2,
317            })
318        }
319        .instrument(span)
320        .await
321    }
322
323    pub fn borrow(&self) -> Spanned<&T> {
324        Spanned {
325            value: &self.value,
326            span: self.span.clone(),
327        }
328    }
329
330    pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
331        Spanned {
332            value: map(self.value),
333            span: self.span,
334        }
335    }
336
337    pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
338        Spanned {
339            value: &mut self.value,
340            span: self.span.clone(),
341        }
342    }
343
344    pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
345        let _g = self.span.enter();
346        f(self.value)
347    }
348
349    pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
350        async { f(self.value).await }.instrument(self.span).await
351    }
352
353    pub fn span(&self) -> Span {
354        self.span.clone()
355    }
356
357    pub fn value(&self) -> &T {
358        &self.value
359    }
360
361    pub fn value_mut(&mut self) -> &mut T {
362        &mut self.value
363    }
364
365    pub fn into_value(self) -> T {
366        self.value
367    }
368}
369
370/// For CLIs, detects `version-hash` as a single argument, prints the provided
371/// version hash, then exits the process.
372pub fn handle_version_hash_command(version_hash: &str) {
373    let mut args = std::env::args();
374    if let Some(ref arg) = args.nth(1)
375        && arg.as_str() == "version-hash"
376    {
377        println!("{version_hash}");
378        std::process::exit(0);
379    }
380}
381
382/// Run the supplied closure `op_fn` until it succeeds. Frequency and number of
383/// retries is determined by the specified strategy.
384///
385/// ```
386/// use std::time::Duration;
387///
388/// use fedimint_core::util::{backoff_util, retry};
389/// # tokio_test::block_on(async {
390/// retry(
391///     "Gateway balance after swap".to_string(),
392///     backoff_util::background_backoff(),
393///     || async {
394///         // Fallible network calls …
395///         Ok(())
396///     },
397/// )
398/// .await
399/// .expect("never fails");
400/// # });
401/// ```
402///
403/// # Returns
404///
405/// - If the closure runs successfully, the result is immediately returned
406/// - If the closure did not run successfully for `max_attempts` times, the
407///   error of the closure is returned
408pub async fn retry<F, Fut, T>(
409    op_name: impl Into<String>,
410    strategy: impl backoff_util::Backoff,
411    op_fn: F,
412) -> Result<T, anyhow::Error>
413where
414    F: Fn() -> Fut,
415    Fut: Future<Output = Result<T, anyhow::Error>>,
416{
417    let mut strategy = strategy;
418    let op_name = op_name.into();
419    let mut attempts: u64 = 0;
420    loop {
421        attempts += 1;
422        match op_fn().await {
423            Ok(result) => return Ok(result),
424            Err(err) => {
425                if let Some(interval) = strategy.next() {
426                    // run closure op_fn again
427                    debug!(
428                        target: LOG_CORE,
429                        err = %err.fmt_compact_anyhow(),
430                        %attempts,
431                        interval = interval.as_secs(),
432                        "{} failed, retrying",
433                        op_name,
434                    );
435                    runtime::sleep(interval).await;
436                } else {
437                    warn!(
438                        target: LOG_CORE,
439                        err = %err.fmt_compact_anyhow(),
440                        %attempts,
441                        "{} failed",
442                        op_name,
443                    );
444                    return Err(err);
445                }
446            }
447        }
448    }
449}
450
451/// Computes the median from a slice of sorted `u64`s
452pub fn get_median(vals: &[u64]) -> Option<u64> {
453    if vals.is_empty() {
454        return None;
455    }
456    let len = vals.len();
457    let mid = len / 2;
458
459    if len % 2 == 0 {
460        Some(u64::midpoint(vals[mid - 1], vals[mid]))
461    } else {
462        Some(vals[mid])
463    }
464}
465
466/// Computes the average of the given `u64` slice.
467pub fn get_average(vals: &[u64]) -> Option<u64> {
468    if vals.is_empty() {
469        return None;
470    }
471
472    let sum: u64 = vals.iter().sum();
473    Some(sum / vals.len() as u64)
474}
475
476#[cfg(test)]
477mod tests;