1pub mod backoff_util;
2pub mod broadcaststream;
4mod error;
5pub mod update_merge;
6
7use std::convert::Infallible;
8use std::fmt::{Debug, Display, Formatter};
9use std::future::Future;
10use std::hash::Hash;
11use std::io::Write;
12use std::path::Path;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::{fs, io};
16
17use anyhow::format_err;
18pub use error::*;
19use fedimint_logging::LOG_CORE;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23use tokio::io::AsyncWriteExt;
24use tracing::{Instrument, Span, debug, warn};
25use url::{Host, ParseError, Url};
26
27use crate::task::MaybeSend;
28use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
29
/// Pinned, heap-allocated, type-erased future that resolves to `T` and may
/// borrow data for the lifetime `'a`.
///
/// NOTE(review): `maybe_add_send!` presumably adds a `Send` bound on targets
/// where that is possible; confirm against the macro definition.
pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
32
/// Pinned, heap-allocated, type-erased stream that yields items of type `T`
/// and may borrow data for the lifetime `'a`.
///
/// NOTE(review): `maybe_add_send!` presumably adds a `Send` bound on targets
/// where that is possible; confirm against the macro definition.
pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
35
/// Convenience methods for pulling the next item out of a stream without
/// having to handle the `None` (stream ended) case at every call site.
///
/// A blanket implementation for `Unpin` streams is provided in this module.
#[apply(async_trait_maybe_send!)]
pub trait NextOrPending {
    /// The item type produced by the underlying stream.
    type Output;

    /// Waits for the next item.
    ///
    /// In the blanket implementation, an ended stream makes this future stay
    /// pending forever instead of producing an error.
    async fn next_or_pending(&mut self) -> Self::Output;

    /// Waits for the next item.
    ///
    /// In the blanket implementation, an ended stream is reported as an
    /// error.
    async fn ok(&mut self) -> anyhow::Result<Self::Output>;
}
44
45#[apply(async_trait_maybe_send!)]
46impl<S> NextOrPending for S
47where
48 S: futures::Stream + Unpin + MaybeSend,
49 S::Item: MaybeSend,
50{
51 type Output = S::Item;
52
53 async fn ok(&mut self) -> anyhow::Result<Self::Output> {
56 self.next()
57 .await
58 .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
59 }
60
61 async fn next_or_pending(&mut self) -> Self::Output {
66 if let Some(item) = self.next().await {
67 item
68 } else {
69 debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
70 std::future::pending().await
71 }
72 }
73}
74
/// Newtype around [`Url`] whose `Display` and `Debug` implementations replace
/// the username and password with fixed placeholders, so that formatting the
/// value (e.g. in logs) does not print credentials.
///
/// Serialization, hashing, equality and ordering are derived and therefore
/// delegate to the wrapped [`Url`].
#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeUrl(Url);
87
/// Errors produced by [`SafeUrl`] operations.
#[derive(Debug, Error)]
pub enum SafeUrlError {
    /// Clearing the username or password failed in [`SafeUrl::without_auth`].
    #[error("Failed to remove auth from URL")]
    WithoutAuthError,
}
93
94impl SafeUrl {
95 pub fn parse(url_str: &str) -> Result<Self, ParseError> {
96 Url::parse(url_str).map(SafeUrl)
97 }
98
99 pub fn to_unsafe(self) -> Url {
102 self.0
103 }
104
105 #[allow(clippy::result_unit_err)] pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
107 self.0.set_username(username)
108 }
109
110 #[allow(clippy::result_unit_err)] pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
112 self.0.set_password(password)
113 }
114
115 pub fn without_auth(&self) -> Result<Self, SafeUrlError> {
116 let mut url = self.clone();
117 url.set_username("")
118 .and_then(|()| url.set_password(None))
119 .map_err(|()| SafeUrlError::WithoutAuthError)?;
120 Ok(url)
121 }
122
123 pub fn host(&self) -> Option<Host<&str>> {
124 self.0.host()
125 }
126 pub fn host_str(&self) -> Option<&str> {
127 self.0.host_str()
128 }
129 pub fn scheme(&self) -> &str {
130 self.0.scheme()
131 }
132 pub fn port(&self) -> Option<u16> {
133 self.0.port()
134 }
135 pub fn port_or_known_default(&self) -> Option<u16> {
136 self.0.port_or_known_default()
137 }
138 pub fn path(&self) -> &str {
139 self.0.path()
140 }
141 pub fn as_str(&self) -> &str {
143 self.0.as_str()
144 }
145 pub fn username(&self) -> &str {
146 self.0.username()
147 }
148 pub fn password(&self) -> Option<&str> {
149 self.0.password()
150 }
151 pub fn join(&self, input: &str) -> Result<Self, ParseError> {
152 self.0.join(input).map(SafeUrl)
153 }
154
155 #[allow(clippy::case_sensitive_file_extension_comparisons)]
158 pub fn is_onion_address(&self) -> bool {
159 let host = self.host_str().unwrap_or_default();
160
161 host.ends_with(".onion")
162 }
163
164 pub fn fragment(&self) -> Option<&str> {
165 self.0.fragment()
166 }
167
168 pub fn set_fragment(&mut self, arg: Option<&str>) {
169 self.0.set_fragment(arg);
170 }
171}
172
173impl Display for SafeUrl {
174 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
175 write!(f, "{}://", self.0.scheme())?;
176
177 if !self.0.username().is_empty() {
178 write!(f, "REDACTEDUSER")?;
179
180 if self.0.password().is_some() {
181 write!(f, ":REDACTEDPASS")?;
182 }
183
184 write!(f, "@")?;
185 }
186
187 if let Some(host) = self.0.host_str() {
188 write!(f, "{host}")?;
189 }
190
191 if let Some(port) = self.0.port() {
192 write!(f, ":{port}")?;
193 }
194
195 write!(f, "{}", self.0.path())?;
196
197 Ok(())
198 }
199}
200
201impl Debug for SafeUrl {
202 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203 write!(f, "SafeUrl(")?;
204 Display::fmt(self, f)?;
205 write!(f, ")")?;
206 Ok(())
207 }
208}
209
210impl From<Url> for SafeUrl {
211 fn from(u: Url) -> Self {
212 Self(u)
213 }
214}
215
216impl FromStr for SafeUrl {
217 type Err = ParseError;
218
219 #[inline]
220 fn from_str(input: &str) -> Result<Self, ParseError> {
221 Self::parse(input)
222 }
223}
224
/// Creates a new file at `path` and writes `contents` into it.
///
/// # Errors
///
/// Fails if the file already exists (it is opened with `create_new`), or if
/// opening or writing fails for any other reason.
#[cfg(not(target_family = "wasm"))]
pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut file = fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;
    file.write_all(contents.as_ref())
}
235
/// Writes `contents` to the file at `path`, creating it if it is missing and
/// truncating any previous content.
///
/// # Errors
///
/// Returns the I/O error if opening or writing the file fails.
#[cfg(not(target_family = "wasm"))]
pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut file = fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    file.write_all(contents.as_ref())
}
245
246#[cfg(not(target_family = "wasm"))]
247pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
248 path: P,
249 contents: C,
250) -> io::Result<()> {
251 tokio::fs::OpenOptions::new()
252 .write(true)
253 .create(true)
254 .truncate(true)
255 .open(path)
256 .await?
257 .write_all(contents.as_ref())
258 .await
259}
260
261#[cfg(not(target_family = "wasm"))]
262pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
263 path: P,
264 contents: C,
265) -> io::Result<()> {
266 tokio::fs::OpenOptions::new()
267 .write(true)
268 .create_new(true)
269 .open(path)
270 .await?
271 .write_all(contents.as_ref())
272 .await
273}
274
/// A value bundled with the [`Span`] it was created under, so that later work
/// on the value can be run inside that same span.
#[derive(Debug, Clone)]
pub struct Spanned<T> {
    /// The wrapped value.
    value: T,
    /// The span associated with `value`.
    span: Span,
}
280
281impl<T> Spanned<T> {
282 pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
283 Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
284 .await
285 .unwrap()
286 }
287
288 pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
289 span: Span,
290 make: F,
291 ) -> Result<Self, E> {
292 let span2 = span.clone();
293 async {
294 Ok(Self {
295 value: make.await?,
296 span: span2,
297 })
298 }
299 .instrument(span)
300 .await
301 }
302
303 pub fn borrow(&self) -> Spanned<&T> {
304 Spanned {
305 value: &self.value,
306 span: self.span.clone(),
307 }
308 }
309
310 pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
311 Spanned {
312 value: map(self.value),
313 span: self.span,
314 }
315 }
316
317 pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
318 Spanned {
319 value: &mut self.value,
320 span: self.span.clone(),
321 }
322 }
323
324 pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
325 let _g = self.span.enter();
326 f(self.value)
327 }
328
329 pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
330 async { f(self.value).await }.instrument(self.span).await
331 }
332
333 pub fn span(&self) -> Span {
334 self.span.clone()
335 }
336
337 pub fn value(&self) -> &T {
338 &self.value
339 }
340
341 pub fn value_mut(&mut self) -> &mut T {
342 &mut self.value
343 }
344
345 pub fn into_value(self) -> T {
346 self.value
347 }
348}
349
/// If the first command-line argument is exactly `version-hash`, prints
/// `version_hash` to stdout and exits the process with status 0. Otherwise
/// returns without doing anything.
pub fn handle_version_hash_command(version_hash: &str) {
    let first_arg = std::env::args().nth(1);
    if first_arg.as_deref() == Some("version-hash") {
        println!("{version_hash}");
        std::process::exit(0);
    }
}
361
362pub async fn retry<F, Fut, T>(
389 op_name: impl Into<String>,
390 strategy: impl backoff_util::Backoff,
391 op_fn: F,
392) -> Result<T, anyhow::Error>
393where
394 F: Fn() -> Fut,
395 Fut: Future<Output = Result<T, anyhow::Error>>,
396{
397 let mut strategy = strategy;
398 let op_name = op_name.into();
399 let mut attempts: u64 = 0;
400 loop {
401 attempts += 1;
402 match op_fn().await {
403 Ok(result) => return Ok(result),
404 Err(err) => {
405 if let Some(interval) = strategy.next() {
406 debug!(
408 target: LOG_CORE,
409 err = %err.fmt_compact_anyhow(),
410 %attempts,
411 interval = interval.as_secs(),
412 "{} failed, retrying",
413 op_name,
414 );
415 runtime::sleep(interval).await;
416 } else {
417 warn!(
418 target: LOG_CORE,
419 err = %err.fmt_compact_anyhow(),
420 %attempts,
421 "{} failed",
422 op_name,
423 );
424 return Err(err);
425 }
426 }
427 }
428 }
429}
430
/// Returns the median of `vals`, or `None` for an empty slice.
///
/// The slice is used as-is and is not sorted here, so the result is only the
/// true median when `vals` is already sorted. For an even number of elements
/// the two middle values are averaged, rounding down.
pub fn get_median(vals: &[u64]) -> Option<u64> {
    if vals.is_empty() {
        return None;
    }

    let mid = vals.len() / 2;
    let upper = vals[mid];
    if vals.len() % 2 == 1 {
        return Some(upper);
    }

    // Even length guarantees `mid >= 1`.
    let lower = vals[mid - 1];
    // Halve before adding so the computation cannot overflow `u64`; the last
    // term restores the unit lost when both operands are odd. The result is
    // floor((lower + upper) / 2).
    Some(lower / 2 + upper / 2 + (lower % 2 + upper % 2) / 2)
}
445
/// Returns the arithmetic mean of `vals`, rounded down, or `None` for an
/// empty slice.
pub fn get_average(vals: &[u64]) -> Option<u64> {
    if vals.is_empty() {
        return None;
    }

    // Accumulate in `u128` so that large inputs cannot overflow the sum.
    let sum: u128 = vals.iter().copied().map(u128::from).sum();
    let average = sum / vals.len() as u128;

    // The mean never exceeds the largest element, so it always fits in `u64`.
    u64::try_from(average).ok()
}
455
456#[cfg(test)]
457mod tests;