1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
/// Result alias used by this crate's connection APIs; the error type is always
/// [`ServerError`].
pub type ServerResult<T> = Result<T, ServerError>;
34
/// Shared, thread-safe factory closure returning a boxed future that resolves to
/// a [`DynConnector`] or an error.
///
/// The registry stores one of these per URL scheme and calls it lazily, when a
/// connection for that scheme is requested and no connector has been cached yet.
type ConnectorInitFn = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
>;
39
/// Configuration from which a [`ConnectorRegistry`] is created via
/// [`ConnectorRegistryBuilder::bind`].
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct ConnectorRegistryBuilder {
    /// Maps an original URL to the URL that should be connected to instead.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,

    /// When `false`, building the iroh connector fails with an error.
    iroh_enable: bool,
    /// Optional DNS URL forwarded to `IrohConnector::new`.
    iroh_dns: Option<SafeUrl>,
    /// Flag forwarded to `IrohConnector::new`; presumably selects the "next" iroh
    /// stack (TODO confirm against `IrohConnector`).
    iroh_next: bool,
    /// Flag forwarded to `IrohConnector::new`; presumably enables pkarr DHT
    /// discovery (TODO confirm against `IrohConnector`).
    iroh_pkarr_dht: bool,

    /// When `false`, building the websocket connector fails with an error.
    ws_enable: bool,
    /// When `true`, the `ws`/`wss` connector is a `TorConnector` instead of a
    /// plain `WebsocketConnector` (requires the `tor` feature on non-wasm targets).
    ws_force_tor: bool,

    /// When `false`, building the http connector fails with an error.
    http_enable: bool,
}
69
70impl ConnectorRegistryBuilder {
71 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73 let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75 BTreeMap::new();
76
77 let path_change = Arc::new(watch::channel(0u64).0);
81
82 let builder_ws = self.clone();
84 let ws_connector_init = Arc::new(move || {
85 let builder = builder_ws.clone();
86 Box::pin(async move { builder.build_ws_connector().await })
87 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
88 });
89 connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
90 connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
91
92 let builder_iroh = self.clone();
94 let path_change_iroh = path_change.clone();
95 connectors_lazy.insert(
96 "iroh".into(),
97 (
98 Arc::new(move || {
99 let builder = builder_iroh.clone();
100 let path_change = path_change_iroh.clone();
101 Box::pin(async move { builder.build_iroh_connector(path_change).await })
102 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103 }),
104 OnceCell::new(),
105 ),
106 );
107
108 let builder_http = self.clone();
109 let http_connector_init = Arc::new(move || {
110 let builder = builder_http.clone();
111 Box::pin(async move { builder.build_http_connector() })
112 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
113 });
114
115 connectors_lazy.insert(
116 "http".into(),
117 (http_connector_init.clone(), OnceCell::new()),
118 );
119 connectors_lazy.insert(
120 "https".into(),
121 (http_connector_init.clone(), OnceCell::new()),
122 );
123
124 Ok(ConnectorRegistry {
125 inner: ConnectorRegistryInner {
126 connectors_lazy,
127 connection_overrides: self.connection_overrides,
128 initialized: SetOnce::new(),
129 path_change,
130 }
131 .into(),
132 })
133 }
134
135 pub async fn build_iroh_connector(
136 &self,
137 path_change: Arc<watch::Sender<u64>>,
138 ) -> anyhow::Result<DynConnector> {
139 if !self.iroh_enable {
140 bail!("Iroh connector not enabled");
141 }
142 Ok(Arc::new(
143 iroh::IrohConnector::new(
144 self.iroh_dns.clone(),
145 self.iroh_pkarr_dht,
146 self.iroh_next,
147 path_change,
148 )
149 .await?,
150 ) as DynConnector)
151 }
152
153 pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
154 if !self.ws_enable {
155 bail!("Websocket connector not enabled");
156 }
157
158 match self.ws_force_tor {
159 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
160 true => {
161 use crate::tor::TorConnector;
162
163 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
164 }
165
166 false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
167 #[allow(unreachable_patterns)]
168 _ => bail!("Tor requested, but not support not compiled in"),
169 }
170 }
171
172 pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
173 if !self.http_enable {
174 bail!("Http connector not enabled");
175 }
176
177 Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
178 }
179
180 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
181 Self {
182 iroh_pkarr_dht: enable,
183 ..self
184 }
185 }
186
187 pub fn iroh_next(self, enable: bool) -> Self {
188 Self {
189 iroh_next: enable,
190 ..self
191 }
192 }
193
194 pub fn ws_force_tor(self, enable: bool) -> Self {
195 Self {
196 ws_force_tor: enable,
197 ..self
198 }
199 }
200
201 pub fn http(self, enable: bool) -> Self {
202 Self {
203 http_enable: enable,
204 ..self
205 }
206 }
207
208 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
209 Self {
210 iroh_dns: Some(url),
211 ..self
212 }
213 }
214
215 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
217 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
219 self = self.with_connection_override(k, v);
220 }
221
222 Ok(Self { ..self })
223 }
224
225 pub fn with_connection_override(
226 mut self,
227 original_url: SafeUrl,
228 replacement_url: SafeUrl,
229 ) -> Self {
230 self.connection_overrides
231 .insert(original_url, replacement_url);
232 self
233 }
234}
235
/// Shared state behind a [`ConnectorRegistry`] handle.
struct ConnectorRegistryInner {
    /// Per URL scheme: the init function and the cell caching the connector it
    /// produced.
    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
    /// Original URL to replacement URL; applied before the scheme lookup.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
    /// Set the first time a guardian or gateway connection is requested.
    initialized: tokio::sync::SetOnce<()>,
    /// Sender half of the `u64` watch channel exposed through
    /// `connectivity_change_notifier`; a clone is also handed to the iroh
    /// connector when it is built.
    path_change: Arc<watch::Sender<u64>>,
}
251
/// Cheaply cloneable handle to a set of lazily initialized connectors, keyed by
/// URL scheme.
///
/// All clones share the same underlying state.
#[derive(Clone)]
pub struct ConnectorRegistry {
    /// Shared registry state.
    inner: Arc<ConnectorRegistryInner>,
}
268
269impl fmt::Debug for ConnectorRegistry {
270 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271 f.debug_struct("ConnectorRegistry")
272 .field("connectors_lazy", &self.inner.connectors_lazy.len())
273 .field("connection_overrides", &self.inner.connection_overrides)
274 .finish()
275 }
276}
277
278impl ConnectorRegistry {
279 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
284 ConnectorRegistryBuilder {
285 iroh_enable: true,
286 iroh_dns: None,
287 iroh_pkarr_dht: false,
288 iroh_next: true,
289 ws_enable: true,
290 ws_force_tor: false,
291 http_enable: true,
292
293 connection_overrides: BTreeMap::default(),
294 }
295 }
296
297 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
300 ConnectorRegistryBuilder {
301 iroh_enable: true,
302 iroh_dns: None,
303 iroh_pkarr_dht: true,
304 iroh_next: true,
305 ws_enable: true,
306 ws_force_tor: false,
307 http_enable: false,
308
309 connection_overrides: BTreeMap::default(),
310 }
311 }
312
313 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
316 ConnectorRegistryBuilder {
317 iroh_enable: true,
318 iroh_dns: None,
319 iroh_pkarr_dht: false,
320 iroh_next: false,
321 ws_enable: true,
322 ws_force_tor: false,
323 http_enable: true,
324
325 connection_overrides: BTreeMap::default(),
326 }
327 }
328
329 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
332 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
333 Ok(builder)
334 }
335
336 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
339 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
340 Ok(builder)
341 }
342
343 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
346 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
347 Ok(builder)
348 }
349
350 pub async fn wait_for_initialized_connections(&self) {
352 self.inner.initialized.wait().await;
353 }
354
355 pub async fn connect_guardian(
360 &self,
361 url: &SafeUrl,
362 api_secret: Option<&str>,
363 ) -> ServerResult<DynGuaridianConnection> {
364 trace!(
365 target: LOG_NET,
366 %url,
367 "Connection requested to guardian"
368 );
369 let _ = self.inner.initialized.set(());
370
371 let url = match self.inner.connection_overrides.get(url) {
372 Some(replacement) => {
373 trace!(
374 target: LOG_NET,
375 original_url = %url,
376 replacement_url = %replacement,
377 "Using a connectivity override for connection"
378 );
379
380 replacement
381 }
382 None => url,
383 };
384
385 let scheme = url.scheme().to_string();
386
387 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
388 return Err(ServerError::InvalidEndpoint(anyhow!(
389 "Unsupported scheme: {}; missing endpoint handler",
390 url.scheme()
391 )));
392 };
393
394 let init_fn = connector_lazy.0.clone();
396
397 let timer = CONNECTION_DURATION_SECONDS
398 .with_label_values(&[&scheme])
399 .start_timer_ext();
400
401 let result = connector_lazy
402 .1
403 .get_or_try_init(|| async move { init_fn().await })
404 .await
405 .map_err(|e| {
406 ServerError::Transport(anyhow!(
407 "Connector failed to initialize: {}",
408 e.fmt_compact_anyhow()
409 ))
410 })?
411 .connect_guardian(url, api_secret)
412 .await;
413
414 timer.observe_duration();
415
416 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
417 CONNECTION_ATTEMPTS_TOTAL
418 .with_label_values(&[&scheme, &result_label])
419 .inc();
420
421 let conn = result.inspect_err(|err| {
422 trace!(
423 target: LOG_NET,
424 %url,
425 err = %err.fmt_compact(),
426 "Connection failed"
427 );
428 })?;
429
430 trace!(
431 target: LOG_NET,
432 %url,
433 "Connection returned"
434 );
435 Ok(conn)
436 }
437
438 pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
443 trace!(
444 target: LOG_NET,
445 %url,
446 "Connection requested to gateway"
447 );
448 let _ = self.inner.initialized.set(());
449
450 let url = match self.inner.connection_overrides.get(url) {
451 Some(replacement) => {
452 trace!(
453 target: LOG_NET,
454 original_url = %url,
455 replacement_url = %replacement,
456 "Using a connectivity override for connection"
457 );
458
459 replacement
460 }
461 None => url,
462 };
463
464 let scheme = url.scheme().to_string();
465
466 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
467 return Err(anyhow!(
468 "Unsupported scheme: {}; missing endpoint handler",
469 url.scheme()
470 ));
471 };
472
473 let init_fn = connector_lazy.0.clone();
475
476 let timer = CONNECTION_DURATION_SECONDS
477 .with_label_values(&[&scheme])
478 .start_timer_ext();
479
480 let result = connector_lazy
481 .1
482 .get_or_try_init(|| async move { init_fn().await })
483 .await
484 .map_err(|e| {
485 ServerError::Transport(anyhow!(
486 "Connector failed to initialize: {}",
487 e.fmt_compact_anyhow()
488 ))
489 })?
490 .connect_gateway(url)
491 .await;
492
493 timer.observe_duration();
494
495 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
496 CONNECTION_ATTEMPTS_TOTAL
497 .with_label_values(&[&scheme, &result_label])
498 .inc();
499
500 result
501 }
502
503 pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
509 let url = match self.inner.connection_overrides.get(url) {
510 Some(replacement) => replacement,
511 None => url,
512 };
513
514 let Some((_, connector_cell)) = self.inner.connectors_lazy.get(url.scheme()) else {
515 return Connectivity::Unknown;
516 };
517
518 match connector_cell.get() {
519 Some(connector) => connector.connectivity(url),
520 None => Connectivity::Unknown,
521 }
522 }
523
524 pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
533 self.inner.path_change.subscribe()
534 }
535}
/// Shared, dynamically dispatched [`Connector`].
pub type DynConnector = Arc<dyn Connector>;

/// A transport that can open connections to guardians and gateways.
///
/// [`ConnectorRegistry`] keeps one connector per URL scheme and forwards
/// connection requests to it.
#[async_trait]
pub trait Connector: Send + Sync + 'static + Debug {
    /// Opens a guardian connection to `url`, optionally passing `api_secret`.
    async fn connect_guardian(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
    ) -> ServerResult<DynGuaridianConnection>;

    /// Opens a gateway connection to `url`.
    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;

    /// Reports the [`Connectivity`] this connector has for `url`.
    fn connectivity(&self, url: &SafeUrl) -> Connectivity;
}
551
/// How a peer is reached, as reported by a [`Connector`].
///
/// NOTE(review): the precise meaning of each variant is defined by the
/// individual connectors, which are not visible here; the per-variant notes
/// below are assumptions based on the names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Connectivity {
    /// Presumably a direct connection to the peer.
    Direct,
    /// Presumably a connection going through a relay.
    Relay,
    /// Presumably a combination of direct and relayed paths.
    Mixed,
    /// Presumably a connection routed over Tor.
    Tor,
    /// Connectivity is not known. The registry returns this when no connector
    /// handles the URL scheme or the connector has not been initialized yet.
    Unknown,
}
568
/// Connection status of a peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// No connection to the peer.
    Disconnected,
    /// Connected to the peer, with the given [`Connectivity`].
    Connected(Connectivity),
}
580
/// Behaviour common to every pooled connection.
///
/// [`ConnectionPool`] uses `is_connected` to detect stale entries and
/// `await_disconnection` to learn when a connection has gone away.
#[apply(async_trait_maybe_send!)]
pub trait IConnection: Debug + Send + Sync + 'static {
    /// Returns whether the connection is currently usable.
    fn is_connected(&self) -> bool;

    /// Resolves once the connection has been lost.
    async fn await_disconnection(&self);
}
589
/// Shared, dynamically dispatched [`IGuardianConnection`].
///
/// The alias name contains a typo ("Guaridian"); it is part of the public
/// interface and is therefore left as is.
pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;

/// An established connection to a guardian.
#[async_trait]
pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
    /// Sends an API `request` for `method` and returns the JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;

    /// Wraps `self` in an `Arc`, producing a [`DynGuaridianConnection`].
    fn into_dyn(self) -> DynGuaridianConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
605
/// Shared, dynamically dispatched [`IGatewayConnection`].
pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;

/// An established connection to a gateway.
#[apply(async_trait_maybe_send!)]
pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
    /// Sends a request to the gateway and returns the JSON response.
    ///
    /// * `password` - optional password; presumably used to authenticate the
    ///   request (confirm against the implementations).
    /// * `method` - HTTP method of the request.
    /// * `route` - route the request is addressed to.
    /// * `payload` - optional JSON payload.
    async fn request(
        &self,
        password: Option<String>,
        method: Method,
        route: &str,
        payload: Option<Value>,
    ) -> ServerResult<Value>;

    /// Wraps `self` in an `Arc`, producing a [`DynGatewayConnection`].
    fn into_dyn(self) -> DynGatewayConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
627
/// Pool holding at most one connection state per URL.
///
/// Cloning the pool yields another handle to the same connections.
#[derive(Debug)]
pub struct ConnectionPool<T: IConnection + ?Sized> {
    /// Registry passed to the connection-creation callback.
    connectors: ConnectorRegistry,

    /// URLs that currently have an established connection: a URL is inserted
    /// when a connection is created and removed when the pool finds its
    /// connection disconnected.
    active_connections: watch::Sender<BTreeSet<SafeUrl>>,

    /// Per-URL connection state, shared between all clones of the pool.
    #[allow(clippy::type_complexity)]
    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
}
643
644impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
645 fn clone(&self) -> Self {
646 Self {
647 connectors: self.connectors.clone(),
648 connections: self.connections.clone(),
649 active_connections: self.active_connections.clone(),
650 }
651 }
652}
653
654impl<T: IConnection + ?Sized> ConnectionPool<T> {
655 pub fn new(connectors: ConnectorRegistry) -> Self {
656 Self {
657 connectors,
658 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
659 active_connections: watch::channel(BTreeSet::new()).0,
660 }
661 }
662
663 async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
664 let mut pool_locked = self.connections.lock().await;
665 pool_locked
666 .entry(url.to_owned())
667 .and_modify(|entry_arc| {
668 if let Some(existing_conn) = entry_arc.connection.get()
674 && !existing_conn.is_connected()
675 {
676 trace!(
677 target: LOG_CLIENT_NET_API,
678 %url,
679 "Existing connection is disconnected, removing from pool"
680 );
681 self.active_connections.send_modify(|v| {
682 v.remove(url);
683 });
684 *entry_arc = Arc::new(ConnectionState::new_reconnecting());
685 }
686 })
687 .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
688 .clone()
689 }
690
691 pub async fn get_or_create_connection<F, Fut>(
692 &self,
693 url: &SafeUrl,
694 api_secret: Option<&str>,
695 create_connection: F,
696 ) -> ServerResult<Arc<T>>
697 where
698 F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
699 Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
700 {
701 let pool_entry_arc = self.get_or_init_pool_entry(url).await;
702
703 let leader_tx = loop {
704 let mut leader_rx = {
705 let mut chan_locked = pool_entry_arc
706 .merge_connection_attempts_chan
707 .lock()
708 .expect("locking error");
709
710 if chan_locked.is_closed() {
711 let (leader_tx, leader_rx) = broadcast::channel(1);
712 *chan_locked = leader_rx;
713 break leader_tx;
716 }
717
718 chan_locked.resubscribe()
720 };
721
722 if let Ok(res) = leader_rx.recv().await {
723 match res {
724 Ok(o) => return Ok(o),
725 Err(err) => {
726 return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
727 }
728 }
729 }
730 };
731
732 let conn = pool_entry_arc
733 .connection
734 .get_or_try_init(|| async {
735 let retry_delay = pool_entry_arc.pre_reconnect_delay();
736 fedimint_core::runtime::sleep(retry_delay).await;
737
738 trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
739 let res = create_connection(
740 url.clone(),
741 api_secret.map(std::string::ToString::to_string),
742 self.connectors.clone(),
743 )
744 .await;
745
746 let _ = leader_tx.send(
751 res.as_ref()
752 .map(|o| o.clone())
753 .map_err(|err| err.to_string()),
754 );
755
756 let conn = res?;
757
758 self.active_connections.send_modify(|v| {
759 v.insert(url.clone());
760 });
761
762 fedimint_core::runtime::spawn("connection disconnect watch", {
763 let conn = conn.clone();
764 let s = self.clone();
765 let url = url.clone();
766 async move {
767 conn.await_disconnection().await;
769 s.get_or_init_pool_entry(&url).await;
775 }
776 });
777
778 Ok(conn)
779 })
780 .await?;
781
782 trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
783 Ok(conn.clone())
784 }
785 pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
787 self.active_connections.subscribe()
788 }
789
790 pub async fn wait_for_initialized_connections(&self) {
791 self.connectors.wait_for_initialized_connections().await
792 }
793
794 pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
796 self.connectors.connectivity(url)
797 }
798
799 pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
802 self.connectors.connectivity_change_notifier()
803 }
804}
805
/// Mutable part of a [`ConnectionState`], kept behind a mutex.
#[derive(Debug)]
struct ConnectionStateInner {
    /// `true` until the first delay has been requested; that first request
    /// yields a zero delay.
    fresh: bool,
    /// Source of the delays returned once the state is no longer fresh.
    backoff: FibonacciBackoff,
}
813
/// State of the (possibly not yet established) connection to a single URL.
#[derive(Debug)]
pub struct ConnectionState<T: ?Sized> {
    /// The established connection, set at most once per state instance.
    pub connection: tokio::sync::OnceCell<Arc<T>>,

    /// Receiver for the outcome of the connection attempt currently in flight.
    /// It starts out closed (its sender is dropped right at construction); a
    /// caller that finds it closed replaces it and performs the attempt itself.
    merge_connection_attempts_chan:
        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,

    /// Freshness flag and backoff used to compute the delay before an attempt.
    inner: std::sync::Mutex<ConnectionStateInner>,
}
830
831impl<T: ?Sized> ConnectionState<T> {
832 pub fn new_initial() -> Self {
834 Self {
835 connection: OnceCell::new(),
836 inner: std::sync::Mutex::new(ConnectionStateInner {
837 fresh: true,
838 backoff: custom_backoff(
839 Duration::from_millis(5),
841 Duration::from_secs(30),
842 None,
843 ),
844 }),
845 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
846 }
847 }
848
849 pub fn new_reconnecting() -> Self {
852 Self {
853 connection: OnceCell::new(),
854 inner: std::sync::Mutex::new(ConnectionStateInner {
855 fresh: false,
857 backoff: custom_backoff(
858 Duration::from_millis(500),
860 Duration::from_secs(30),
861 None,
862 ),
863 }),
864 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
865 }
866 }
867
868 pub fn pre_reconnect_delay(&self) -> Duration {
871 let mut backoff_locked = self.inner.lock().expect("Locking failed");
872 let fresh = backoff_locked.fresh;
873
874 backoff_locked.fresh = false;
875
876 if fresh {
877 Duration::default()
878 } else {
879 backoff_locked.backoff.next().expect("Keeps retrying")
880 }
881 }
882}