1pub mod backoff_util;
2pub mod broadcaststream;
4pub mod update_merge;
5
6use std::convert::Infallible;
7use std::fmt::{Debug, Display, Formatter};
8use std::future::Future;
9use std::hash::Hash;
10use std::io::Write;
11use std::path::Path;
12use std::pin::Pin;
13use std::str::FromStr;
14use std::sync::LazyLock;
15use std::{fs, io};
16
17use anyhow::format_err;
18use fedimint_logging::LOG_CORE;
19pub use fedimint_util_error::*;
20use futures::StreamExt;
21use serde::{Deserialize, Serialize};
22use tokio::io::AsyncWriteExt;
23use tracing::{Instrument, Span, debug, warn};
24use url::{Host, ParseError, Url};
25
26use crate::envs::{FM_DEBUG_SHOW_SECRETS_ENV, is_env_var_set};
27use crate::task::MaybeSend;
28use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
29
/// A pinned, boxed, type-erased future that resolves to `T` and may borrow
/// data for the lifetime `'a`.
///
/// NOTE(review): `maybe_add_send!` presumably adds a `Send` bound to the trait
/// object on targets where that is possible — confirm against the macro
/// definition.
pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
32
/// A pinned, boxed, type-erased stream that yields items of type `T` and may
/// borrow data for the lifetime `'a`.
///
/// NOTE(review): `maybe_add_send!` presumably adds a `Send` bound to the trait
/// object on targets where that is possible — confirm against the macro
/// definition.
pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
35
/// Convenience helpers for pulling the next item out of a stream-like source
/// when the caller does not want to deal with an `Option`.
#[apply(async_trait_maybe_send!)]
pub trait NextOrPending {
    /// The item type produced by the underlying source.
    type Output;

    /// Waits for the next item.
    ///
    /// The blanket implementation for streams in this file never resolves
    /// once the stream has ended, instead of reporting an error.
    async fn next_or_pending(&mut self) -> Self::Output;

    /// Waits for the next item, turning "no more items" into an error.
    ///
    /// # Errors
    ///
    /// The blanket implementation for streams in this file returns an error
    /// when the stream has ended.
    async fn ok(&mut self) -> anyhow::Result<Self::Output>;
}
44
45#[apply(async_trait_maybe_send!)]
46impl<S> NextOrPending for S
47where
48 S: futures::Stream + Unpin + MaybeSend,
49 S::Item: MaybeSend,
50{
51 type Output = S::Item;
52
53 async fn ok(&mut self) -> anyhow::Result<Self::Output> {
56 self.next()
57 .await
58 .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
59 }
60
61 async fn next_or_pending(&mut self) -> Self::Output {
66 if let Some(item) = self.next().await {
67 item
68 } else {
69 debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
70 std::future::pending().await
71 }
72 }
73}
74
/// Newtype around [`Url`] whose `Display` and `Debug` implementations in this
/// file replace the username and password with placeholders unless
/// `FM_DEBUG_SHOW_SECRETS_ENV` is set.
///
/// The derived traits (including `Serialize`/`Deserialize`) operate on the
/// wrapped [`Url`] and are not affected by the redaction.
#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeUrl(Url);
87
88impl SafeUrl {
89 pub fn parse(url_str: &str) -> Result<Self, ParseError> {
90 Url::parse(url_str).map(SafeUrl)
91 }
92
93 pub fn to_unsafe(self) -> Url {
96 self.0
97 }
98
99 #[allow(clippy::result_unit_err)] pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
101 self.0.set_username(username)
102 }
103
104 #[allow(clippy::result_unit_err)] pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
106 self.0.set_password(password)
107 }
108
109 #[allow(clippy::result_unit_err)] pub fn without_auth(&self) -> Result<Self, ()> {
111 let mut url = self.clone();
112
113 url.set_username("").and_then(|()| url.set_password(None))?;
114
115 Ok(url)
116 }
117
118 pub fn host(&self) -> Option<Host<&str>> {
119 self.0.host()
120 }
121 pub fn host_str(&self) -> Option<&str> {
122 self.0.host_str()
123 }
124 pub fn scheme(&self) -> &str {
125 self.0.scheme()
126 }
127 pub fn port(&self) -> Option<u16> {
128 self.0.port()
129 }
130 pub fn port_or_known_default(&self) -> Option<u16> {
131 self.0.port_or_known_default()
132 }
133 pub fn path(&self) -> &str {
134 self.0.path()
135 }
136 pub fn as_str(&self) -> &str {
138 self.0.as_str()
139 }
140 pub fn username(&self) -> &str {
141 self.0.username()
142 }
143 pub fn password(&self) -> Option<&str> {
144 self.0.password()
145 }
146 pub fn join(&self, input: &str) -> Result<Self, ParseError> {
147 self.0.join(input).map(SafeUrl)
148 }
149
150 #[allow(clippy::case_sensitive_file_extension_comparisons)]
153 pub fn is_onion_address(&self) -> bool {
154 let host = self.host_str().unwrap_or_default();
155
156 host.ends_with(".onion")
157 }
158
159 pub fn fragment(&self) -> Option<&str> {
160 self.0.fragment()
161 }
162
163 pub fn set_fragment(&mut self, arg: Option<&str>) {
164 self.0.set_fragment(arg);
165 }
166}
167
168static SHOW_SECRETS: LazyLock<bool> = LazyLock::new(|| {
169 let enable = is_env_var_set(FM_DEBUG_SHOW_SECRETS_ENV);
170
171 if enable {
172 warn!(target: LOG_CORE, "{} enabled. Please don't use in production.", FM_DEBUG_SHOW_SECRETS_ENV);
173 }
174
175 enable
176});
177
178impl Display for SafeUrl {
179 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
180 write!(f, "{}://", self.0.scheme())?;
181
182 if !self.0.username().is_empty() {
183 let show_secrets = *SHOW_SECRETS;
184 if show_secrets {
185 write!(f, "{}", self.0.username())?;
186 } else {
187 write!(f, "REDACTEDUSER")?;
188 }
189
190 if self.0.password().is_some() {
191 if show_secrets {
192 write!(
193 f,
194 ":{}",
195 self.0.password().expect("Just checked it's checked")
196 )?;
197 } else {
198 write!(f, ":REDACTEDPASS")?;
199 }
200 }
201
202 write!(f, "@")?;
203 }
204
205 if let Some(host) = self.0.host_str() {
206 write!(f, "{host}")?;
207 }
208
209 if let Some(port) = self.0.port() {
210 write!(f, ":{port}")?;
211 }
212
213 write!(f, "{}", self.0.path())?;
214
215 Ok(())
216 }
217}
218
219impl Debug for SafeUrl {
220 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
221 write!(f, "SafeUrl(")?;
222 Display::fmt(self, f)?;
223 write!(f, ")")?;
224 Ok(())
225 }
226}
227
228impl From<Url> for SafeUrl {
229 fn from(u: Url) -> Self {
230 Self(u)
231 }
232}
233
234impl FromStr for SafeUrl {
235 type Err = ParseError;
236
237 #[inline]
238 fn from_str(input: &str) -> Result<Self, ParseError> {
239 Self::parse(input)
240 }
241}
242
/// Creates a new file at `path`, writes `contents` to it and syncs it via
/// [`fs::File::sync_all`] before returning.
///
/// # Errors
///
/// Fails if the file already exists (it is opened with `create_new`), or if
/// opening, writing or syncing fails.
#[cfg(not(target_family = "wasm"))]
pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut open_opts = fs::OpenOptions::new();
    open_opts.write(true).create_new(true);

    let mut new_file = open_opts.open(path)?;
    new_file.write_all(contents.as_ref())?;
    new_file.sync_all()
}
255
/// Writes `contents` to the file at `path`, creating it if missing and
/// truncating any previous content.
///
/// # Errors
///
/// Returns the I/O error from opening or writing the file.
#[cfg(not(target_family = "wasm"))]
pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
    let mut target = fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;

    target.write_all(contents.as_ref())
}
265
266#[cfg(not(target_family = "wasm"))]
267pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
268 path: P,
269 contents: C,
270) -> io::Result<()> {
271 tokio::fs::OpenOptions::new()
272 .write(true)
273 .create(true)
274 .truncate(true)
275 .open(path)
276 .await?
277 .write_all(contents.as_ref())
278 .await
279}
280
281#[cfg(not(target_family = "wasm"))]
282pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
283 path: P,
284 contents: C,
285) -> io::Result<()> {
286 tokio::fs::OpenOptions::new()
287 .write(true)
288 .create_new(true)
289 .open(path)
290 .await?
291 .write_all(contents.as_ref())
292 .await
293}
294
/// A value bundled with the tracing [`Span`] it belongs to.
///
/// The accessor methods on this type either hand out the value/span directly
/// or run caller-supplied code on the value while the span is entered.
#[derive(Debug, Clone)]
pub struct Spanned<T> {
    /// The wrapped value.
    value: T,
    /// The span associated with `value`.
    span: Span,
}
300
301impl<T> Spanned<T> {
302 pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
303 Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
304 .await
305 .unwrap()
306 }
307
308 pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
309 span: Span,
310 make: F,
311 ) -> Result<Self, E> {
312 let span2 = span.clone();
313 async {
314 Ok(Self {
315 value: make.await?,
316 span: span2,
317 })
318 }
319 .instrument(span)
320 .await
321 }
322
323 pub fn borrow(&self) -> Spanned<&T> {
324 Spanned {
325 value: &self.value,
326 span: self.span.clone(),
327 }
328 }
329
330 pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
331 Spanned {
332 value: map(self.value),
333 span: self.span,
334 }
335 }
336
337 pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
338 Spanned {
339 value: &mut self.value,
340 span: self.span.clone(),
341 }
342 }
343
344 pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
345 let _g = self.span.enter();
346 f(self.value)
347 }
348
349 pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
350 async { f(self.value).await }.instrument(self.span).await
351 }
352
353 pub fn span(&self) -> Span {
354 self.span.clone()
355 }
356
357 pub fn value(&self) -> &T {
358 &self.value
359 }
360
361 pub fn value_mut(&mut self) -> &mut T {
362 &mut self.value
363 }
364
365 pub fn into_value(self) -> T {
366 self.value
367 }
368}
369
/// If the first command-line argument is exactly `version-hash`, prints
/// `version_hash` to stdout and terminates the process with exit code 0.
/// Otherwise returns without doing anything.
pub fn handle_version_hash_command(version_hash: &str) {
    let first_arg = std::env::args().nth(1);

    if first_arg.as_deref() == Some("version-hash") {
        println!("{version_hash}");
        std::process::exit(0);
    }
}
381
382pub async fn retry<F, Fut, T>(
409 op_name: impl Into<String>,
410 strategy: impl backoff_util::Backoff,
411 op_fn: F,
412) -> Result<T, anyhow::Error>
413where
414 F: Fn() -> Fut,
415 Fut: Future<Output = Result<T, anyhow::Error>>,
416{
417 let mut strategy = strategy;
418 let op_name = op_name.into();
419 let mut attempts: u64 = 0;
420 loop {
421 attempts += 1;
422 match op_fn().await {
423 Ok(result) => return Ok(result),
424 Err(err) => {
425 if let Some(interval) = strategy.next() {
426 debug!(
428 target: LOG_CORE,
429 err = %err.fmt_compact_anyhow(),
430 %attempts,
431 interval = interval.as_secs(),
432 "{} failed, retrying",
433 op_name,
434 );
435 runtime::sleep(interval).await;
436 } else {
437 warn!(
438 target: LOG_CORE,
439 err = %err.fmt_compact_anyhow(),
440 %attempts,
441 "{} failed",
442 op_name,
443 );
444 return Err(err);
445 }
446 }
447 }
448 }
449}
450
/// Returns the middle element of `vals`, or the midpoint (rounded down) of the
/// two middle elements when the length is even. Returns `None` for an empty
/// slice.
///
/// The slice is not sorted here, so the result is the median only if `vals`
/// is already sorted.
pub fn get_median(vals: &[u64]) -> Option<u64> {
    let upper_idx = vals.len() / 2;
    // For an empty slice index 0 does not exist, which yields `None`.
    let upper = *vals.get(upper_idx)?;

    if vals.len() % 2 == 1 {
        Some(upper)
    } else {
        // Even and non-empty means at least two elements, so `upper_idx >= 1`.
        Some(u64::midpoint(vals[upper_idx - 1], upper))
    }
}
465
/// Returns the arithmetic mean of `vals`, rounded down, or `None` for an
/// empty slice.
///
/// The sum is accumulated in `u128`, so inputs whose total exceeds
/// `u64::MAX` neither panic nor wrap around.
pub fn get_average(vals: &[u64]) -> Option<u64> {
    if vals.is_empty() {
        return None;
    }

    let sum: u128 = vals.iter().copied().map(u128::from).sum();
    // Widening cast: `usize` always fits in `u128`.
    let average = sum / vals.len() as u128;

    Some(u64::try_from(average).expect("the mean of u64 values always fits in u64"))
}
475
476#[cfg(test)]
477mod tests;