1pub mod backoff_util;
2pub mod broadcaststream;
4mod error;
5pub mod update_merge;
6
7use std::convert::Infallible;
8use std::fmt::{Debug, Display, Formatter};
9use std::future::Future;
10use std::hash::Hash;
11use std::io::Write;
12use std::path::Path;
13use std::pin::Pin;
14use std::str::FromStr;
15use std::sync::LazyLock;
16use std::{fs, io};
17
18use anyhow::format_err;
19pub use error::*;
20use fedimint_logging::LOG_CORE;
21use futures::StreamExt;
22use serde::{Deserialize, Serialize};
23use tokio::io::AsyncWriteExt;
24use tracing::{Instrument, Span, debug, warn};
25use url::{Host, ParseError, Url};
26
27use crate::envs::{FM_DEBUG_SHOW_SECRETS_ENV, is_env_var_set};
28use crate::task::MaybeSend;
29use crate::{apply, async_trait_maybe_send, maybe_add_send, runtime};
30
31pub type BoxFuture<'a, T> = Pin<Box<maybe_add_send!(dyn Future<Output = T> + 'a)>>;
33
34pub type BoxStream<'a, T> = Pin<Box<maybe_add_send!(dyn futures::Stream<Item = T> + 'a)>>;
36
37#[apply(async_trait_maybe_send!)]
38pub trait NextOrPending {
39 type Output;
40
41 async fn next_or_pending(&mut self) -> Self::Output;
42
43 async fn ok(&mut self) -> anyhow::Result<Self::Output>;
44}
45
46#[apply(async_trait_maybe_send!)]
47impl<S> NextOrPending for S
48where
49 S: futures::Stream + Unpin + MaybeSend,
50 S::Item: MaybeSend,
51{
52 type Output = S::Item;
53
54 async fn ok(&mut self) -> anyhow::Result<Self::Output> {
57 self.next()
58 .await
59 .map_or_else(|| Err(format_err!("Stream was unexpectedly closed")), Ok)
60 }
61
62 async fn next_or_pending(&mut self) -> Self::Output {
67 if let Some(item) = self.next().await {
68 item
69 } else {
70 debug!(target: LOG_CORE, "Stream ended in next_or_pending, pending forever to avoid throwing an error on shutdown");
71 std::future::pending().await
72 }
73 }
74}
75
76#[derive(Hash, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
86pub struct SafeUrl(Url);
88
89impl SafeUrl {
90 pub fn parse(url_str: &str) -> Result<Self, ParseError> {
91 Url::parse(url_str).map(SafeUrl)
92 }
93
94 pub fn to_unsafe(self) -> Url {
97 self.0
98 }
99
100 #[allow(clippy::result_unit_err)] pub fn set_username(&mut self, username: &str) -> Result<(), ()> {
102 self.0.set_username(username)
103 }
104
105 #[allow(clippy::result_unit_err)] pub fn set_password(&mut self, password: Option<&str>) -> Result<(), ()> {
107 self.0.set_password(password)
108 }
109
110 #[allow(clippy::result_unit_err)] pub fn without_auth(&self) -> Result<Self, ()> {
112 let mut url = self.clone();
113
114 url.set_username("").and_then(|()| url.set_password(None))?;
115
116 Ok(url)
117 }
118
119 pub fn host(&self) -> Option<Host<&str>> {
120 self.0.host()
121 }
122 pub fn host_str(&self) -> Option<&str> {
123 self.0.host_str()
124 }
125 pub fn scheme(&self) -> &str {
126 self.0.scheme()
127 }
128 pub fn port(&self) -> Option<u16> {
129 self.0.port()
130 }
131 pub fn port_or_known_default(&self) -> Option<u16> {
132 self.0.port_or_known_default()
133 }
134 pub fn path(&self) -> &str {
135 self.0.path()
136 }
137 pub fn as_str(&self) -> &str {
139 self.0.as_str()
140 }
141 pub fn username(&self) -> &str {
142 self.0.username()
143 }
144 pub fn password(&self) -> Option<&str> {
145 self.0.password()
146 }
147 pub fn join(&self, input: &str) -> Result<Self, ParseError> {
148 self.0.join(input).map(SafeUrl)
149 }
150
151 #[allow(clippy::case_sensitive_file_extension_comparisons)]
154 pub fn is_onion_address(&self) -> bool {
155 let host = self.host_str().unwrap_or_default();
156
157 host.ends_with(".onion")
158 }
159
160 pub fn fragment(&self) -> Option<&str> {
161 self.0.fragment()
162 }
163
164 pub fn set_fragment(&mut self, arg: Option<&str>) {
165 self.0.set_fragment(arg);
166 }
167}
168
169static SHOW_SECRETS: LazyLock<bool> = LazyLock::new(|| {
170 let enable = is_env_var_set(FM_DEBUG_SHOW_SECRETS_ENV);
171
172 if enable {
173 warn!(target: LOG_CORE, "{} enabled. Please don't use in production.", FM_DEBUG_SHOW_SECRETS_ENV);
174 }
175
176 enable
177});
178
179impl Display for SafeUrl {
180 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
181 write!(f, "{}://", self.0.scheme())?;
182
183 if !self.0.username().is_empty() {
184 let show_secrets = *SHOW_SECRETS;
185 if show_secrets {
186 write!(f, "{}", self.0.username())?;
187 } else {
188 write!(f, "REDACTEDUSER")?;
189 }
190
191 if self.0.password().is_some() {
192 if show_secrets {
193 write!(
194 f,
195 ":{}",
196 self.0.password().expect("Just checked it's checked")
197 )?;
198 } else {
199 write!(f, ":REDACTEDPASS")?;
200 }
201 }
202
203 write!(f, "@")?;
204 }
205
206 if let Some(host) = self.0.host_str() {
207 write!(f, "{host}")?;
208 }
209
210 if let Some(port) = self.0.port() {
211 write!(f, ":{port}")?;
212 }
213
214 write!(f, "{}", self.0.path())?;
215
216 Ok(())
217 }
218}
219
220impl Debug for SafeUrl {
221 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
222 write!(f, "SafeUrl(")?;
223 Display::fmt(self, f)?;
224 write!(f, ")")?;
225 Ok(())
226 }
227}
228
229impl From<Url> for SafeUrl {
230 fn from(u: Url) -> Self {
231 Self(u)
232 }
233}
234
235impl FromStr for SafeUrl {
236 type Err = ParseError;
237
238 #[inline]
239 fn from_str(input: &str) -> Result<Self, ParseError> {
240 Self::parse(input)
241 }
242}
243
244#[cfg(not(target_family = "wasm"))]
247pub fn write_new<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
248 fs::File::options()
249 .write(true)
250 .create_new(true)
251 .open(path)?
252 .write_all(contents.as_ref())
253}
254
255#[cfg(not(target_family = "wasm"))]
256pub fn write_overwrite<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
257 fs::File::options()
258 .write(true)
259 .create(true)
260 .truncate(true)
261 .open(path)?
262 .write_all(contents.as_ref())
263}
264
265#[cfg(not(target_family = "wasm"))]
266pub async fn write_overwrite_async<P: AsRef<Path>, C: AsRef<[u8]>>(
267 path: P,
268 contents: C,
269) -> io::Result<()> {
270 tokio::fs::OpenOptions::new()
271 .write(true)
272 .create(true)
273 .truncate(true)
274 .open(path)
275 .await?
276 .write_all(contents.as_ref())
277 .await
278}
279
280#[cfg(not(target_family = "wasm"))]
281pub async fn write_new_async<P: AsRef<Path>, C: AsRef<[u8]>>(
282 path: P,
283 contents: C,
284) -> io::Result<()> {
285 tokio::fs::OpenOptions::new()
286 .write(true)
287 .create_new(true)
288 .open(path)
289 .await?
290 .write_all(contents.as_ref())
291 .await
292}
293
294#[derive(Debug, Clone)]
295pub struct Spanned<T> {
296 value: T,
297 span: Span,
298}
299
300impl<T> Spanned<T> {
301 pub async fn new<F: Future<Output = T>>(span: Span, make: F) -> Self {
302 Self::try_new::<Infallible, _>(span, async { Ok(make.await) })
303 .await
304 .unwrap()
305 }
306
307 pub async fn try_new<E, F: Future<Output = Result<T, E>>>(
308 span: Span,
309 make: F,
310 ) -> Result<Self, E> {
311 let span2 = span.clone();
312 async {
313 Ok(Self {
314 value: make.await?,
315 span: span2,
316 })
317 }
318 .instrument(span)
319 .await
320 }
321
322 pub fn borrow(&self) -> Spanned<&T> {
323 Spanned {
324 value: &self.value,
325 span: self.span.clone(),
326 }
327 }
328
329 pub fn map<U>(self, map: impl Fn(T) -> U) -> Spanned<U> {
330 Spanned {
331 value: map(self.value),
332 span: self.span,
333 }
334 }
335
336 pub fn borrow_mut(&mut self) -> Spanned<&mut T> {
337 Spanned {
338 value: &mut self.value,
339 span: self.span.clone(),
340 }
341 }
342
343 pub fn with_sync<O, F: FnOnce(T) -> O>(self, f: F) -> O {
344 let _g = self.span.enter();
345 f(self.value)
346 }
347
348 pub async fn with<Fut: Future, F: FnOnce(T) -> Fut>(self, f: F) -> Fut::Output {
349 async { f(self.value).await }.instrument(self.span).await
350 }
351
352 pub fn span(&self) -> Span {
353 self.span.clone()
354 }
355
356 pub fn value(&self) -> &T {
357 &self.value
358 }
359
360 pub fn value_mut(&mut self) -> &mut T {
361 &mut self.value
362 }
363
364 pub fn into_value(self) -> T {
365 self.value
366 }
367}
368
369pub fn handle_version_hash_command(version_hash: &str) {
372 let mut args = std::env::args();
373 if let Some(ref arg) = args.nth(1) {
374 if arg.as_str() == "version-hash" {
375 println!("{version_hash}");
376 std::process::exit(0);
377 }
378 }
379}
380
381pub async fn retry<F, Fut, T>(
408 op_name: impl Into<String>,
409 strategy: impl backoff_util::Backoff,
410 op_fn: F,
411) -> Result<T, anyhow::Error>
412where
413 F: Fn() -> Fut,
414 Fut: Future<Output = Result<T, anyhow::Error>>,
415{
416 let mut strategy = strategy;
417 let op_name = op_name.into();
418 let mut attempts: u64 = 0;
419 loop {
420 attempts += 1;
421 match op_fn().await {
422 Ok(result) => return Ok(result),
423 Err(err) => {
424 if let Some(interval) = strategy.next() {
425 debug!(
427 target: LOG_CORE,
428 err = %err.fmt_compact_anyhow(),
429 %attempts,
430 interval = interval.as_secs(),
431 "{} failed, retrying",
432 op_name,
433 );
434 runtime::sleep(interval).await;
435 } else {
436 warn!(
437 target: LOG_CORE,
438 err = %err.fmt_compact_anyhow(),
439 %attempts,
440 "{} failed",
441 op_name,
442 );
443 return Err(err);
444 }
445 }
446 }
447 }
448}
449
450pub fn get_median(vals: &[u64]) -> Option<u64> {
452 if vals.is_empty() {
453 return None;
454 }
455 let len = vals.len();
456 let mid = len / 2;
457
458 if len % 2 == 0 {
459 Some(u64::midpoint(vals[mid - 1], vals[mid]))
460 } else {
461 Some(vals[mid])
462 }
463}
464
465pub fn get_average(vals: &[u64]) -> Option<u64> {
467 if vals.is_empty() {
468 return None;
469 }
470
471 let sum: u64 = vals.iter().sum();
472 Some(sum / vals.len() as u64)
473}
474
475#[cfg(test)]
476mod tests;