fedimint_core/util/
mod.rs

1pub mod backoff_util;
2/// Copied from `tokio_stream` 0.1.12 to use our optional Send bounds
3pub mod broadcaststream;
4mod error;
5pub mod update_merge;
6
7use std::convert::Infallible;
8use std::fmt::{Debug, Display, Formatter};
9use std::future::Future;
10use std::hash::Hash;
11use std::io::Write;
12use std::path::Path;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::LazyLock;
16use std::{fs, io};
17
18use anyhow::format_err;
19pub use error::*;
20use fedimint_logging::LOG_CORE;
21use futures::StreamExt;
22use serde::{Deserialize, Serialize};
23use tokio::io::AsyncWriteExt;
24use tracing::{Instrument, Span, debug, warn};
25use url::{Host, ParseError, Url};
26
27use crate::envs::{FM_DEBUG_SHOW_SECRETS_ENV, is_env_var_set};
28use crate::task::MaybeSend;
29use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
30
31/// Future that is `Send` unless targeting WASM
32pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
33
34/// Stream that is `Send` unless targeting WASM
35pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
36
37#[apply(async_trait_maybe_send!)]
38pub trait NextOrPending {
39    type Output;
40
41    async fn next_or_pending(&mut self) -> Self::Output;
42
43    async fn ok(&mut self) -> anyhow::Result<Self::Output>;
44}
45
46#[apply(async_trait_maybe_send!)]
47impl<S> NextOrPending for S
48where
49    S: futures::Stream + Unpin + MaybeSend,
50    S::Item: MaybeSend,
51{
52    type Output = S::Item;
53
54    /// Waits for the next item in a stream. If the stream is closed while
55    /// waiting, returns an error.  Useful when expecting a stream to progress.
56    async fn ok(&mut self) -> anyhow::Result<Self::Output> {
57        self.next()
58            .await
59            .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
60    }
61
62    /// Waits for the next item in a stream. If the stream is closed while
63    /// waiting the future will be pending forever. This is useful in cases
64    /// where the future will be cancelled by shutdown logic anyway and handling
65    /// each place where a stream may terminate would be too much trouble.
66    async fn next_or_pending(&mut self) -> Self::Output {
67        if let Some(item) = self.next().await {
68            item
69        } else {
70            debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
71            std::future::pending().await
72        }
73    }
74}
75
76// TODO: make fully RFC1738 conformant
77/// Wrapper for `Url` that only prints the scheme, domain, port and path portion
78/// of a `Url` in its `Display` implementation.
79///
80/// This is useful to hide private
81/// information like user names and passwords in logs or UIs.
82///
83/// The output is not fully RFC1738 conformant but good enough for our current
84/// purposes.
85#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
86// nosemgrep: ban-raw-url
87pub struct SafeUrl(Url);
88
89impl SafeUrl {
90    pub fn parse(url_str: &str) -> Result<Self, ParseError> {
91        Url::parse(url_str).map(SafeUrl)
92    }
93
94    /// Warning: This removes the safety.
95    // nosemgrep: ban-raw-url
96    pub fn to_unsafe(self) -> Url {
97        self.0
98    }
99
100    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
101    pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
102        self.0.set_username(username)
103    }
104
105    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
106    pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
107        self.0.set_password(password)
108    }
109
110    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
111    pub fn without_auth(&self) -> Result<Self, ()> {
112        let mut url = self.clone();
113
114        url.set_username("").and_then(|()| url.set_password(None))?;
115
116        Ok(url)
117    }
118
119    pub fn host(&self) -> Option<Host<&str>> {
120        self.0.host()
121    }
122    pub fn host_str(&self) -> Option<&str> {
123        self.0.host_str()
124    }
125    pub fn scheme(&self) -> &str {
126        self.0.scheme()
127    }
128    pub fn port(&self) -> Option<u16> {
129        self.0.port()
130    }
131    pub fn port_or_known_default(&self) -> Option<u16> {
132        self.0.port_or_known_default()
133    }
134    pub fn path(&self) -> &str {
135        self.0.path()
136    }
137    /// Warning: This will expose username & password if present.
138    pub fn as_str(&self) -> &str {
139        self.0.as_str()
140    }
141    pub fn username(&self) -> &str {
142        self.0.username()
143    }
144    pub fn password(&self) -> Option<&str> {
145        self.0.password()
146    }
147    pub fn join(&self, input: &str) -> Result<Self, ParseError> {
148        self.0.join(input).map(SafeUrl)
149    }
150
151    // It can be removed to use `is_onion_address()` implementation,
152    // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
153    #[allow(clippy::case_sensitive_file_extension_comparisons)]
154    pub fn is_onion_address(&self) -> bool {
155        let host = self.host_str().unwrap_or_default();
156
157        host.ends_with(".onion")
158    }
159
160    pub fn fragment(&self) -> Option<&str> {
161        self.0.fragment()
162    }
163
164    pub fn set_fragment(&mut self, arg: Option<&str>) {
165        self.0.set_fragment(arg);
166    }
167}
168
169static SHOW_SECRETS: LazyLock<bool> = LazyLock::new(|| {
170    let enable = is_env_var_set(FM_DEBUG_SHOW_SECRETS_ENV);
171
172    if enable {
173        warn!(target: LOG_CORE, "{} enabled. Please don't use in production.", FM_DEBUG_SHOW_SECRETS_ENV);
174    }
175
176    enable
177});
178
179impl Display for SafeUrl {
180    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
181        write!(f, "{}://", self.0.scheme())?;
182
183        if !self.0.username().is_empty() {
184            let show_secrets = *SHOW_SECRETS;
185            if show_secrets {
186                write!(f, "{}", self.0.username())?;
187            } else {
188                write!(f, "REDACTEDUSER")?;
189            }
190
191            if self.0.password().is_some() {
192                if show_secrets {
193                    write!(
194                        f,
195                        ":{}",
196                        self.0.password().expect("Just checked it's checked")
197                    )?;
198                } else {
199                    write!(f, ":REDACTEDPASS")?;
200                }
201            }
202
203            write!(f, "@")?;
204        }
205
206        if let Some(host) = self.0.host_str() {
207            write!(f, "{host}")?;
208        }
209
210        if let Some(port) = self.0.port() {
211            write!(f, ":{port}")?;
212        }
213
214        write!(f, "{}", self.0.path())?;
215
216        Ok(())
217    }
218}
219
220impl Debug for SafeUrl {
221    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
222        write!(f, "SafeUrl(")?;
223        Display::fmt(self, f)?;
224        write!(f, ")")?;
225        Ok(())
226    }
227}
228
229impl From<Url> for SafeUrl {
230    fn from(u: Url) -> Self {
231        Self(u)
232    }
233}
234
235impl FromStr for SafeUrl {
236    type Err = ParseError;
237
238    #[inline]
239    fn from_str(input: &str) -> Result<Self, ParseError> {
240        Self::parse(input)
241    }
242}
243
244/// Write out a new file (like [`std::fs::write`] but fails if file already
245/// exists)
246#[cfg(not(target_family = "wasm"))]
247pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
248    fs::File::options()
249        .write(true)
250        .create_new(true)
251        .open(path)?
252        .write_all(contents.as_ref())
253}
254
255#[cfg(not(target_family = "wasm"))]
256pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
257    fs::File::options()
258        .write(true)
259        .create(true)
260        .truncate(true)
261        .open(path)?
262        .write_all(contents.as_ref())
263}
264
265#[cfg(not(target_family = "wasm"))]
266pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
267    path: P,
268    contents: C,
269) -> io::Result<()> {
270    tokio::fs::OpenOptions::new()
271        .write(true)
272        .create(true)
273        .truncate(true)
274        .open(path)
275        .await?
276        .write_all(contents.as_ref())
277        .await
278}
279
280#[cfg(not(target_family = "wasm"))]
281pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
282    path: P,
283    contents: C,
284) -> io::Result<()> {
285    tokio::fs::OpenOptions::new()
286        .write(true)
287        .create_new(true)
288        .open(path)
289        .await?
290        .write_all(contents.as_ref())
291        .await
292}
293
294#[derive(Debug, Clone)]
295pub struct Spanned<T> {
296    value: T,
297    span: Span,
298}
299
300impl<T> Spanned<T> {
301    pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
302        Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
303            .await
304            .unwrap()
305    }
306
307    pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
308        span: Span,
309        make: F,
310    ) -> Result<Self, E> {
311        let span2 = span.clone();
312        async {
313            Ok(Self {
314                value: make.await?,
315                span: span2,
316            })
317        }
318        .instrument(span)
319        .await
320    }
321
322    pub fn borrow(&self) -> Spanned<&T> {
323        Spanned {
324            value: &self.value,
325            span: self.span.clone(),
326        }
327    }
328
329    pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
330        Spanned {
331            value: map(self.value),
332            span: self.span,
333        }
334    }
335
336    pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
337        Spanned {
338            value: &mut self.value,
339            span: self.span.clone(),
340        }
341    }
342
343    pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
344        let _g = self.span.enter();
345        f(self.value)
346    }
347
348    pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
349        async { f(self.value).await }.instrument(self.span).await
350    }
351
352    pub fn span(&self) -> Span {
353        self.span.clone()
354    }
355
356    pub fn value(&self) -> &T {
357        &self.value
358    }
359
360    pub fn value_mut(&mut self) -> &mut T {
361        &mut self.value
362    }
363
364    pub fn into_value(self) -> T {
365        self.value
366    }
367}
368
369/// For CLIs, detects `version-hash` as a single argument, prints the provided
370/// version hash, then exits the process.
371pub fn handle_version_hash_command(version_hash: &str) {
372    let mut args = std::env::args();
373    if let Some(ref arg) = args.nth(1) {
374        if arg.as_str() == "version-hash" {
375            println!("{version_hash}");
376            std::process::exit(0);
377        }
378    }
379}
380
381/// Run the supplied closure `op_fn` until it succeeds. Frequency and number of
382/// retries is determined by the specified strategy.
383///
384/// ```
385/// use std::time::Duration;
386///
387/// use fedimint_core::util::{backoff_util, retry};
388/// # tokio_test::block_on(async {
389/// retry(
390///     "Gateway balance after swap".to_string(),
391///     backoff_util::background_backoff(),
392///     || async {
393///         // Fallible network calls …
394///         Ok(())
395///     },
396/// )
397/// .await
398/// .expect("never fails");
399/// # });
400/// ```
401///
402/// # Returns
403///
404/// - If the closure runs successfully, the result is immediately returned
405/// - If the closure did not run successfully for `max_attempts` times, the
406///   error of the closure is returned
407pub async fn retry<F, Fut, T>(
408    op_name: impl Into<String>,
409    strategy: impl backoff_util::Backoff,
410    op_fn: F,
411) -> Result<T, anyhow::Error>
412where
413    F: Fn() -> Fut,
414    Fut: Future<Output = Result<T, anyhow::Error>>,
415{
416    let mut strategy = strategy;
417    let op_name = op_name.into();
418    let mut attempts: u64 = 0;
419    loop {
420        attempts += 1;
421        match op_fn().await {
422            Ok(result) => return Ok(result),
423            Err(err) => {
424                if let Some(interval) = strategy.next() {
425                    // run closure op_fn again
426                    debug!(
427                        target: LOG_CORE,
428                        err = %err.fmt_compact_anyhow(),
429                        %attempts,
430                        interval = interval.as_secs(),
431                        "{} failed, retrying",
432                        op_name,
433                    );
434                    runtime::sleep(interval).await;
435                } else {
436                    warn!(
437                        target: LOG_CORE,
438                        err = %err.fmt_compact_anyhow(),
439                        %attempts,
440                        "{} failed",
441                        op_name,
442                    );
443                    return Err(err);
444                }
445            }
446        }
447    }
448}
449
450/// Computes the median from a slice of sorted `u64`s
451pub fn get_median(vals: &[u64]) -> Option<u64> {
452    if vals.is_empty() {
453        return None;
454    }
455    let len = vals.len();
456    let mid = len / 2;
457
458    if len % 2 == 0 {
459        Some(u64::midpoint(vals[mid - 1], vals[mid]))
460    } else {
461        Some(vals[mid])
462    }
463}
464
465/// Computes the average of the given `u64` slice.
466pub fn get_average(vals: &[u64]) -> Option<u64> {
467    if vals.is_empty() {
468        return None;
469    }
470
471    let sum: u64 = vals.iter().sum();
472    Some(sum / vals.len() as u64)
473}
474
475#[cfg(test)]
476mod tests;