fedimint_connectors/
lib.rs

1pub mod error;
2pub mod http;
3pub mod iroh;
4#[cfg(all(feature = "tor", not(target_family = "wasm")))]
5pub mod tor;
6pub mod ws;
7
8use std::collections::{BTreeMap, HashMap};
9use std::fmt::{self, Debug};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::{anyhow, bail};
15use async_trait::async_trait;
16use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
17use fedimint_core::module::{ApiMethod, ApiRequestErased};
18use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
19use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
20use fedimint_core::{apply, async_trait_maybe_send};
21use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
22use reqwest::Method;
23use serde_json::Value;
24use tokio::sync::OnceCell;
25use tracing::trace;
26
27use crate::error::ServerError;
28use crate::ws::WebsocketConnector;
29
30pub type ServerResult<T> = Result<T, ServerError>;
31
32/// Type for connector initialization functions
33type ConnectorInitFn = Arc<
34    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
35>;
36
37/// Builder for [`ConnectorRegistry`]
38///
39/// See [`ConnectorRegistry::build_from_client_env`] and similar
40/// to create.
41#[derive(Debug, Clone)]
42#[allow(clippy::struct_excessive_bools)] // Shut up, Clippy
43pub struct ConnectorRegistryBuilder {
44    /// List of overrides to use when attempting to connect to given url
45    ///
46    /// This is useful for testing, or forcing non-default network
47    /// connectivity.
48    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
49
50    /// Enable Iroh endpoints at all?
51    iroh_enable: bool,
52    /// Override the Iroh DNS server to use
53    iroh_dns: Option<SafeUrl>,
54    /// Should start the "next/unstable" Iroh stack
55    iroh_next: bool,
56    /// Enable Pkarr DHT discovery
57    iroh_pkarr_dht: bool,
58
59    /// Enable Websocket API handling at all?
60    ws_enable: bool,
61    ws_force_tor: bool,
62
63    // Enable HTTP
64    http_enable: bool,
65}
66
67impl ConnectorRegistryBuilder {
68    #[allow(clippy::unused_async)] // Leave room for async in the future
69    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
70        // Create initialization functions for each connector type
71        let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
72            BTreeMap::new();
73
74        // WS connector init function
75        let builder_ws = self.clone();
76        let ws_connector_init = Arc::new(move || {
77            let builder = builder_ws.clone();
78            Box::pin(async move { builder.build_ws_connector().await })
79                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
80        });
81        connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
82        connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
83
84        // Iroh connector init function
85        let builder_iroh = self.clone();
86        connectors_lazy.insert(
87            "iroh".into(),
88            (
89                Arc::new(move || {
90                    let builder = builder_iroh.clone();
91                    Box::pin(async move { builder.build_iroh_connector().await })
92                        as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
93                }),
94                OnceCell::new(),
95            ),
96        );
97
98        let builder_http = self.clone();
99        let http_connector_init = Arc::new(move || {
100            let builder = builder_http.clone();
101            Box::pin(async move { builder.build_http_connector() })
102                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103        });
104
105        connectors_lazy.insert(
106            "http".into(),
107            (http_connector_init.clone(), OnceCell::new()),
108        );
109        connectors_lazy.insert(
110            "https".into(),
111            (http_connector_init.clone(), OnceCell::new()),
112        );
113
114        Ok(ConnectorRegistry {
115            connectors_lazy: Arc::new(connectors_lazy),
116            connection_overrides: self.connection_overrides,
117        })
118    }
119
120    pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
121        if !self.iroh_enable {
122            bail!("Iroh connector not enabled");
123        }
124        Ok(Arc::new(
125            iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
126                .await?,
127        ) as DynConnector)
128    }
129
130    pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
131        if !self.ws_enable {
132            bail!("Websocket connector not enabled");
133        }
134
135        match self.ws_force_tor {
136            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
137            true => {
138                use crate::tor::TorConnector;
139
140                Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
141            }
142
143            false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
144            #[allow(unreachable_patterns)]
145            _ => bail!("Tor requested, but not support not compiled in"),
146        }
147    }
148
149    pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
150        if !self.http_enable {
151            bail!("Http connector not enabled");
152        }
153
154        Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
155    }
156
157    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
158        Self {
159            iroh_pkarr_dht: enable,
160            ..self
161        }
162    }
163
164    pub fn iroh_next(self, enable: bool) -> Self {
165        Self {
166            iroh_next: enable,
167            ..self
168        }
169    }
170
171    pub fn ws_force_tor(self, enable: bool) -> Self {
172        Self {
173            ws_force_tor: enable,
174            ..self
175        }
176    }
177
178    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
179        Self {
180            iroh_dns: Some(url),
181            ..self
182        }
183    }
184
185    /// Apply overrides from env variables
186    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
187        // TODO: read rest of the env
188        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
189            self = self.with_connection_override(k, v);
190        }
191
192        Ok(Self { ..self })
193    }
194
195    pub fn with_connection_override(
196        mut self,
197        original_url: SafeUrl,
198        replacement_url: SafeUrl,
199    ) -> Self {
200        self.connection_overrides
201            .insert(original_url, replacement_url);
202        self
203    }
204}
205
206/// A set of available connectivity protocols a client can use to make
207/// network API requests (typically to federation).
208///
209/// Maps from connection URL schema to [`Connector`] to use to connect to it.
210///
211/// See [`ConnectorRegistry::build_from_client_env`] and similar
212/// to create.
213///
214/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
215/// mixed-networking stack connection.
216///
217/// Responsibilities:
218#[derive(Clone)]
219pub struct ConnectorRegistry {
220    /// Wrapped in Arc so clones share the same OnceCell instances
221    connectors_lazy: Arc<BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>>,
222    /// Connection URL overrides for testing/custom routing
223    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
224}
225
226impl fmt::Debug for ConnectorRegistry {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        f.debug_struct("ConnectorRegistry")
229            .field("connectors_lazy", &self.connectors_lazy.len())
230            .field("connection_overrides", &self.connection_overrides)
231            .finish()
232    }
233}
234
235impl ConnectorRegistry {
236    /// Create a builder with recommended defaults intended for client-side
237    /// usage
238    ///
239    /// In particular mobile devices are considered.
240    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
241        ConnectorRegistryBuilder {
242            iroh_enable: true,
243            iroh_dns: None,
244            iroh_pkarr_dht: false,
245            iroh_next: true,
246            ws_enable: true,
247            ws_force_tor: false,
248            http_enable: true,
249
250            connection_overrides: BTreeMap::default(),
251        }
252    }
253
254    /// Create a builder with recommended defaults intended for the server-side
255    /// usage
256    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
257        ConnectorRegistryBuilder {
258            iroh_enable: true,
259            iroh_dns: None,
260            iroh_pkarr_dht: true,
261            iroh_next: true,
262            ws_enable: true,
263            ws_force_tor: false,
264            http_enable: false,
265
266            connection_overrides: BTreeMap::default(),
267        }
268    }
269
270    /// Create a builder with recommended defaults intended for testing
271    /// usage
272    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
273        ConnectorRegistryBuilder {
274            iroh_enable: true,
275            iroh_dns: None,
276            iroh_pkarr_dht: false,
277            iroh_next: false,
278            ws_enable: true,
279            ws_force_tor: false,
280            http_enable: true,
281
282            connection_overrides: BTreeMap::default(),
283        }
284    }
285
286    /// Like [`Self::build_from_client_defaults`] build will apply
287    /// environment-provided overrides.
288    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
289        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
290        Ok(builder)
291    }
292
293    /// Like [`Self::build_from_server_defaults`] build will apply
294    /// environment-provided overrides.
295    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
296        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
297        Ok(builder)
298    }
299
300    /// Like [`Self::build_from_testing_defaults`] build will apply
301    /// environment-provided overrides.
302    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
303        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
304        Ok(builder)
305    }
306
307    /// Connect to a given `url` using matching [`Connector`]
308    ///
309    /// This is the main function consumed by the downstream use for making
310    /// connection.
311    pub async fn connect_guardian(
312        &self,
313        url: &SafeUrl,
314        api_secret: Option<&str>,
315    ) -> ServerResult<DynGuaridianConnection> {
316        trace!(
317            target: LOG_NET,
318            %url,
319            "Connection requested"
320        );
321        let url = match self.connection_overrides.get(url) {
322            Some(replacement) => {
323                trace!(
324                    target: LOG_NET,
325                    original_url = %url,
326                    replacement_url = %replacement,
327                    "Using a connectivity override for connection"
328                );
329
330                replacement
331            }
332            None => url,
333        };
334
335        let connector_key = url.scheme();
336
337        let Some(connector_lazy) = self.connectors_lazy.get(connector_key) else {
338            return Err(ServerError::InvalidEndpoint(anyhow!(
339                "Unsupported scheme: {}; missing endpoint handler",
340                url.scheme()
341            )));
342        };
343
344        // Clone the init function to use in the async block
345        let init_fn = connector_lazy.0.clone();
346
347        let conn = connector_lazy
348            .1
349            .get_or_try_init(|| async move { init_fn().await })
350            .await
351            .map_err(|e| {
352                ServerError::Transport(anyhow!(
353                    "Connector failed to initialize: {}",
354                    e.fmt_compact_anyhow()
355                ))
356            })?
357            .connect_guardian(url, api_secret)
358            .await
359            .inspect_err(|err| {
360                trace!(
361                    target: LOG_NET,
362                    %url,
363                    err = %err.fmt_compact(),
364                    "Connection failed"
365                );
366            })?;
367
368        trace!(
369            target: LOG_NET,
370            %url,
371            "Connection returned"
372        );
373        Ok(conn)
374    }
375
376    /// Connect to a given `url` using matching [`Connector`] to a gateway
377    ///
378    /// This is the main function consumed by the downstream use for making
379    /// connection.
380    pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
381        let url = match self.connection_overrides.get(url) {
382            Some(replacement) => {
383                trace!(
384                    target: LOG_NET,
385                    original_url = %url,
386                    replacement_url = %replacement,
387                    "Using a connectivity override for connection"
388                );
389
390                replacement
391            }
392            None => url,
393        };
394
395        let connector_key = url.scheme();
396
397        let Some(connector_lazy) = self.connectors_lazy.get(connector_key) else {
398            return Err(anyhow!(
399                "Unsupported scheme: {}; missing endpoint handler",
400                url.scheme()
401            ));
402        };
403
404        // Clone the init function to use in the async block
405        let init_fn = connector_lazy.0.clone();
406
407        connector_lazy
408            .1
409            .get_or_try_init(|| async move { init_fn().await })
410            .await
411            .map_err(|e| {
412                ServerError::Transport(anyhow!(
413                    "Connector failed to initialize: {}",
414                    e.fmt_compact_anyhow()
415                ))
416            })?
417            .connect_gateway(url)
418            .await
419    }
420}
421pub type DynConnector = Arc<dyn Connector>;
422
423#[async_trait]
424pub trait Connector: Send + Sync + 'static + Debug {
425    async fn connect_guardian(
426        &self,
427        url: &SafeUrl,
428        api_secret: Option<&str>,
429    ) -> ServerResult<DynGuaridianConnection>;
430
431    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
432}
433
434/// Generic connection trait shared between [`IGuardianConnection`] and
435/// [`IGatewayConnection`]
436#[apply(async_trait_maybe_send!)]
437pub trait IConnection: Debug + Send + Sync + 'static {
438    fn is_connected(&self) -> bool;
439
440    async fn await_disconnection(&self);
441}
442
443/// A connection from api client to a federation guardian (type erased)
444pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
445
446/// A connection from api client to a federation guardian
447#[async_trait]
448pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
449    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
450
451    fn into_dyn(self) -> DynGuaridianConnection
452    where
453        Self: Sized,
454    {
455        Arc::new(self)
456    }
457}
458
459/// A connection from api client to a gateway (type erased)
460pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
461
462/// A connection from a client to a gateway
463#[apply(async_trait_maybe_send!)]
464pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
465    async fn request(
466        &self,
467        password: Option<String>,
468        method: Method,
469        route: &str,
470        payload: Option<Value>,
471    ) -> ServerResult<Value>;
472
473    fn into_dyn(self) -> DynGatewayConnection
474    where
475        Self: Sized,
476    {
477        Arc::new(self)
478    }
479}
480
481#[derive(Debug)]
482pub struct ConnectionPool<T: IConnection + ?Sized> {
483    /// Available connectors which we can make connections
484    connectors: ConnectorRegistry,
485
486    /// Connection pool
487    ///
488    /// Every entry in this map will be created on demand and correspond to a
489    /// single outgoing connection to a certain URL that is in the process
490    /// of being established, or we already established.
491    #[allow(clippy::type_complexity)]
492    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
493}
494
495impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
496    fn clone(&self) -> Self {
497        Self {
498            connectors: self.connectors.clone(),
499            connections: self.connections.clone(),
500        }
501    }
502}
503
504impl<T: IConnection + ?Sized> ConnectionPool<T> {
505    pub fn new(connectors: ConnectorRegistry) -> Self {
506        Self {
507            connectors,
508            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
509        }
510    }
511
512    pub async fn get_or_create_connection<F, Fut>(
513        &self,
514        url: &SafeUrl,
515        api_secret: Option<&str>,
516        create_connection: F,
517    ) -> ServerResult<Arc<T>>
518    where
519        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
520        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
521    {
522        let mut pool_locked = self.connections.lock().await;
523
524        let pool_entry_arc = pool_locked
525            .entry(url.to_owned())
526                        .and_modify(|entry_arc| {
527                // Check if existing connection is disconnected and reset the whole entry.
528                //
529                // This resets the state (like connectivity backoff), which is what we want.
530                // Since the (`OnceCell`) was already initialized, it means connection was successfully
531                // before, and disconnected afterwards.
532                if let Some(existing_conn) = entry_arc.connection.get()
533                    && !existing_conn.is_connected(){
534                        trace!(target: LOG_CLIENT_NET_API, %url, "Existing connection is disconnected, removing from pool");
535                        *entry_arc = Arc::new(ConnectionState::new_reconnecting());
536                    }
537            })
538            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
539            .clone();
540
541        // Drop the pool lock so other connections can work in parallel
542        drop(pool_locked);
543
544        let conn = pool_entry_arc
545            .connection
546            // This serializes all the connection attempts. If one attempt to connect (including
547            // waiting for the reconnect backoff) succeeds, all waiting ones will use it. If it
548            // fails, any already pending/next will attempt it right afterwards.
549            // Nit: if multiple calls are trying to connect to the same host that is offline, it
550            // will take some of them multiples of maximum retry delay to actually return with
551            // an error. This should be fine in practice and hard to avoid without a lot of
552            // complexity.
553            .get_or_try_init(|| async {
554                let retry_delay = pool_entry_arc.pre_reconnect_delay();
555                fedimint_core::runtime::sleep(retry_delay).await;
556
557                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
558                create_connection(
559                    url.clone(),
560                    api_secret.map(std::string::ToString::to_string),
561                    self.connectors.clone(),
562                )
563                .await
564            })
565            .await?;
566
567        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
568        Ok(conn.clone())
569    }
570}
571
572/// Inner part of [`ConnectionState`] preserving state between attempts to
573/// initialize [`ConnectionState::connection`]
574#[derive(Debug)]
575struct ConnectionStateInner {
576    fresh: bool,
577    backoff: FibonacciBackoff,
578}
579
580#[derive(Debug)]
581pub struct ConnectionState<T: ?Sized> {
582    /// Connection we are trying to or already established
583    pub connection: tokio::sync::OnceCell<Arc<T>>,
584    /// State that technically is protected every time by
585    /// the serialization of `OnceCell::get_or_try_init`, but
586    /// for Rust purposes needs to be locked.
587    inner: std::sync::Mutex<ConnectionStateInner>,
588}
589
590impl<T: ?Sized> ConnectionState<T> {
591    /// Create a new connection state for a first time connection
592    pub fn new_initial() -> Self {
593        Self {
594            connection: OnceCell::new(),
595            inner: std::sync::Mutex::new(ConnectionStateInner {
596                fresh: true,
597                backoff: custom_backoff(
598                    // First time connections start quick
599                    Duration::from_millis(5),
600                    Duration::from_secs(30),
601                    None,
602                ),
603            }),
604        }
605    }
606
607    /// Create a new connection state for a connection that already failed, and
608    /// is being reset
609    pub fn new_reconnecting() -> Self {
610        Self {
611            connection: OnceCell::new(),
612            inner: std::sync::Mutex::new(ConnectionStateInner {
613                // set the attempts to 1, indicating that
614                fresh: false,
615                backoff: custom_backoff(
616                    // Connections after a disconnect start with some minimum delay
617                    Duration::from_millis(500),
618                    Duration::from_secs(30),
619                    None,
620                ),
621            }),
622        }
623    }
624
625    /// Record the fact that an attempt to connect is being made, and return
626    /// time the caller should wait.
627    pub fn pre_reconnect_delay(&self) -> Duration {
628        let mut backoff_locked = self.inner.lock().expect("Locking failed");
629        let fresh = backoff_locked.fresh;
630
631        backoff_locked.fresh = false;
632
633        if fresh {
634            Duration::default()
635        } else {
636            backoff_locked.backoff.next().expect("Keeps retrying")
637        }
638    }
639}