fedimint_core/util/
mod.rs

1pub mod backoff_util;
2/// Copied from `tokio_stream` 0.1.12 to use our optional Send bounds
3pub mod broadcaststream;
4mod error;
5pub mod update_merge;
6
7use std::convert::Infallible;
8use std::fmt::{Debug, Display, Formatter};
9use std::future::Future;
10use std::hash::Hash;
11use std::io::Write;
12use std::path::Path;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::{fs, io};
16
17use anyhow::format_err;
18pub use error::*;
19use fedimint_logging::LOG_CORE;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23use tokio::io::AsyncWriteExt;
24use tracing::{Instrument, Span, debug, warn};
25use url::{Host, ParseError, Url};
26
27use crate::task::MaybeSend;
28use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
29
30/// Future that is `Send` unless targeting WASM
31pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
32
33/// Stream that is `Send` unless targeting WASM
34pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
35
36#[apply(async_trait_maybe_send!)]
37pub trait NextOrPending {
38    type Output;
39
40    async fn next_or_pending(&mut self) -> Self::Output;
41
42    async fn ok(&mut self) -> anyhow::Result<Self::Output>;
43}
44
45#[apply(async_trait_maybe_send!)]
46impl<S> NextOrPending for S
47where
48    S: futures::Stream + Unpin + MaybeSend,
49    S::Item: MaybeSend,
50{
51    type Output = S::Item;
52
53    /// Waits for the next item in a stream. If the stream is closed while
54    /// waiting, returns an error.  Useful when expecting a stream to progress.
55    async fn ok(&mut self) -> anyhow::Result<Self::Output> {
56        self.next()
57            .await
58            .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
59    }
60
61    /// Waits for the next item in a stream. If the stream is closed while
62    /// waiting the future will be pending forever. This is useful in cases
63    /// where the future will be cancelled by shutdown logic anyway and handling
64    /// each place where a stream may terminate would be too much trouble.
65    async fn next_or_pending(&mut self) -> Self::Output {
66        if let Some(item) = self.next().await {
67            item
68        } else {
69            debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
70            std::future::pending().await
71        }
72    }
73}
74
75// TODO: make fully RFC1738 conformant
76/// Wrapper for `Url` that only prints the scheme, domain, port and path portion
77/// of a `Url` in its `Display` implementation.
78///
79/// This is useful to hide private
80/// information like user names and passwords in logs or UIs.
81///
82/// The output is not fully RFC1738 conformant but good enough for our current
83/// purposes.
84#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
85// nosemgrep: ban-raw-url
86pub struct SafeUrl(Url);
87
88#[derive(Debug, Error)]
89pub enum SafeUrlError {
90    #[error("Failed to remove auth from URL")]
91    WithoutAuthError,
92}
93
94impl SafeUrl {
95    pub fn parse(url_str: &str) -> Result<Self, ParseError> {
96        Url::parse(url_str).map(SafeUrl)
97    }
98
99    /// Warning: This removes the safety.
100    // nosemgrep: ban-raw-url
101    pub fn to_unsafe(self) -> Url {
102        self.0
103    }
104
105    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
106    pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
107        self.0.set_username(username)
108    }
109
110    #[allow(clippy::result_unit_err)] // just copying `url`'s API here
111    pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
112        self.0.set_password(password)
113    }
114
115    pub fn without_auth(&self) -> Result<Self, SafeUrlError> {
116        let mut url = self.clone();
117        url.set_username("")
118            .and_then(|()| url.set_password(None))
119            .map_err(|()| SafeUrlError::WithoutAuthError)?;
120        Ok(url)
121    }
122
123    pub fn host(&self) -> Option<Host<&str>> {
124        self.0.host()
125    }
126    pub fn host_str(&self) -> Option<&str> {
127        self.0.host_str()
128    }
129    pub fn scheme(&self) -> &str {
130        self.0.scheme()
131    }
132    pub fn port(&self) -> Option<u16> {
133        self.0.port()
134    }
135    pub fn port_or_known_default(&self) -> Option<u16> {
136        self.0.port_or_known_default()
137    }
138    pub fn path(&self) -> &str {
139        self.0.path()
140    }
141    /// Warning: This will expose username & password if present.
142    pub fn as_str(&self) -> &str {
143        self.0.as_str()
144    }
145    pub fn username(&self) -> &str {
146        self.0.username()
147    }
148    pub fn password(&self) -> Option<&str> {
149        self.0.password()
150    }
151    pub fn join(&self, input: &str) -> Result<Self, ParseError> {
152        self.0.join(input).map(SafeUrl)
153    }
154
155    // It can be removed to use `is_onion_address()` implementation,
156    // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
157    #[allow(clippy::case_sensitive_file_extension_comparisons)]
158    pub fn is_onion_address(&self) -> bool {
159        let host = self.host_str().unwrap_or_default();
160
161        host.ends_with(".onion")
162    }
163
164    pub fn fragment(&self) -> Option<&str> {
165        self.0.fragment()
166    }
167
168    pub fn set_fragment(&mut self, arg: Option<&str>) {
169        self.0.set_fragment(arg);
170    }
171}
172
173impl Display for SafeUrl {
174    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
175        write!(f, "{}://", self.0.scheme())?;
176
177        if !self.0.username().is_empty() {
178            write!(f, "REDACTEDUSER")?;
179
180            if self.0.password().is_some() {
181                write!(f, ":REDACTEDPASS")?;
182            }
183
184            write!(f, "@")?;
185        }
186
187        if let Some(host) = self.0.host_str() {
188            write!(f, "{host}")?;
189        }
190
191        if let Some(port) = self.0.port() {
192            write!(f, ":{port}")?;
193        }
194
195        write!(f, "{}", self.0.path())?;
196
197        Ok(())
198    }
199}
200
201impl Debug for SafeUrl {
202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203        write!(f, "SafeUrl(")?;
204        Display::fmt(self, f)?;
205        write!(f, ")")?;
206        Ok(())
207    }
208}
209
210impl From<Url> for SafeUrl {
211    fn from(u: Url) -> Self {
212        Self(u)
213    }
214}
215
216impl FromStr for SafeUrl {
217    type Err = ParseError;
218
219    #[inline]
220    fn from_str(input: &str) -> Result<Self, ParseError> {
221        Self::parse(input)
222    }
223}
224
225/// Write out a new file (like [`std::fs::write`] but fails if file already
226/// exists)
227#[cfg(not(target_family = "wasm"))]
228pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
229    fs::File::options()
230        .write(true)
231        .create_new(true)
232        .open(path)?
233        .write_all(contents.as_ref())
234}
235
236#[cfg(not(target_family = "wasm"))]
237pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
238    fs::File::options()
239        .write(true)
240        .create(true)
241        .truncate(true)
242        .open(path)?
243        .write_all(contents.as_ref())
244}
245
246#[cfg(not(target_family = "wasm"))]
247pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
248    path: P,
249    contents: C,
250) -> io::Result<()> {
251    tokio::fs::OpenOptions::new()
252        .write(true)
253        .create(true)
254        .truncate(true)
255        .open(path)
256        .await?
257        .write_all(contents.as_ref())
258        .await
259}
260
261#[cfg(not(target_family = "wasm"))]
262pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
263    path: P,
264    contents: C,
265) -> io::Result<()> {
266    tokio::fs::OpenOptions::new()
267        .write(true)
268        .create_new(true)
269        .open(path)
270        .await?
271        .write_all(contents.as_ref())
272        .await
273}
274
275#[derive(Debug, Clone)]
276pub struct Spanned<T> {
277    value: T,
278    span: Span,
279}
280
281impl<T> Spanned<T> {
282    pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
283        Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
284            .await
285            .unwrap()
286    }
287
288    pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
289        span: Span,
290        make: F,
291    ) -> Result<Self, E> {
292        let span2 = span.clone();
293        async {
294            Ok(Self {
295                value: make.await?,
296                span: span2,
297            })
298        }
299        .instrument(span)
300        .await
301    }
302
303    pub fn borrow(&self) -> Spanned<&T> {
304        Spanned {
305            value: &self.value,
306            span: self.span.clone(),
307        }
308    }
309
310    pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
311        Spanned {
312            value: map(self.value),
313            span: self.span,
314        }
315    }
316
317    pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
318        Spanned {
319            value: &mut self.value,
320            span: self.span.clone(),
321        }
322    }
323
324    pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
325        let _g = self.span.enter();
326        f(self.value)
327    }
328
329    pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
330        async { f(self.value).await }.instrument(self.span).await
331    }
332
333    pub fn span(&self) -> Span {
334        self.span.clone()
335    }
336
337    pub fn value(&self) -> &T {
338        &self.value
339    }
340
341    pub fn value_mut(&mut self) -> &mut T {
342        &mut self.value
343    }
344
345    pub fn into_value(self) -> T {
346        self.value
347    }
348}
349
350/// For CLIs, detects `version-hash` as a single argument, prints the provided
351/// version hash, then exits the process.
352pub fn handle_version_hash_command(version_hash: &str) {
353    let mut args = std::env::args();
354    if let Some(ref arg) = args.nth(1) {
355        if arg.as_str() == "version-hash" {
356            println!("{version_hash}");
357            std::process::exit(0);
358        }
359    }
360}
361
362/// Run the supplied closure `op_fn` until it succeeds. Frequency and number of
363/// retries is determined by the specified strategy.
364///
365/// ```
366/// use std::time::Duration;
367///
368/// use fedimint_core::util::{backoff_util, retry};
369/// # tokio_test::block_on(async {
370/// retry(
371///     "Gateway balance after swap".to_string(),
372///     backoff_util::background_backoff(),
373///     || async {
374///         // Fallible network calls …
375///         Ok(())
376///     },
377/// )
378/// .await
379/// .expect("never fails");
380/// # });
381/// ```
382///
383/// # Returns
384///
385/// - If the closure runs successfully, the result is immediately returned
386/// - If the closure did not run successfully for `max_attempts` times, the
387///   error of the closure is returned
388pub async fn retry<F, Fut, T>(
389    op_name: impl Into<String>,
390    strategy: impl backoff_util::Backoff,
391    op_fn: F,
392) -> Result<T, anyhow::Error>
393where
394    F: Fn() -> Fut,
395    Fut: Future<Output = Result<T, anyhow::Error>>,
396{
397    let mut strategy = strategy;
398    let op_name = op_name.into();
399    let mut attempts: u64 = 0;
400    loop {
401        attempts += 1;
402        match op_fn().await {
403            Ok(result) => return Ok(result),
404            Err(err) => {
405                if let Some(interval) = strategy.next() {
406                    // run closure op_fn again
407                    debug!(
408                        target: LOG_CORE,
409                        err = %err.fmt_compact_anyhow(),
410                        %attempts,
411                        interval = interval.as_secs(),
412                        "{} failed, retrying",
413                        op_name,
414                    );
415                    runtime::sleep(interval).await;
416                } else {
417                    warn!(
418                        target: LOG_CORE,
419                        err = %err.fmt_compact_anyhow(),
420                        %attempts,
421                        "{} failed",
422                        op_name,
423                    );
424                    return Err(err);
425                }
426            }
427        }
428    }
429}
430
431/// Computes the median from a slice of sorted `u64`s
432pub fn get_median(vals: &[u64]) -> Option<u64> {
433    if vals.is_empty() {
434        return None;
435    }
436    let len = vals.len();
437    let mid = len / 2;
438
439    if len % 2 == 0 {
440        Some((vals[mid - 1] + vals[mid]) / 2)
441    } else {
442        Some(vals[mid])
443    }
444}
445
446/// Computes the average of the given `u64` slice.
447pub fn get_average(vals: &[u64]) -> Option<u64> {
448    if vals.is_empty() {
449        return None;
450    }
451
452    let sum: u64 = vals.iter().sum();
453    Some(sum / vals.len() as u64)
454}
455
456#[cfg(test)]
457mod tests;