1pub mod error;
2pub mod http;
3pub mod iroh;
4#[cfg(all(feature = "tor", not(target_family = "wasm")))]
5pub mod tor;
6pub mod ws;
7
8use std::collections::{BTreeMap, HashMap};
9use std::fmt::{self, Debug};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::{anyhow, bail};
15use async_trait::async_trait;
16use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
17use fedimint_core::module::{ApiMethod, ApiRequestErased};
18use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
19use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
20use fedimint_core::{apply, async_trait_maybe_send};
21use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
22use reqwest::Method;
23use serde_json::Value;
24use tokio::sync::OnceCell;
25use tracing::trace;
26
27use crate::error::ServerError;
28use crate::ws::WebsocketConnector;
29
30pub type ServerResult<T> = Result<T, ServerError>;
31
32type ConnectorInitFn = Arc<
34 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
35>;
36
37#[derive(Debug, Clone)]
42#[allow(clippy::struct_excessive_bools)] pub struct ConnectorRegistryBuilder {
44 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
49
50 iroh_enable: bool,
52 iroh_dns: Option<SafeUrl>,
54 iroh_next: bool,
56 iroh_pkarr_dht: bool,
58
59 ws_enable: bool,
61 ws_force_tor: bool,
62
63 http_enable: bool,
65}
66
67impl ConnectorRegistryBuilder {
68 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
70 let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
72 BTreeMap::new();
73
74 let builder_ws = self.clone();
76 let ws_connector_init = Arc::new(move || {
77 let builder = builder_ws.clone();
78 Box::pin(async move { builder.build_ws_connector().await })
79 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
80 });
81 connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
82 connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
83
84 let builder_iroh = self.clone();
86 connectors_lazy.insert(
87 "iroh".into(),
88 (
89 Arc::new(move || {
90 let builder = builder_iroh.clone();
91 Box::pin(async move { builder.build_iroh_connector().await })
92 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
93 }),
94 OnceCell::new(),
95 ),
96 );
97
98 let builder_http = self.clone();
99 let http_connector_init = Arc::new(move || {
100 let builder = builder_http.clone();
101 Box::pin(async move { builder.build_http_connector() })
102 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103 });
104
105 connectors_lazy.insert(
106 "http".into(),
107 (http_connector_init.clone(), OnceCell::new()),
108 );
109 connectors_lazy.insert(
110 "https".into(),
111 (http_connector_init.clone(), OnceCell::new()),
112 );
113
114 Ok(ConnectorRegistry {
115 connectors_lazy: Arc::new(connectors_lazy),
116 connection_overrides: self.connection_overrides,
117 })
118 }
119
120 pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
121 if !self.iroh_enable {
122 bail!("Iroh connector not enabled");
123 }
124 Ok(Arc::new(
125 iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
126 .await?,
127 ) as DynConnector)
128 }
129
130 pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
131 if !self.ws_enable {
132 bail!("Websocket connector not enabled");
133 }
134
135 match self.ws_force_tor {
136 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
137 true => {
138 use crate::tor::TorConnector;
139
140 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
141 }
142
143 false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
144 #[allow(unreachable_patterns)]
145 _ => bail!("Tor requested, but not support not compiled in"),
146 }
147 }
148
149 pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
150 if !self.http_enable {
151 bail!("Http connector not enabled");
152 }
153
154 Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
155 }
156
157 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
158 Self {
159 iroh_pkarr_dht: enable,
160 ..self
161 }
162 }
163
164 pub fn iroh_next(self, enable: bool) -> Self {
165 Self {
166 iroh_next: enable,
167 ..self
168 }
169 }
170
171 pub fn ws_force_tor(self, enable: bool) -> Self {
172 Self {
173 ws_force_tor: enable,
174 ..self
175 }
176 }
177
178 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
179 Self {
180 iroh_dns: Some(url),
181 ..self
182 }
183 }
184
185 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
187 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
189 self = self.with_connection_override(k, v);
190 }
191
192 Ok(Self { ..self })
193 }
194
195 pub fn with_connection_override(
196 mut self,
197 original_url: SafeUrl,
198 replacement_url: SafeUrl,
199 ) -> Self {
200 self.connection_overrides
201 .insert(original_url, replacement_url);
202 self
203 }
204}
205
206#[derive(Clone)]
219pub struct ConnectorRegistry {
220 connectors_lazy: Arc<BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>>,
222 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
224}
225
226impl fmt::Debug for ConnectorRegistry {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 f.debug_struct("ConnectorRegistry")
229 .field("connectors_lazy", &self.connectors_lazy.len())
230 .field("connection_overrides", &self.connection_overrides)
231 .finish()
232 }
233}
234
235impl ConnectorRegistry {
236 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
241 ConnectorRegistryBuilder {
242 iroh_enable: true,
243 iroh_dns: None,
244 iroh_pkarr_dht: false,
245 iroh_next: true,
246 ws_enable: true,
247 ws_force_tor: false,
248 http_enable: true,
249
250 connection_overrides: BTreeMap::default(),
251 }
252 }
253
254 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
257 ConnectorRegistryBuilder {
258 iroh_enable: true,
259 iroh_dns: None,
260 iroh_pkarr_dht: true,
261 iroh_next: true,
262 ws_enable: true,
263 ws_force_tor: false,
264 http_enable: false,
265
266 connection_overrides: BTreeMap::default(),
267 }
268 }
269
270 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
273 ConnectorRegistryBuilder {
274 iroh_enable: true,
275 iroh_dns: None,
276 iroh_pkarr_dht: false,
277 iroh_next: false,
278 ws_enable: true,
279 ws_force_tor: false,
280 http_enable: true,
281
282 connection_overrides: BTreeMap::default(),
283 }
284 }
285
286 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
289 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
290 Ok(builder)
291 }
292
293 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
296 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
297 Ok(builder)
298 }
299
300 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
303 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
304 Ok(builder)
305 }
306
307 pub async fn connect_guardian(
312 &self,
313 url: &SafeUrl,
314 api_secret: Option<&str>,
315 ) -> ServerResult<DynGuaridianConnection> {
316 trace!(
317 target: LOG_NET,
318 %url,
319 "Connection requested"
320 );
321 let url = match self.connection_overrides.get(url) {
322 Some(replacement) => {
323 trace!(
324 target: LOG_NET,
325 original_url = %url,
326 replacement_url = %replacement,
327 "Using a connectivity override for connection"
328 );
329
330 replacement
331 }
332 None => url,
333 };
334
335 let connector_key = url.scheme();
336
337 let Some(connector_lazy) = self.connectors_lazy.get(connector_key) else {
338 return Err(ServerError::InvalidEndpoint(anyhow!(
339 "Unsupported scheme: {}; missing endpoint handler",
340 url.scheme()
341 )));
342 };
343
344 let init_fn = connector_lazy.0.clone();
346
347 let conn = connector_lazy
348 .1
349 .get_or_try_init(|| async move { init_fn().await })
350 .await
351 .map_err(|e| {
352 ServerError::Transport(anyhow!(
353 "Connector failed to initialize: {}",
354 e.fmt_compact_anyhow()
355 ))
356 })?
357 .connect_guardian(url, api_secret)
358 .await
359 .inspect_err(|err| {
360 trace!(
361 target: LOG_NET,
362 %url,
363 err = %err.fmt_compact(),
364 "Connection failed"
365 );
366 })?;
367
368 trace!(
369 target: LOG_NET,
370 %url,
371 "Connection returned"
372 );
373 Ok(conn)
374 }
375
376 pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
381 let url = match self.connection_overrides.get(url) {
382 Some(replacement) => {
383 trace!(
384 target: LOG_NET,
385 original_url = %url,
386 replacement_url = %replacement,
387 "Using a connectivity override for connection"
388 );
389
390 replacement
391 }
392 None => url,
393 };
394
395 let connector_key = url.scheme();
396
397 let Some(connector_lazy) = self.connectors_lazy.get(connector_key) else {
398 return Err(anyhow!(
399 "Unsupported scheme: {}; missing endpoint handler",
400 url.scheme()
401 ));
402 };
403
404 let init_fn = connector_lazy.0.clone();
406
407 connector_lazy
408 .1
409 .get_or_try_init(|| async move { init_fn().await })
410 .await
411 .map_err(|e| {
412 ServerError::Transport(anyhow!(
413 "Connector failed to initialize: {}",
414 e.fmt_compact_anyhow()
415 ))
416 })?
417 .connect_gateway(url)
418 .await
419 }
420}
421pub type DynConnector = Arc<dyn Connector>;
422
423#[async_trait]
424pub trait Connector: Send + Sync + 'static + Debug {
425 async fn connect_guardian(
426 &self,
427 url: &SafeUrl,
428 api_secret: Option<&str>,
429 ) -> ServerResult<DynGuaridianConnection>;
430
431 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
432}
433
434#[apply(async_trait_maybe_send!)]
437pub trait IConnection: Debug + Send + Sync + 'static {
438 fn is_connected(&self) -> bool;
439
440 async fn await_disconnection(&self);
441}
442
443pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
445
446#[async_trait]
448pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
449 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
450
451 fn into_dyn(self) -> DynGuaridianConnection
452 where
453 Self: Sized,
454 {
455 Arc::new(self)
456 }
457}
458
459pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
461
462#[apply(async_trait_maybe_send!)]
464pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
465 async fn request(
466 &self,
467 password: Option<String>,
468 method: Method,
469 route: &str,
470 payload: Option<Value>,
471 ) -> ServerResult<Value>;
472
473 fn into_dyn(self) -> DynGatewayConnection
474 where
475 Self: Sized,
476 {
477 Arc::new(self)
478 }
479}
480
481#[derive(Debug)]
482pub struct ConnectionPool<T: IConnection + ?Sized> {
483 connectors: ConnectorRegistry,
485
486 #[allow(clippy::type_complexity)]
492 connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
493}
494
495impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
496 fn clone(&self) -> Self {
497 Self {
498 connectors: self.connectors.clone(),
499 connections: self.connections.clone(),
500 }
501 }
502}
503
504impl<T: IConnection + ?Sized> ConnectionPool<T> {
505 pub fn new(connectors: ConnectorRegistry) -> Self {
506 Self {
507 connectors,
508 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
509 }
510 }
511
512 pub async fn get_or_create_connection<F, Fut>(
513 &self,
514 url: &SafeUrl,
515 api_secret: Option<&str>,
516 create_connection: F,
517 ) -> ServerResult<Arc<T>>
518 where
519 F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
520 Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
521 {
522 let mut pool_locked = self.connections.lock().await;
523
524 let pool_entry_arc = pool_locked
525 .entry(url.to_owned())
526 .and_modify(|entry_arc| {
527 if let Some(existing_conn) = entry_arc.connection.get()
533 && !existing_conn.is_connected(){
534 trace!(target: LOG_CLIENT_NET_API, %url, "Existing connection is disconnected, removing from pool");
535 *entry_arc = Arc::new(ConnectionState::new_reconnecting());
536 }
537 })
538 .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
539 .clone();
540
541 drop(pool_locked);
543
544 let conn = pool_entry_arc
545 .connection
546 .get_or_try_init(|| async {
554 let retry_delay = pool_entry_arc.pre_reconnect_delay();
555 fedimint_core::runtime::sleep(retry_delay).await;
556
557 trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
558 create_connection(
559 url.clone(),
560 api_secret.map(std::string::ToString::to_string),
561 self.connectors.clone(),
562 )
563 .await
564 })
565 .await?;
566
567 trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
568 Ok(conn.clone())
569 }
570}
571
572#[derive(Debug)]
575struct ConnectionStateInner {
576 fresh: bool,
577 backoff: FibonacciBackoff,
578}
579
580#[derive(Debug)]
581pub struct ConnectionState<T: ?Sized> {
582 pub connection: tokio::sync::OnceCell<Arc<T>>,
584 inner: std::sync::Mutex<ConnectionStateInner>,
588}
589
590impl<T: ?Sized> ConnectionState<T> {
591 pub fn new_initial() -> Self {
593 Self {
594 connection: OnceCell::new(),
595 inner: std::sync::Mutex::new(ConnectionStateInner {
596 fresh: true,
597 backoff: custom_backoff(
598 Duration::from_millis(5),
600 Duration::from_secs(30),
601 None,
602 ),
603 }),
604 }
605 }
606
607 pub fn new_reconnecting() -> Self {
610 Self {
611 connection: OnceCell::new(),
612 inner: std::sync::Mutex::new(ConnectionStateInner {
613 fresh: false,
615 backoff: custom_backoff(
616 Duration::from_millis(500),
618 Duration::from_secs(30),
619 None,
620 ),
621 }),
622 }
623 }
624
625 pub fn pre_reconnect_delay(&self) -> Duration {
628 let mut backoff_locked = self.inner.lock().expect("Locking failed");
629 let fresh = backoff_locked.fresh;
630
631 backoff_locked.fresh = false;
632
633 if fresh {
634 Duration::default()
635 } else {
636 backoff_locked.backoff.next().expect("Keeps retrying")
637 }
638 }
639}