1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
/// Result type returned by guardian connections and their requests.
pub type ServerResult<T> = Result<T, ServerError>;

/// Factory that builds a [`DynConnector`] on demand.
///
/// The registry stores one per URL scheme next to a `OnceCell`, and invokes it
/// through `OnceCell::get_or_try_init` when a connection with that scheme is
/// requested and no connector has been built yet.
type ConnectorInitFn = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
>;
39
/// Configuration from which a [`ConnectorRegistry`] is built.
///
/// Start from one of the `ConnectorRegistry::build_from_*` constructors,
/// adjust it with the builder methods, then call
/// [`ConnectorRegistryBuilder::bind`].
#[derive(Debug, Clone)]
// Each transport has its own independent on/off flag, so several bools are
// intentional here rather than a modelling smell.
#[allow(clippy::struct_excessive_bools)]
pub struct ConnectorRegistryBuilder {
    /// Maps a requested URL to the URL that is actually connected to.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,

    /// If `false`, building the iroh connector fails with an error.
    iroh_enable: bool,
    /// Optional URL passed as the first argument to `IrohConnector::new`
    /// (presumably a DNS/discovery server override — confirm).
    iroh_dns: Option<SafeUrl>,
    /// Passed as the third argument to `IrohConnector::new`.
    iroh_next: bool,
    /// Passed as the second argument to `IrohConnector::new`.
    iroh_pkarr_dht: bool,

    /// If `false`, building the websocket connector fails with an error.
    ws_enable: bool,
    /// Use the Tor connector instead of the plain websocket connector for
    /// `ws`/`wss` URLs (requires the `tor` feature on non-wasm targets).
    ws_force_tor: bool,

    /// If `false`, building the http connector fails with an error.
    http_enable: bool,
}
69
70impl ConnectorRegistryBuilder {
71 #[allow(clippy::unused_async)] pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73 let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75 BTreeMap::new();
76
77 let builder_ws = self.clone();
79 let ws_connector_init = Arc::new(move || {
80 let builder = builder_ws.clone();
81 Box::pin(async move { builder.build_ws_connector().await })
82 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
83 });
84 connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
85 connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
86
87 let builder_iroh = self.clone();
89 connectors_lazy.insert(
90 "iroh".into(),
91 (
92 Arc::new(move || {
93 let builder = builder_iroh.clone();
94 Box::pin(async move { builder.build_iroh_connector().await })
95 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
96 }),
97 OnceCell::new(),
98 ),
99 );
100
101 let builder_http = self.clone();
102 let http_connector_init = Arc::new(move || {
103 let builder = builder_http.clone();
104 Box::pin(async move { builder.build_http_connector() })
105 as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
106 });
107
108 connectors_lazy.insert(
109 "http".into(),
110 (http_connector_init.clone(), OnceCell::new()),
111 );
112 connectors_lazy.insert(
113 "https".into(),
114 (http_connector_init.clone(), OnceCell::new()),
115 );
116
117 Ok(ConnectorRegistry {
118 inner: ConnectorRegistryInner {
119 connectors_lazy,
120 connection_overrides: self.connection_overrides,
121 initialized: SetOnce::new(),
122 }
123 .into(),
124 })
125 }
126
127 pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
128 if !self.iroh_enable {
129 bail!("Iroh connector not enabled");
130 }
131 Ok(Arc::new(
132 iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
133 .await?,
134 ) as DynConnector)
135 }
136
137 pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
138 if !self.ws_enable {
139 bail!("Websocket connector not enabled");
140 }
141
142 match self.ws_force_tor {
143 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
144 true => {
145 use crate::tor::TorConnector;
146
147 Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
148 }
149
150 false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
151 #[allow(unreachable_patterns)]
152 _ => bail!("Tor requested, but not support not compiled in"),
153 }
154 }
155
156 pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
157 if !self.http_enable {
158 bail!("Http connector not enabled");
159 }
160
161 Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
162 }
163
164 pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
165 Self {
166 iroh_pkarr_dht: enable,
167 ..self
168 }
169 }
170
171 pub fn iroh_next(self, enable: bool) -> Self {
172 Self {
173 iroh_next: enable,
174 ..self
175 }
176 }
177
178 pub fn ws_force_tor(self, enable: bool) -> Self {
179 Self {
180 ws_force_tor: enable,
181 ..self
182 }
183 }
184
185 pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
186 Self {
187 iroh_dns: Some(url),
188 ..self
189 }
190 }
191
192 pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
194 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
196 self = self.with_connection_override(k, v);
197 }
198
199 Ok(Self { ..self })
200 }
201
202 pub fn with_connection_override(
203 mut self,
204 original_url: SafeUrl,
205 replacement_url: SafeUrl,
206 ) -> Self {
207 self.connection_overrides
208 .insert(original_url, replacement_url);
209 self
210 }
211}
212
/// State shared by all clones of a [`ConnectorRegistry`].
struct ConnectorRegistryInner {
    /// For each URL scheme: the connector init function, and a cell caching
    /// the connector once it has been built successfully.
    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
    /// URL replacements applied before a connector is chosen.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
    /// Set when the first connection is requested; awaited by
    /// `ConnectorRegistry::wait_for_initialized_connections`.
    initialized: tokio::sync::SetOnce<()>,
}
225
/// Registry of lazily initialized connectors, selected by URL scheme.
///
/// Cloning is cheap. Clones share the same connectors, connection overrides
/// and initialization flag through an `Arc`.
#[derive(Clone)]
pub struct ConnectorRegistry {
    inner: Arc<ConnectorRegistryInner>,
}
242
243impl fmt::Debug for ConnectorRegistry {
244 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245 f.debug_struct("ConnectorRegistry")
246 .field("connectors_lazy", &self.inner.connectors_lazy.len())
247 .field("connection_overrides", &self.inner.connection_overrides)
248 .finish()
249 }
250}
251
252impl ConnectorRegistry {
253 pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
258 ConnectorRegistryBuilder {
259 iroh_enable: true,
260 iroh_dns: None,
261 iroh_pkarr_dht: false,
262 iroh_next: true,
263 ws_enable: true,
264 ws_force_tor: false,
265 http_enable: true,
266
267 connection_overrides: BTreeMap::default(),
268 }
269 }
270
271 pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
274 ConnectorRegistryBuilder {
275 iroh_enable: true,
276 iroh_dns: None,
277 iroh_pkarr_dht: true,
278 iroh_next: true,
279 ws_enable: true,
280 ws_force_tor: false,
281 http_enable: false,
282
283 connection_overrides: BTreeMap::default(),
284 }
285 }
286
287 pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
290 ConnectorRegistryBuilder {
291 iroh_enable: true,
292 iroh_dns: None,
293 iroh_pkarr_dht: false,
294 iroh_next: false,
295 ws_enable: true,
296 ws_force_tor: false,
297 http_enable: true,
298
299 connection_overrides: BTreeMap::default(),
300 }
301 }
302
303 pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
306 let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
307 Ok(builder)
308 }
309
310 pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
313 let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
314 Ok(builder)
315 }
316
317 pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
320 let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
321 Ok(builder)
322 }
323
324 pub async fn wait_for_initialized_connections(&self) {
326 self.inner.initialized.wait().await;
327 }
328
329 pub async fn connect_guardian(
334 &self,
335 url: &SafeUrl,
336 api_secret: Option<&str>,
337 ) -> ServerResult<DynGuaridianConnection> {
338 trace!(
339 target: LOG_NET,
340 %url,
341 "Connection requested to guardian"
342 );
343 let _ = self.inner.initialized.set(());
344
345 let url = match self.inner.connection_overrides.get(url) {
346 Some(replacement) => {
347 trace!(
348 target: LOG_NET,
349 original_url = %url,
350 replacement_url = %replacement,
351 "Using a connectivity override for connection"
352 );
353
354 replacement
355 }
356 None => url,
357 };
358
359 let scheme = url.scheme().to_string();
360
361 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
362 return Err(ServerError::InvalidEndpoint(anyhow!(
363 "Unsupported scheme: {}; missing endpoint handler",
364 url.scheme()
365 )));
366 };
367
368 let init_fn = connector_lazy.0.clone();
370
371 let timer = CONNECTION_DURATION_SECONDS
372 .with_label_values(&[&scheme])
373 .start_timer_ext();
374
375 let result = connector_lazy
376 .1
377 .get_or_try_init(|| async move { init_fn().await })
378 .await
379 .map_err(|e| {
380 ServerError::Transport(anyhow!(
381 "Connector failed to initialize: {}",
382 e.fmt_compact_anyhow()
383 ))
384 })?
385 .connect_guardian(url, api_secret)
386 .await;
387
388 timer.observe_duration();
389
390 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
391 CONNECTION_ATTEMPTS_TOTAL
392 .with_label_values(&[&scheme, &result_label])
393 .inc();
394
395 let conn = result.inspect_err(|err| {
396 trace!(
397 target: LOG_NET,
398 %url,
399 err = %err.fmt_compact(),
400 "Connection failed"
401 );
402 })?;
403
404 trace!(
405 target: LOG_NET,
406 %url,
407 "Connection returned"
408 );
409 Ok(conn)
410 }
411
412 pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
417 trace!(
418 target: LOG_NET,
419 %url,
420 "Connection requested to gateway"
421 );
422 let _ = self.inner.initialized.set(());
423
424 let url = match self.inner.connection_overrides.get(url) {
425 Some(replacement) => {
426 trace!(
427 target: LOG_NET,
428 original_url = %url,
429 replacement_url = %replacement,
430 "Using a connectivity override for connection"
431 );
432
433 replacement
434 }
435 None => url,
436 };
437
438 let scheme = url.scheme().to_string();
439
440 let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
441 return Err(anyhow!(
442 "Unsupported scheme: {}; missing endpoint handler",
443 url.scheme()
444 ));
445 };
446
447 let init_fn = connector_lazy.0.clone();
449
450 let timer = CONNECTION_DURATION_SECONDS
451 .with_label_values(&[&scheme])
452 .start_timer_ext();
453
454 let result = connector_lazy
455 .1
456 .get_or_try_init(|| async move { init_fn().await })
457 .await
458 .map_err(|e| {
459 ServerError::Transport(anyhow!(
460 "Connector failed to initialize: {}",
461 e.fmt_compact_anyhow()
462 ))
463 })?
464 .connect_gateway(url)
465 .await;
466
467 timer.observe_duration();
468
469 let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
470 CONNECTION_ATTEMPTS_TOTAL
471 .with_label_values(&[&scheme, &result_label])
472 .inc();
473
474 result
475 }
476}
/// Shared, type-erased [`Connector`].
pub type DynConnector = Arc<dyn Connector>;

/// A transport that can open connections to guardians and gateways.
///
/// [`ConnectorRegistry`] picks the connector by the URL scheme of each request.
#[async_trait]
pub trait Connector: Send + Sync + 'static + Debug {
    /// Opens a connection to the guardian at `url`, passing `api_secret` if
    /// one is given.
    async fn connect_guardian(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
    ) -> ServerResult<DynGuaridianConnection>;

    /// Opens a connection to the gateway at `url`.
    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
}
489
/// Common interface of an established connection.
#[apply(async_trait_maybe_send!)]
pub trait IConnection: Debug + Send + Sync + 'static {
    /// Whether the connection is still considered connected.
    ///
    /// [`ConnectionPool`] replaces a pooled connection for which this returns
    /// `false`.
    fn is_connected(&self) -> bool;

    /// Intended to complete once the connection is lost.
    ///
    /// [`ConnectionPool`] awaits this in a background task and then refreshes
    /// its pool entry for the connection's URL.
    async fn await_disconnection(&self);
}
498
/// Shared, type-erased guardian connection.
///
/// NOTE: "Guaridian" is misspelled, but the alias is public, so renaming it
/// would break callers.
pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;

/// Connection to a guardian's API.
// NOTE(review): uses `async_trait` (always `Send`), unlike `IConnection` and
// `IGatewayConnection`, which use `async_trait_maybe_send!` — confirm intended.
#[async_trait]
pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
    /// Sends `request` for the API method `method` and returns the JSON result.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;

    /// Moves `self` into an `Arc`, returning it as a [`DynGuaridianConnection`].
    fn into_dyn(self) -> DynGuaridianConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
514
/// Shared, type-erased gateway connection.
pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;

/// Connection to a gateway's API.
#[apply(async_trait_maybe_send!)]
pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
    /// Sends a request with HTTP method `method` to `route`, with an optional
    /// JSON `payload`, and returns the JSON response.
    ///
    /// `password` is presumably used to authenticate to the gateway — confirm
    /// against the implementations.
    async fn request(
        &self,
        password: Option<String>,
        method: Method,
        route: &str,
        payload: Option<Value>,
    ) -> ServerResult<Value>;

    /// Moves `self` into an `Arc`, returning it as a [`DynGatewayConnection`].
    fn into_dyn(self) -> DynGatewayConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
536
/// Pool holding at most one connection of type `T` per URL. Connections are
/// created on demand, and the pool is cloned by sharing its internal state.
#[derive(Debug)]
pub struct ConnectionPool<T: IConnection + ?Sized> {
    /// Registry handed to the connection factory when a new connection is made.
    connectors: ConnectorRegistry,

    /// URLs with an established connection. A URL is added on successful
    /// creation and removed when its pooled connection is found disconnected.
    active_connections: watch::Sender<BTreeSet<SafeUrl>>,

    /// Per-URL connection state, guarded by an async mutex.
    #[allow(clippy::type_complexity)]
    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
}
552
553impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
554 fn clone(&self) -> Self {
555 Self {
556 connectors: self.connectors.clone(),
557 connections: self.connections.clone(),
558 active_connections: self.active_connections.clone(),
559 }
560 }
561}
562
impl<T: IConnection + ?Sized> ConnectionPool<T> {
    /// Creates an empty pool that hands `connectors` to the connection factory.
    pub fn new(connectors: ConnectorRegistry) -> Self {
        Self {
            connectors,
            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            // The initial receiver is dropped; observers use `subscribe()`.
            active_connections: watch::channel(BTreeSet::new()).0,
        }
    }

    /// Returns the pool entry for `url`, inserting a fresh initial state if
    /// there is none.
    ///
    /// If the entry holds a connection that reports `!is_connected()`, `url`
    /// is removed from `active_connections` and the entry is replaced with
    /// `ConnectionState::new_reconnecting()`. That state is not "fresh", so the
    /// next attempt waits for a backoff delay.
    async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
        let mut pool_locked = self.connections.lock().await;
        pool_locked
            .entry(url.to_owned())
            .and_modify(|entry_arc| {
                if let Some(existing_conn) = entry_arc.connection.get()
                    && !existing_conn.is_connected()
                {
                    trace!(
                        target: LOG_CLIENT_NET_API,
                        %url,
                        "Existing connection is disconnected, removing from pool"
                    );
                    self.active_connections.send_modify(|v| {
                        v.remove(url);
                    });
                    *entry_arc = Arc::new(ConnectionState::new_reconnecting());
                }
            })
            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
            .clone()
    }

    /// Returns the pooled connection to `url`, creating it with
    /// `create_connection` if needed.
    ///
    /// Concurrent callers for the same entry are merged:
    /// - The caller that finds the entry's broadcast channel closed becomes the
    ///   leader. It installs a new channel and performs the attempt.
    /// - Other callers subscribe to the channel and return the leader's result.
    /// - If the leader disappears without a result reaching them (for example,
    ///   its future is dropped), the channel closes and a waiting caller takes
    ///   over as leader.
    ///
    /// Before calling `create_connection`, the leader sleeps for
    /// `ConnectionState::pre_reconnect_delay()`. If the entry already holds a
    /// connection, that connection is returned without calling the factory.
    ///
    /// # Errors
    ///
    /// The leader returns the factory's error unchanged. Followers receive a
    /// [`ServerError::Connection`] built from the leader's error message.
    // NOTE(review): the `Clone` bound on `F` is not used in this body.
    pub async fn get_or_create_connection<F, Fut>(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
        create_connection: F,
    ) -> ServerResult<Arc<T>>
    where
        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
    {
        let pool_entry_arc = self.get_or_init_pool_entry(url).await;

        // Leader election: break out with a sender if we are the leader;
        // otherwise wait for the current leader's result.
        let leader_tx = loop {
            let mut leader_rx = {
                // std mutex: this scope never awaits, and the guard is dropped
                // before `recv().await` below.
                let mut chan_locked = pool_entry_arc
                    .merge_connection_attempts_chan
                    .lock()
                    .expect("locking error");

                // No live sender means no attempt is in flight. A new entry
                // starts in this state, because its sender is dropped at
                // construction.
                if chan_locked.is_closed() {
                    let (leader_tx, leader_rx) = broadcast::channel(1);
                    *chan_locked = leader_rx;
                    break leader_tx;
                }

                // Only results sent after this point are observed; a missed
                // result surfaces as `Err` once the leader's sender drops.
                chan_locked.resubscribe()
            };

            // `Err` means the channel closed without delivering a result to us
            // (or, in principle, lagged); loop and re-run the election.
            if let Ok(res) = leader_rx.recv().await {
                match res {
                    Ok(o) => return Ok(o),
                    Err(err) => {
                        return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
                    }
                }
            }
        };

        let conn = pool_entry_arc
            .connection
            .get_or_try_init(|| async {
                let retry_delay = pool_entry_arc.pre_reconnect_delay();
                fedimint_core::runtime::sleep(retry_delay).await;

                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
                let res = create_connection(
                    url.clone(),
                    api_secret.map(std::string::ToString::to_string),
                    self.connectors.clone(),
                )
                .await;

                // Share the outcome with followers. The error is stringified
                // because the channel payload must be `Clone`. A send error is
                // ignored: it only means nobody is listening.
                let _ = leader_tx.send(
                    res.as_ref()
                        .map(|o| o.clone())
                        .map_err(|err| err.to_string()),
                );

                let conn = res?;

                self.active_connections.send_modify(|v| {
                    v.insert(url.clone());
                });

                // Once the connection reports disconnection, re-fetching the
                // entry lets `get_or_init_pool_entry` evict it and update
                // `active_connections`.
                fedimint_core::runtime::spawn("connection disconnect watch", {
                    let conn = conn.clone();
                    let s = self.clone();
                    let url = url.clone();
                    async move {
                        conn.await_disconnection().await;
                        s.get_or_init_pool_entry(&url).await;
                    }
                });

                Ok(conn)
            })
            .await?;

        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
        Ok(conn.clone())
    }

    /// Subscribes to the set of URLs the pool currently considers connected.
    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
        self.active_connections.subscribe()
    }

    /// Delegates to [`ConnectorRegistry::wait_for_initialized_connections`].
    pub async fn wait_for_initialized_connections(&self) {
        self.connectors.wait_for_initialized_connections().await
    }
}
703
/// Reconnect-delay bookkeeping for a [`ConnectionState`].
#[derive(Debug)]
struct ConnectionStateInner {
    /// While `true`, the next `pre_reconnect_delay` call returns zero (and
    /// clears the flag) instead of consulting `backoff`.
    fresh: bool,
    /// Source of delays once the state is no longer fresh.
    backoff: FibonacciBackoff,
}
711
/// Per-URL state tracked by [`ConnectionPool`].
#[derive(Debug)]
pub struct ConnectionState<T: ?Sized> {
    /// The established connection, set by the first successful attempt.
    pub connection: tokio::sync::OnceCell<Arc<T>>,

    /// Receiver of the channel used to merge concurrent connection attempts.
    /// When it is closed (no live sender), no attempt is in flight. The next
    /// caller becomes the leader and stores a new receiver here, and other
    /// callers `resubscribe()` to it to get the leader's result.
    merge_connection_attempts_chan:
        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,

    /// Backoff state consulted before each connection attempt.
    inner: std::sync::Mutex<ConnectionStateInner>,
}
728
729impl<T: ?Sized> ConnectionState<T> {
730 pub fn new_initial() -> Self {
732 Self {
733 connection: OnceCell::new(),
734 inner: std::sync::Mutex::new(ConnectionStateInner {
735 fresh: true,
736 backoff: custom_backoff(
737 Duration::from_millis(5),
739 Duration::from_secs(30),
740 None,
741 ),
742 }),
743 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
744 }
745 }
746
747 pub fn new_reconnecting() -> Self {
750 Self {
751 connection: OnceCell::new(),
752 inner: std::sync::Mutex::new(ConnectionStateInner {
753 fresh: false,
755 backoff: custom_backoff(
756 Duration::from_millis(500),
758 Duration::from_secs(30),
759 None,
760 ),
761 }),
762 merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
763 }
764 }
765
766 pub fn pre_reconnect_delay(&self) -> Duration {
769 let mut backoff_locked = self.inner.lock().expect("Locking failed");
770 let fresh = backoff_locked.fresh;
771
772 backoff_locked.fresh = false;
773
774 if fresh {
775 Duration::default()
776 } else {
777 backoff_locked.backoff.next().expect("Keeps retrying")
778 }
779 }
780}