1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
33pub type ServerResult<T> = Result<T, ServerError>;
34
35type ConnectorInitFn = Arc<
37 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
38>;
39
40#[derive(Debug, Clone)]
45#[allow(clippy::struct_excessive_bools)] pub struct ConnectorRegistryBuilder {
47 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
52
53 iroh_enable: bool,
55 iroh_dns: Option<SafeUrl>,
57 iroh_next: bool,
59 iroh_pkarr_dht: bool,
61
62 ws_enable: bool,
64 ws_force_tor: bool,
65
66 http_enable: bool,
68}
69
70impl ConnectorRegistryBuilder {
71 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73 let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75 BTreeMap::new();
76
77 let path_change = Arc::new(watch::channel(0u64).0);
81
82 let builder_ws = self.clone();
84 let ws_connector_init = Arc::new(move || {
85 let builder = builder_ws.clone();
86 Box::pin(async move { builder.build_ws_connector().await })
87 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
88 });
89 connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
90 connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
91
92 let builder_iroh = self.clone();
94 let path_change_iroh = path_change.clone();
95 connectors_lazy.insert(
96 "iroh".into(),
97 (
98 Arc::new(move || {
99 let builder = builder_iroh.clone();
100 let path_change = path_change_iroh.clone();
101 Box::pin(async move { builder.build_iroh_connector(path_change).await })
102 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103 }),
104 OnceCell::new(),
105 ),
106 );
107
108 let builder_http = self.clone();
109 let http_connector_init = Arc::new(move || {
110 let builder = builder_http.clone();
111 Box::pin(async move { builder.build_http_connector() })
112 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
113 });
114
115 connectors_lazy.insert(
116 "http".into(),
117 (http_connector_init.clone(), OnceCell::new()),
118 );
119 connectors_lazy.insert(
120 "https".into(),
121 (http_connector_init.clone(), OnceCell::new()),
122 );
123
124 Ok(ConnectorRegistry {
125 inner: ConnectorRegistryInner {
126 connectors_lazy,
127 connection_overrides: self.connection_overrides,
128 initialized: SetOnce::new(),
129 path_change,
130 }
131 .into(),
132 })
133 }
134
135 pub async fn build_iroh_connector(
136 &self,
137 path_change: Arc<watch::Sender<u64>>,
138 ) -> anyhow::Result<DynConnector> {
139 if !self.iroh_enable {
140 bail!("Iroh connector not enabled");
141 }
142 Ok(Arc::new(
143 iroh::IrohConnector::new(
144 self.iroh_dns.clone(),
145 self.iroh_pkarr_dht,
146 self.iroh_next,
147 path_change,
148 )
149 .await?,
150 ) as DynConnector)
151 }
152
153 pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
154 if !self.ws_enable {
155 bail!("Websocket connector not enabled");
156 }
157
158 match self.ws_force_tor {
159 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
160 true => {
161 use crate::tor::TorConnector;
162
163 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
164 }
165
166 false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
167 #[allow(unreachable_patterns)]
168 _ => bail!("Tor requested, but not support not compiled in"),
169 }
170 }
171
172 pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
173 if !self.http_enable {
174 bail!("Http connector not enabled");
175 }
176
177 Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
178 }
179
180 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
181 Self {
182 iroh_pkarr_dht: enable,
183 ..self
184 }
185 }
186
187 pub fn iroh_next(self, enable: bool) -> Self {
188 Self {
189 iroh_next: enable,
190 ..self
191 }
192 }
193
194 pub fn ws_force_tor(self, enable: bool) -> Self {
195 Self {
196 ws_force_tor: enable,
197 ..self
198 }
199 }
200
201 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
202 Self {
203 iroh_dns: Some(url),
204 ..self
205 }
206 }
207
208 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
210 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
212 self = self.with_connection_override(k, v);
213 }
214
215 Ok(Self { ..self })
216 }
217
218 pub fn with_connection_override(
219 mut self,
220 original_url: SafeUrl,
221 replacement_url: SafeUrl,
222 ) -> Self {
223 self.connection_overrides
224 .insert(original_url, replacement_url);
225 self
226 }
227}
228
229struct ConnectorRegistryInner {
231 connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
233 connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
235 initialized: tokio::sync::SetOnce<()>,
240 path_change: Arc<watch::Sender<u64>>,
243}
244
245#[derive(Clone)]
258pub struct ConnectorRegistry {
259 inner: Arc<ConnectorRegistryInner>,
260}
261
262impl fmt::Debug for ConnectorRegistry {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 f.debug_struct("ConnectorRegistry")
265 .field("connectors_lazy", &self.inner.connectors_lazy.len())
266 .field("connection_overrides", &self.inner.connection_overrides)
267 .finish()
268 }
269}
270
271impl ConnectorRegistry {
272 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
277 ConnectorRegistryBuilder {
278 iroh_enable: true,
279 iroh_dns: None,
280 iroh_pkarr_dht: false,
281 iroh_next: true,
282 ws_enable: true,
283 ws_force_tor: false,
284 http_enable: true,
285
286 connection_overrides: BTreeMap::default(),
287 }
288 }
289
290 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
293 ConnectorRegistryBuilder {
294 iroh_enable: true,
295 iroh_dns: None,
296 iroh_pkarr_dht: true,
297 iroh_next: true,
298 ws_enable: true,
299 ws_force_tor: false,
300 http_enable: false,
301
302 connection_overrides: BTreeMap::default(),
303 }
304 }
305
306 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
309 ConnectorRegistryBuilder {
310 iroh_enable: true,
311 iroh_dns: None,
312 iroh_pkarr_dht: false,
313 iroh_next: false,
314 ws_enable: true,
315 ws_force_tor: false,
316 http_enable: true,
317
318 connection_overrides: BTreeMap::default(),
319 }
320 }
321
322 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
325 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
326 Ok(builder)
327 }
328
329 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
332 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
333 Ok(builder)
334 }
335
336 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
339 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
340 Ok(builder)
341 }
342
343 pub async fn wait_for_initialized_connections(&self) {
345 self.inner.initialized.wait().await;
346 }
347
348 pub async fn connect_guardian(
353 &self,
354 url: &SafeUrl,
355 api_secret: Option<&str>,
356 ) -> ServerResult<DynGuaridianConnection> {
357 trace!(
358 target: LOG_NET,
359 %url,
360 "Connection requested to guardian"
361 );
362 let _ = self.inner.initialized.set(());
363
364 let url = match self.inner.connection_overrides.get(url) {
365 Some(replacement) => {
366 trace!(
367 target: LOG_NET,
368 original_url = %url,
369 replacement_url = %replacement,
370 "Using a connectivity override for connection"
371 );
372
373 replacement
374 }
375 None => url,
376 };
377
378 let scheme = url.scheme().to_string();
379
380 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
381 return Err(ServerError::InvalidEndpoint(anyhow!(
382 "Unsupported scheme: {}; missing endpoint handler",
383 url.scheme()
384 )));
385 };
386
387 let init_fn = connector_lazy.0.clone();
389
390 let timer = CONNECTION_DURATION_SECONDS
391 .with_label_values(&[&scheme])
392 .start_timer_ext();
393
394 let result = connector_lazy
395 .1
396 .get_or_try_init(|| async move { init_fn().await })
397 .await
398 .map_err(|e| {
399 ServerError::Transport(anyhow!(
400 "Connector failed to initialize: {}",
401 e.fmt_compact_anyhow()
402 ))
403 })?
404 .connect_guardian(url, api_secret)
405 .await;
406
407 timer.observe_duration();
408
409 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
410 CONNECTION_ATTEMPTS_TOTAL
411 .with_label_values(&[&scheme, &result_label])
412 .inc();
413
414 let conn = result.inspect_err(|err| {
415 trace!(
416 target: LOG_NET,
417 %url,
418 err = %err.fmt_compact(),
419 "Connection failed"
420 );
421 })?;
422
423 trace!(
424 target: LOG_NET,
425 %url,
426 "Connection returned"
427 );
428 Ok(conn)
429 }
430
431 pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
436 trace!(
437 target: LOG_NET,
438 %url,
439 "Connection requested to gateway"
440 );
441 let _ = self.inner.initialized.set(());
442
443 let url = match self.inner.connection_overrides.get(url) {
444 Some(replacement) => {
445 trace!(
446 target: LOG_NET,
447 original_url = %url,
448 replacement_url = %replacement,
449 "Using a connectivity override for connection"
450 );
451
452 replacement
453 }
454 None => url,
455 };
456
457 let scheme = url.scheme().to_string();
458
459 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
460 return Err(anyhow!(
461 "Unsupported scheme: {}; missing endpoint handler",
462 url.scheme()
463 ));
464 };
465
466 let init_fn = connector_lazy.0.clone();
468
469 let timer = CONNECTION_DURATION_SECONDS
470 .with_label_values(&[&scheme])
471 .start_timer_ext();
472
473 let result = connector_lazy
474 .1
475 .get_or_try_init(|| async move { init_fn().await })
476 .await
477 .map_err(|e| {
478 ServerError::Transport(anyhow!(
479 "Connector failed to initialize: {}",
480 e.fmt_compact_anyhow()
481 ))
482 })?
483 .connect_gateway(url)
484 .await;
485
486 timer.observe_duration();
487
488 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
489 CONNECTION_ATTEMPTS_TOTAL
490 .with_label_values(&[&scheme, &result_label])
491 .inc();
492
493 result
494 }
495
496 pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
502 let url = match self.inner.connection_overrides.get(url) {
503 Some(replacement) => replacement,
504 None => url,
505 };
506
507 let Some((_, connector_cell)) = self.inner.connectors_lazy.get(url.scheme()) else {
508 return Connectivity::Unknown;
509 };
510
511 match connector_cell.get() {
512 Some(connector) => connector.connectivity(url),
513 None => Connectivity::Unknown,
514 }
515 }
516
517 pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
526 self.inner.path_change.subscribe()
527 }
528}
529pub type DynConnector = Arc<dyn Connector>;
530
531#[async_trait]
532pub trait Connector: Send + Sync + 'static + Debug {
533 async fn connect_guardian(
534 &self,
535 url: &SafeUrl,
536 api_secret: Option<&str>,
537 ) -> ServerResult<DynGuaridianConnection>;
538
539 async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
540
541 fn connectivity(&self, url: &SafeUrl) -> Connectivity;
543}
544
/// How traffic to a peer is routed, as reported by a `Connector`.
///
/// `ConnectorRegistry::connectivity` returns [`Connectivity::Unknown`] when the
/// URL's scheme has no connector, or its connector is not initialized yet.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Connectivity {
    /// Presumably a direct network path to the peer — TODO confirm per connector.
    Direct,
    /// Presumably traffic forwarded through a relay.
    Relay,
    /// Presumably a combination of direct and relayed paths.
    Mixed,
    /// Presumably routed over Tor.
    Tor,
    /// The route could not be determined.
    Unknown,
}

/// Connection status of a peer: either disconnected, or connected with a
/// particular [`Connectivity`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum PeerStatus {
    /// No live connection to the peer.
    Disconnected,
    /// Connected, routed as described by the payload.
    Connected(Connectivity),
}
573
574#[apply(async_trait_maybe_send!)]
577pub trait IConnection: Debug + Send + Sync + 'static {
578 fn is_connected(&self) -> bool;
579
580 async fn await_disconnection(&self);
581}
582
583pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
585
586#[async_trait]
588pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
589 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
590
591 fn into_dyn(self) -> DynGuaridianConnection
592 where
593 Self: Sized,
594 {
595 Arc::new(self)
596 }
597}
598
599pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
601
602#[apply(async_trait_maybe_send!)]
604pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
605 async fn request(
606 &self,
607 password: Option<String>,
608 method: Method,
609 route: &str,
610 payload: Option<Value>,
611 ) -> ServerResult<Value>;
612
613 fn into_dyn(self) -> DynGatewayConnection
614 where
615 Self: Sized,
616 {
617 Arc::new(self)
618 }
619}
620
621#[derive(Debug)]
622pub struct ConnectionPool<T: IConnection + ?Sized> {
623 connectors: ConnectorRegistry,
625
626 active_connections: watch::Sender<BTreeSet<SafeUrl>>,
627
628 #[allow(clippy::type_complexity)]
634 connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
635}
636
637impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
638 fn clone(&self) -> Self {
639 Self {
640 connectors: self.connectors.clone(),
641 connections: self.connections.clone(),
642 active_connections: self.active_connections.clone(),
643 }
644 }
645}
646
647impl<T: IConnection + ?Sized> ConnectionPool<T> {
648 pub fn new(connectors: ConnectorRegistry) -> Self {
649 Self {
650 connectors,
651 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
652 active_connections: watch::channel(BTreeSet::new()).0,
653 }
654 }
655
656 async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
657 let mut pool_locked = self.connections.lock().await;
658 pool_locked
659 .entry(url.to_owned())
660 .and_modify(|entry_arc| {
661 if let Some(existing_conn) = entry_arc.connection.get()
667 && !existing_conn.is_connected()
668 {
669 trace!(
670 target: LOG_CLIENT_NET_API,
671 %url,
672 "Existing connection is disconnected, removing from pool"
673 );
674 self.active_connections.send_modify(|v| {
675 v.remove(url);
676 });
677 *entry_arc = Arc::new(ConnectionState::new_reconnecting());
678 }
679 })
680 .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
681 .clone()
682 }
683
684 pub async fn get_or_create_connection<F, Fut>(
685 &self,
686 url: &SafeUrl,
687 api_secret: Option<&str>,
688 create_connection: F,
689 ) -> ServerResult<Arc<T>>
690 where
691 F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
692 Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
693 {
694 let pool_entry_arc = self.get_or_init_pool_entry(url).await;
695
696 let leader_tx = loop {
697 let mut leader_rx = {
698 let mut chan_locked = pool_entry_arc
699 .merge_connection_attempts_chan
700 .lock()
701 .expect("locking error");
702
703 if chan_locked.is_closed() {
704 let (leader_tx, leader_rx) = broadcast::channel(1);
705 *chan_locked = leader_rx;
706 break leader_tx;
709 }
710
711 chan_locked.resubscribe()
713 };
714
715 if let Ok(res) = leader_rx.recv().await {
716 match res {
717 Ok(o) => return Ok(o),
718 Err(err) => {
719 return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
720 }
721 }
722 }
723 };
724
725 let conn = pool_entry_arc
726 .connection
727 .get_or_try_init(|| async {
728 let retry_delay = pool_entry_arc.pre_reconnect_delay();
729 fedimint_core::runtime::sleep(retry_delay).await;
730
731 trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
732 let res = create_connection(
733 url.clone(),
734 api_secret.map(std::string::ToString::to_string),
735 self.connectors.clone(),
736 )
737 .await;
738
739 let _ = leader_tx.send(
744 res.as_ref()
745 .map(|o| o.clone())
746 .map_err(|err| err.to_string()),
747 );
748
749 let conn = res?;
750
751 self.active_connections.send_modify(|v| {
752 v.insert(url.clone());
753 });
754
755 fedimint_core::runtime::spawn("connection disconnect watch", {
756 let conn = conn.clone();
757 let s = self.clone();
758 let url = url.clone();
759 async move {
760 conn.await_disconnection().await;
762 s.get_or_init_pool_entry(&url).await;
768 }
769 });
770
771 Ok(conn)
772 })
773 .await?;
774
775 trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
776 Ok(conn.clone())
777 }
778 pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
780 self.active_connections.subscribe()
781 }
782
783 pub async fn wait_for_initialized_connections(&self) {
784 self.connectors.wait_for_initialized_connections().await
785 }
786
787 pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
789 self.connectors.connectivity(url)
790 }
791
792 pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
795 self.connectors.connectivity_change_notifier()
796 }
797}
798
799#[derive(Debug)]
802struct ConnectionStateInner {
803 fresh: bool,
804 backoff: FibonacciBackoff,
805}
806
807#[derive(Debug)]
808pub struct ConnectionState<T: ?Sized> {
809 pub connection: tokio::sync::OnceCell<Arc<T>>,
811
812 merge_connection_attempts_chan:
816 std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,
817
818 inner: std::sync::Mutex<ConnectionStateInner>,
822}
823
824impl<T: ?Sized> ConnectionState<T> {
825 pub fn new_initial() -> Self {
827 Self {
828 connection: OnceCell::new(),
829 inner: std::sync::Mutex::new(ConnectionStateInner {
830 fresh: true,
831 backoff: custom_backoff(
832 Duration::from_millis(5),
834 Duration::from_secs(30),
835 None,
836 ),
837 }),
838 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
839 }
840 }
841
842 pub fn new_reconnecting() -> Self {
845 Self {
846 connection: OnceCell::new(),
847 inner: std::sync::Mutex::new(ConnectionStateInner {
848 fresh: false,
850 backoff: custom_backoff(
851 Duration::from_millis(500),
853 Duration::from_secs(30),
854 None,
855 ),
856 }),
857 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
858 }
859 }
860
861 pub fn pre_reconnect_delay(&self) -> Duration {
864 let mut backoff_locked = self.inner.lock().expect("Locking failed");
865 let fresh = backoff_locked.fresh;
866
867 backoff_locked.fresh = false;
868
869 if fresh {
870 Duration::default()
871 } else {
872 backoff_locked.backoff.next().expect("Keeps retrying")
873 }
874 }
875}