1pub mod error;
2pub mod http;
3pub mod iroh;
4#[cfg(all(feature = "tor", not(target_family = "wasm")))]
5pub mod tor;
6pub mod ws;
7
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use std::fmt::{self, Debug};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::{anyhow, bail};
15use async_trait::async_trait;
16use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
17use fedimint_core::module::{ApiMethod, ApiRequestErased};
18use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
19use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
20use fedimint_core::{apply, async_trait_maybe_send};
21use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
22use reqwest::Method;
23use serde_json::Value;
24use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
25use tracing::trace;
26
27use crate::error::ServerError;
28use crate::ws::WebsocketConnector;
29
30pub type ServerResult<T> = Result<T, ServerError>;
31
32type ConnectorInitFn = Arc<
34 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
35>;
36
37#[derive(Debug, Clone)]
42#[allow(clippy::struct_excessive_bools)] pub struct ConnectorRegistryBuilder {
44 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
49
50 iroh_enable: bool,
52 iroh_dns: Option<SafeUrl>,
54 iroh_next: bool,
56 iroh_pkarr_dht: bool,
58
59 ws_enable: bool,
61 ws_force_tor: bool,
62
63 http_enable: bool,
65}
66
67impl ConnectorRegistryBuilder {
68 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
70 let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
72 BTreeMap::new();
73
74 let builder_ws = self.clone();
76 let ws_connector_init = Arc::new(move || {
77 let builder = builder_ws.clone();
78 Box::pin(async move { builder.build_ws_connector().await })
79 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
80 });
81 connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
82 connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
83
84 let builder_iroh = self.clone();
86 connectors_lazy.insert(
87 "iroh".into(),
88 (
89 Arc::new(move || {
90 let builder = builder_iroh.clone();
91 Box::pin(async move { builder.build_iroh_connector().await })
92 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
93 }),
94 OnceCell::new(),
95 ),
96 );
97
98 let builder_http = self.clone();
99 let http_connector_init = Arc::new(move || {
100 let builder = builder_http.clone();
101 Box::pin(async move { builder.build_http_connector() })
102 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103 });
104
105 connectors_lazy.insert(
106 "http".into(),
107 (http_connector_init.clone(), OnceCell::new()),
108 );
109 connectors_lazy.insert(
110 "https".into(),
111 (http_connector_init.clone(), OnceCell::new()),
112 );
113
114 Ok(ConnectorRegistry {
115 inner: ConnectorRegistryInner {
116 connectors_lazy,
117 connection_overrides: self.connection_overrides,
118 initialized: SetOnce::new(),
119 }
120 .into(),
121 })
122 }
123
124 pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
125 if !self.iroh_enable {
126 bail!("Iroh connector not enabled");
127 }
128 Ok(Arc::new(
129 iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
130 .await?,
131 ) as DynConnector)
132 }
133
134 pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
135 if !self.ws_enable {
136 bail!("Websocket connector not enabled");
137 }
138
139 match self.ws_force_tor {
140 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
141 true => {
142 use crate::tor::TorConnector;
143
144 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
145 }
146
147 false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
148 #[allow(unreachable_patterns)]
149 _ => bail!("Tor requested, but not support not compiled in"),
150 }
151 }
152
153 pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
154 if !self.http_enable {
155 bail!("Http connector not enabled");
156 }
157
158 Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
159 }
160
161 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
162 Self {
163 iroh_pkarr_dht: enable,
164 ..self
165 }
166 }
167
168 pub fn iroh_next(self, enable: bool) -> Self {
169 Self {
170 iroh_next: enable,
171 ..self
172 }
173 }
174
175 pub fn ws_force_tor(self, enable: bool) -> Self {
176 Self {
177 ws_force_tor: enable,
178 ..self
179 }
180 }
181
182 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
183 Self {
184 iroh_dns: Some(url),
185 ..self
186 }
187 }
188
189 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
191 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
193 self = self.with_connection_override(k, v);
194 }
195
196 Ok(Self { ..self })
197 }
198
199 pub fn with_connection_override(
200 mut self,
201 original_url: SafeUrl,
202 replacement_url: SafeUrl,
203 ) -> Self {
204 self.connection_overrides
205 .insert(original_url, replacement_url);
206 self
207 }
208}
209
210struct ConnectorRegistryInner {
212 connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
214 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
216 initialized: tokio::sync::SetOnce<()>,
221}
222
223#[derive(Clone)]
236pub struct ConnectorRegistry {
237 inner: Arc<ConnectorRegistryInner>,
238}
239
240impl fmt::Debug for ConnectorRegistry {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 f.debug_struct("ConnectorRegistry")
243 .field("connectors_lazy", &self.inner.connectors_lazy.len())
244 .field("connection_overrides", &self.inner.connection_overrides)
245 .finish()
246 }
247}
248
249impl ConnectorRegistry {
250 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
255 ConnectorRegistryBuilder {
256 iroh_enable: true,
257 iroh_dns: None,
258 iroh_pkarr_dht: false,
259 iroh_next: true,
260 ws_enable: true,
261 ws_force_tor: false,
262 http_enable: true,
263
264 connection_overrides: BTreeMap::default(),
265 }
266 }
267
268 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
271 ConnectorRegistryBuilder {
272 iroh_enable: true,
273 iroh_dns: None,
274 iroh_pkarr_dht: true,
275 iroh_next: true,
276 ws_enable: true,
277 ws_force_tor: false,
278 http_enable: false,
279
280 connection_overrides: BTreeMap::default(),
281 }
282 }
283
284 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
287 ConnectorRegistryBuilder {
288 iroh_enable: true,
289 iroh_dns: None,
290 iroh_pkarr_dht: false,
291 iroh_next: false,
292 ws_enable: true,
293 ws_force_tor: false,
294 http_enable: true,
295
296 connection_overrides: BTreeMap::default(),
297 }
298 }
299
300 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
303 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
304 Ok(builder)
305 }
306
307 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
310 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
311 Ok(builder)
312 }
313
314 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
317 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
318 Ok(builder)
319 }
320
321 pub async fn wait_for_initialized_connections(&self) {
323 self.inner.initialized.wait().await;
324 }
325
326 pub async fn connect_guardian(
331 &self,
332 url: &SafeUrl,
333 api_secret: Option<&str>,
334 ) -> ServerResult<DynGuaridianConnection> {
335 trace!(
336 target: LOG_NET,
337 %url,
338 "Connection requested to guardian"
339 );
340 let _ = self.inner.initialized.set(());
341
342 let url = match self.inner.connection_overrides.get(url) {
343 Some(replacement) => {
344 trace!(
345 target: LOG_NET,
346 original_url = %url,
347 replacement_url = %replacement,
348 "Using a connectivity override for connection"
349 );
350
351 replacement
352 }
353 None => url,
354 };
355
356 let connector_key = url.scheme();
357
358 let Some(connector_lazy) = self.inner.connectors_lazy.get(connector_key) else {
359 return Err(ServerError::InvalidEndpoint(anyhow!(
360 "Unsupported scheme: {}; missing endpoint handler",
361 url.scheme()
362 )));
363 };
364
365 let init_fn = connector_lazy.0.clone();
367
368 let conn = connector_lazy
369 .1
370 .get_or_try_init(|| async move { init_fn().await })
371 .await
372 .map_err(|e| {
373 ServerError::Transport(anyhow!(
374 "Connector failed to initialize: {}",
375 e.fmt_compact_anyhow()
376 ))
377 })?
378 .connect_guardian(url, api_secret)
379 .await
380 .inspect_err(|err| {
381 trace!(
382 target: LOG_NET,
383 %url,
384 err = %err.fmt_compact(),
385 "Connection failed"
386 );
387 })?;
388
389 trace!(
390 target: LOG_NET,
391 %url,
392 "Connection returned"
393 );
394 Ok(conn)
395 }
396
397 pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
402 trace!(
403 target: LOG_NET,
404 %url,
405 "Connection requested to gateway"
406 );
407 let _ = self.inner.initialized.set(());
408
409 let url = match self.inner.connection_overrides.get(url) {
410 Some(replacement) => {
411 trace!(
412 target: LOG_NET,
413 original_url = %url,
414 replacement_url = %replacement,
415 "Using a connectivity override for connection"
416 );
417
418 replacement
419 }
420 None => url,
421 };
422
423 let connector_key = url.scheme();
424
425 let Some(connector_lazy) = self.inner.connectors_lazy.get(connector_key) else {
426 return Err(anyhow!(
427 "Unsupported scheme: {}; missing endpoint handler",
428 url.scheme()
429 ));
430 };
431
432 let init_fn = connector_lazy.0.clone();
434
435 let conn = connector_lazy
436 .1
437 .get_or_try_init(|| async move { init_fn().await })
438 .await
439 .map_err(|e| {
440 ServerError::Transport(anyhow!(
441 "Connector failed to initialize: {}",
442 e.fmt_compact_anyhow()
443 ))
444 })?
445 .connect_gateway(url)
446 .await?;
447
448 Ok(conn)
449 }
450}
451pub type DynConnector = Arc<dyn Connector>;
452
453#[async_trait]
454pub trait Connector: Send + Sync + 'static + Debug {
455 async fn connect_guardian(
456 &self,
457 url: &SafeUrl,
458 api_secret: Option<&str>,
459 ) -> ServerResult<DynGuaridianConnection>;
460
461 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
462}
463
464#[apply(async_trait_maybe_send!)]
467pub trait IConnection: Debug + Send + Sync + 'static {
468 fn is_connected(&self) -> bool;
469
470 async fn await_disconnection(&self);
471}
472
473pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
475
476#[async_trait]
478pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
479 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
480
481 fn into_dyn(self) -> DynGuaridianConnection
482 where
483 Self: Sized,
484 {
485 Arc::new(self)
486 }
487}
488
489pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
491
492#[apply(async_trait_maybe_send!)]
494pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
495 async fn request(
496 &self,
497 password: Option<String>,
498 method: Method,
499 route: &str,
500 payload: Option<Value>,
501 ) -> ServerResult<Value>;
502
503 fn into_dyn(self) -> DynGatewayConnection
504 where
505 Self: Sized,
506 {
507 Arc::new(self)
508 }
509}
510
511#[derive(Debug)]
512pub struct ConnectionPool<T: IConnection + ?Sized> {
513 connectors: ConnectorRegistry,
515
516 active_connections: watch::Sender<BTreeSet<SafeUrl>>,
517
518 #[allow(clippy::type_complexity)]
524 connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
525}
526
527impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
528 fn clone(&self) -> Self {
529 Self {
530 connectors: self.connectors.clone(),
531 connections: self.connections.clone(),
532 active_connections: self.active_connections.clone(),
533 }
534 }
535}
536
537impl<T: IConnection + ?Sized> ConnectionPool<T> {
538 pub fn new(connectors: ConnectorRegistry) -> Self {
539 Self {
540 connectors,
541 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
542 active_connections: watch::channel(BTreeSet::new()).0,
543 }
544 }
545
546 async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
547 let mut pool_locked = self.connections.lock().await;
548 pool_locked
549 .entry(url.to_owned())
550 .and_modify(|entry_arc| {
551 if let Some(existing_conn) = entry_arc.connection.get()
557 && !existing_conn.is_connected()
558 {
559 trace!(
560 target: LOG_CLIENT_NET_API,
561 %url,
562 "Existing connection is disconnected, removing from pool"
563 );
564 self.active_connections.send_modify(|v| {
565 v.remove(url);
566 });
567 *entry_arc = Arc::new(ConnectionState::new_reconnecting());
568 }
569 })
570 .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
571 .clone()
572 }
573
574 pub async fn get_or_create_connection<F, Fut>(
575 &self,
576 url: &SafeUrl,
577 api_secret: Option<&str>,
578 create_connection: F,
579 ) -> ServerResult<Arc<T>>
580 where
581 F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
582 Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
583 {
584 let pool_entry_arc = self.get_or_init_pool_entry(url).await;
585
586 let leader_tx = loop {
587 let mut leader_rx = {
588 let mut chan_locked = pool_entry_arc
589 .merge_connection_attempts_chan
590 .lock()
591 .expect("locking error");
592
593 if chan_locked.is_closed() {
594 let (leader_tx, leader_rx) = broadcast::channel(1);
595 *chan_locked = leader_rx;
596 break leader_tx;
599 }
600
601 chan_locked.resubscribe()
603 };
604
605 if let Ok(res) = leader_rx.recv().await {
606 match res {
607 Ok(o) => return Ok(o),
608 Err(err) => {
609 return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
610 }
611 }
612 }
613 };
614
615 let conn = pool_entry_arc
616 .connection
617 .get_or_try_init(|| async {
618 let retry_delay = pool_entry_arc.pre_reconnect_delay();
619 fedimint_core::runtime::sleep(retry_delay).await;
620
621 trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
622 let res = create_connection(
623 url.clone(),
624 api_secret.map(std::string::ToString::to_string),
625 self.connectors.clone(),
626 )
627 .await;
628
629 let _ = leader_tx.send(
634 res.as_ref()
635 .map(|o| o.clone())
636 .map_err(|err| err.to_string()),
637 );
638
639 let conn = res?;
640
641 self.active_connections.send_modify(|v| {
642 v.insert(url.clone());
643 });
644
645 fedimint_core::runtime::spawn("connection disconnect watch", {
646 let conn = conn.clone();
647 let s = self.clone();
648 let url = url.clone();
649 async move {
650 conn.await_disconnection().await;
652 s.get_or_init_pool_entry(&url).await;
658 }
659 });
660
661 Ok(conn)
662 })
663 .await?;
664
665 trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
666 Ok(conn.clone())
667 }
668 pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
670 self.active_connections.subscribe()
671 }
672
673 pub async fn wait_for_initialized_connections(&self) {
674 self.connectors.wait_for_initialized_connections().await
675 }
676}
677
678#[derive(Debug)]
681struct ConnectionStateInner {
682 fresh: bool,
683 backoff: FibonacciBackoff,
684}
685
686#[derive(Debug)]
687pub struct ConnectionState<T: ?Sized> {
688 pub connection: tokio::sync::OnceCell<Arc<T>>,
690
691 merge_connection_attempts_chan:
695 std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,
696
697 inner: std::sync::Mutex<ConnectionStateInner>,
701}
702
703impl<T: ?Sized> ConnectionState<T> {
704 pub fn new_initial() -> Self {
706 Self {
707 connection: OnceCell::new(),
708 inner: std::sync::Mutex::new(ConnectionStateInner {
709 fresh: true,
710 backoff: custom_backoff(
711 Duration::from_millis(5),
713 Duration::from_secs(30),
714 None,
715 ),
716 }),
717 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
718 }
719 }
720
721 pub fn new_reconnecting() -> Self {
724 Self {
725 connection: OnceCell::new(),
726 inner: std::sync::Mutex::new(ConnectionStateInner {
727 fresh: false,
729 backoff: custom_backoff(
730 Duration::from_millis(500),
732 Duration::from_secs(30),
733 None,
734 ),
735 }),
736 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
737 }
738 }
739
740 pub fn pre_reconnect_delay(&self) -> Duration {
743 let mut backoff_locked = self.inner.lock().expect("Locking failed");
744 let fresh = backoff_locked.fresh;
745
746 backoff_locked.fresh = false;
747
748 if fresh {
749 Duration::default()
750 } else {
751 backoff_locked.backoff.next().expect("Keeps retrying")
752 }
753 }
754}