Skip to main content

fedimint_connectors/
lib.rs

1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
/// Result alias for operations in this crate that fail with a
/// [`ServerError`].
pub type ServerResult<T> = Result<T, ServerError>;
34
/// Type for connector initialization functions
///
/// A shareable (`Arc`), thread-safe (`Send + Sync`) factory. Each call
/// returns a boxed, `Send` future resolving either to a ready
/// [`DynConnector`] or to the error that prevented building it.
type ConnectorInitFn = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
>;
39
/// Builder for [`ConnectorRegistry`]
///
/// See [`ConnectorRegistry::build_from_client_env`] and similar
/// to create.
///
/// Each flag only controls whether the matching connector may be built;
/// the connectors themselves are constructed lazily by the registry.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)] // Independent on/off switches; plain bools are the natural fit
pub struct ConnectorRegistryBuilder {
    /// List of overrides to use when attempting to connect to given url
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,

    /// Enable Iroh endpoints at all?
    iroh_enable: bool,
    /// Override the Iroh DNS server to use
    iroh_dns: Option<SafeUrl>,
    /// Should start the "next/unstable" Iroh stack
    iroh_next: bool,
    /// Enable Pkarr DHT discovery
    iroh_pkarr_dht: bool,

    /// Enable Websocket API handling at all?
    ws_enable: bool,
    /// Build the Tor connector instead of the plain Websocket one for
    /// `ws`/`wss` urls (needs the `tor` feature on a non-wasm target,
    /// otherwise building the connector fails).
    ws_force_tor: bool,

    /// Enable the HTTP connector (`http`/`https` urls)?
    http_enable: bool,
}
69
70impl ConnectorRegistryBuilder {
71    #[allow(clippy::unused_async)] // Leave room for async in the future
72    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73        // Create initialization functions for each connector type
74        let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75            BTreeMap::new();
76
77        // Eagerly created so consumers can subscribe before the Iroh
78        // connector is lazily initialized. Only Iroh bumps it today
79        // (on transport-level path changes like relay → direct).
80        let path_change = Arc::new(watch::channel(0u64).0);
81
82        // WS connector init function
83        let builder_ws = self.clone();
84        let ws_connector_init = Arc::new(move || {
85            let builder = builder_ws.clone();
86            Box::pin(async move { builder.build_ws_connector().await })
87                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
88        });
89        connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
90        connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
91
92        // Iroh connector init function
93        let builder_iroh = self.clone();
94        let path_change_iroh = path_change.clone();
95        connectors_lazy.insert(
96            "iroh".into(),
97            (
98                Arc::new(move || {
99                    let builder = builder_iroh.clone();
100                    let path_change = path_change_iroh.clone();
101                    Box::pin(async move { builder.build_iroh_connector(path_change).await })
102                        as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103                }),
104                OnceCell::new(),
105            ),
106        );
107
108        let builder_http = self.clone();
109        let http_connector_init = Arc::new(move || {
110            let builder = builder_http.clone();
111            Box::pin(async move { builder.build_http_connector() })
112                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
113        });
114
115        connectors_lazy.insert(
116            "http".into(),
117            (http_connector_init.clone(), OnceCell::new()),
118        );
119        connectors_lazy.insert(
120            "https".into(),
121            (http_connector_init.clone(), OnceCell::new()),
122        );
123
124        Ok(ConnectorRegistry {
125            inner: ConnectorRegistryInner {
126                connectors_lazy,
127                connection_overrides: self.connection_overrides,
128                initialized: SetOnce::new(),
129                path_change,
130            }
131            .into(),
132        })
133    }
134
135    pub async fn build_iroh_connector(
136        &self,
137        path_change: Arc<watch::Sender<u64>>,
138    ) -> anyhow::Result<DynConnector> {
139        if !self.iroh_enable {
140            bail!("Iroh connector not enabled");
141        }
142        Ok(Arc::new(
143            iroh::IrohConnector::new(
144                self.iroh_dns.clone(),
145                self.iroh_pkarr_dht,
146                self.iroh_next,
147                path_change,
148            )
149            .await?,
150        ) as DynConnector)
151    }
152
153    pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
154        if !self.ws_enable {
155            bail!("Websocket connector not enabled");
156        }
157
158        match self.ws_force_tor {
159            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
160            true => {
161                use crate::tor::TorConnector;
162
163                Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
164            }
165
166            false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
167            #[allow(unreachable_patterns)]
168            _ => bail!("Tor requested, but not support not compiled in"),
169        }
170    }
171
172    pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
173        if !self.http_enable {
174            bail!("Http connector not enabled");
175        }
176
177        Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
178    }
179
180    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
181        Self {
182            iroh_pkarr_dht: enable,
183            ..self
184        }
185    }
186
187    pub fn iroh_next(self, enable: bool) -> Self {
188        Self {
189            iroh_next: enable,
190            ..self
191        }
192    }
193
194    pub fn ws_force_tor(self, enable: bool) -> Self {
195        Self {
196            ws_force_tor: enable,
197            ..self
198        }
199    }
200
201    pub fn http(self, enable: bool) -> Self {
202        Self {
203            http_enable: enable,
204            ..self
205        }
206    }
207
208    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
209        Self {
210            iroh_dns: Some(url),
211            ..self
212        }
213    }
214
215    /// Apply overrides from env variables
216    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
217        // TODO: read rest of the env
218        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
219            self = self.with_connection_override(k, v);
220        }
221
222        Ok(Self { ..self })
223    }
224
225    pub fn with_connection_override(
226        mut self,
227        original_url: SafeUrl,
228        replacement_url: SafeUrl,
229    ) -> Self {
230        self.connection_overrides
231            .insert(original_url, replacement_url);
232        self
233    }
234}
235
/// Actual data shared between copies of [`ConnectorRegistry`] handle
struct ConnectorRegistryInner {
    /// Lazily initialized [`Connector`]s per protocol supported
    ///
    /// Keyed by url scheme. Each entry pairs the function that builds the
    /// connector with the cell that stores it once built.
    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
    /// Connection URL overrides for testing/custom routing
    ///
    /// Maps a requested url to the url that is connected to instead.
    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
    /// Set on first connection attempt
    ///
    /// This is used for functionality that wants to avoid making
    /// network connections if nothing else did network request.
    initialized: tokio::sync::SetOnce<()>,
    /// Ticks whenever a connector observes a transport-level path change
    /// (e.g. iroh relay → direct). Only Iroh bumps this today.
    path_change: Arc<watch::Sender<u64>>,
}
251
/// A set of available connectivity protocols a client can use to make
/// network API requests (typically to federation).
///
/// Maps from connection URL schema to [`Connector`] to use to connect to it.
///
/// See [`ConnectorRegistry::build_from_client_env`] and similar
/// to create.
///
/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
/// mixed-networking stack connection.
///
/// Responsibilities:
///
/// * apply the configured connection url overrides,
/// * pick the [`Connector`] matching the url scheme, building it lazily on
///   first use,
/// * record connection duration and attempt metrics,
/// * expose the current [`Connectivity`] for a url and a notifier for
///   transport-level path changes.
///
/// Cloning is cheap: all clones share the same inner state.
#[derive(Clone)]
pub struct ConnectorRegistry {
    /// Shared state behind every clone of this handle
    inner: Arc<ConnectorRegistryInner>,
}
268
269impl fmt::Debug for ConnectorRegistry {
270    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
271        f.debug_struct("ConnectorRegistry")
272            .field("connectors_lazy", &self.inner.connectors_lazy.len())
273            .field("connection_overrides", &self.inner.connection_overrides)
274            .finish()
275    }
276}
277
278impl ConnectorRegistry {
279    /// Create a builder with recommended defaults intended for client-side
280    /// usage
281    ///
282    /// In particular mobile devices are considered.
283    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
284        ConnectorRegistryBuilder {
285            iroh_enable: true,
286            iroh_dns: None,
287            iroh_pkarr_dht: false,
288            iroh_next: true,
289            ws_enable: true,
290            ws_force_tor: false,
291            http_enable: true,
292
293            connection_overrides: BTreeMap::default(),
294        }
295    }
296
297    /// Create a builder with recommended defaults intended for the server-side
298    /// usage
299    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
300        ConnectorRegistryBuilder {
301            iroh_enable: true,
302            iroh_dns: None,
303            iroh_pkarr_dht: true,
304            iroh_next: true,
305            ws_enable: true,
306            ws_force_tor: false,
307            http_enable: false,
308
309            connection_overrides: BTreeMap::default(),
310        }
311    }
312
313    /// Create a builder with recommended defaults intended for testing
314    /// usage
315    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
316        ConnectorRegistryBuilder {
317            iroh_enable: true,
318            iroh_dns: None,
319            iroh_pkarr_dht: false,
320            iroh_next: false,
321            ws_enable: true,
322            ws_force_tor: false,
323            http_enable: true,
324
325            connection_overrides: BTreeMap::default(),
326        }
327    }
328
329    /// Like [`Self::build_from_client_defaults`] build will apply
330    /// environment-provided overrides.
331    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
332        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
333        Ok(builder)
334    }
335
336    /// Like [`Self::build_from_server_defaults`] build will apply
337    /// environment-provided overrides.
338    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
339        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
340        Ok(builder)
341    }
342
343    /// Like [`Self::build_from_testing_defaults`] build will apply
344    /// environment-provided overrides.
345    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
346        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
347        Ok(builder)
348    }
349
    /// Wait until some connections have been made
    ///
    /// More precisely: resolves once the first connection has been
    /// *requested* through [`Self::connect_guardian`] or
    /// [`Self::connect_gateway`]; both set the flag before attempting to
    /// connect, whether or not the attempt succeeds.
    pub async fn wait_for_initialized_connections(&self) {
        self.inner.initialized.wait().await;
    }
354
355    /// Connect to a given `url` using matching [`Connector`]
356    ///
357    /// This is the main function consumed by the downstream use for making
358    /// connection.
359    pub async fn connect_guardian(
360        &self,
361        url: &SafeUrl,
362        api_secret: Option<&str>,
363    ) -> ServerResult<DynGuaridianConnection> {
364        trace!(
365            target: LOG_NET,
366            %url,
367            "Connection requested to guardian"
368        );
369        let _ = self.inner.initialized.set(());
370
371        let url = match self.inner.connection_overrides.get(url) {
372            Some(replacement) => {
373                trace!(
374                    target: LOG_NET,
375                    original_url = %url,
376                    replacement_url = %replacement,
377                    "Using a connectivity override for connection"
378                );
379
380                replacement
381            }
382            None => url,
383        };
384
385        let scheme = url.scheme().to_string();
386
387        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
388            return Err(ServerError::InvalidEndpoint(anyhow!(
389                "Unsupported scheme: {}; missing endpoint handler",
390                url.scheme()
391            )));
392        };
393
394        // Clone the init function to use in the async block
395        let init_fn = connector_lazy.0.clone();
396
397        let timer = CONNECTION_DURATION_SECONDS
398            .with_label_values(&[&scheme])
399            .start_timer_ext();
400
401        let result = connector_lazy
402            .1
403            .get_or_try_init(|| async move { init_fn().await })
404            .await
405            .map_err(|e| {
406                ServerError::Transport(anyhow!(
407                    "Connector failed to initialize: {}",
408                    e.fmt_compact_anyhow()
409                ))
410            })?
411            .connect_guardian(url, api_secret)
412            .await;
413
414        timer.observe_duration();
415
416        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
417        CONNECTION_ATTEMPTS_TOTAL
418            .with_label_values(&[&scheme, &result_label])
419            .inc();
420
421        let conn = result.inspect_err(|err| {
422            trace!(
423                target: LOG_NET,
424                %url,
425                err = %err.fmt_compact(),
426                "Connection failed"
427            );
428        })?;
429
430        trace!(
431            target: LOG_NET,
432            %url,
433            "Connection returned"
434        );
435        Ok(conn)
436    }
437
438    /// Connect to a given `url` using matching [`Connector`] to a gateway
439    ///
440    /// This is the main function consumed by the downstream use for making
441    /// connection.
442    pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
443        trace!(
444            target: LOG_NET,
445            %url,
446            "Connection requested to gateway"
447        );
448        let _ = self.inner.initialized.set(());
449
450        let url = match self.inner.connection_overrides.get(url) {
451            Some(replacement) => {
452                trace!(
453                    target: LOG_NET,
454                    original_url = %url,
455                    replacement_url = %replacement,
456                    "Using a connectivity override for connection"
457                );
458
459                replacement
460            }
461            None => url,
462        };
463
464        let scheme = url.scheme().to_string();
465
466        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
467            return Err(anyhow!(
468                "Unsupported scheme: {}; missing endpoint handler",
469                url.scheme()
470            ));
471        };
472
473        // Clone the init function to use in the async block
474        let init_fn = connector_lazy.0.clone();
475
476        let timer = CONNECTION_DURATION_SECONDS
477            .with_label_values(&[&scheme])
478            .start_timer_ext();
479
480        let result = connector_lazy
481            .1
482            .get_or_try_init(|| async move { init_fn().await })
483            .await
484            .map_err(|e| {
485                ServerError::Transport(anyhow!(
486                    "Connector failed to initialize: {}",
487                    e.fmt_compact_anyhow()
488                ))
489            })?
490            .connect_gateway(url)
491            .await;
492
493        timer.observe_duration();
494
495        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
496        CONNECTION_ATTEMPTS_TOTAL
497            .with_label_values(&[&scheme, &result_label])
498            .inc();
499
500        result
501    }
502
503    /// Report how a connection to `url` is currently reaching its peer.
504    ///
505    /// Returns [`Connectivity::Unknown`] if no connector for the url's scheme
506    /// is registered, or if the matching connector has not been initialized
507    /// yet (i.e. no connection attempt has been made).
508    pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
509        let url = match self.inner.connection_overrides.get(url) {
510            Some(replacement) => replacement,
511            None => url,
512        };
513
514        let Some((_, connector_cell)) = self.inner.connectors_lazy.get(url.scheme()) else {
515            return Connectivity::Unknown;
516        };
517
518        match connector_cell.get() {
519            Some(connector) => connector.connectivity(url),
520            None => Connectivity::Unknown,
521        }
522    }
523
    /// Subscribe to transport-level connectivity changes across all
    /// connectors managed by this registry.
    ///
    /// The receiver ticks whenever a connector observes a path change on
    /// an existing connection (for example an iroh connection upgrading
    /// from relay to direct). The carried `u64` is an opaque counter —
    /// consumers should treat each update as a "re-read connectivity"
    /// signal.
    ///
    /// Every call returns a new, independent receiver on the same channel.
    pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
        self.inner.path_change.subscribe()
    }
535}

/// A [`Connector`] behind a shared, type-erased handle
pub type DynConnector = Arc<dyn Connector>;
537
/// A transport able to establish connections for the url schemes it is
/// registered under in [`ConnectorRegistry`].
#[async_trait]
pub trait Connector: Send + Sync + 'static + Debug {
    /// Establish a connection to the federation guardian at `url`.
    ///
    /// `api_secret` is passed through from the caller; how it is used is up
    /// to the implementation.
    async fn connect_guardian(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
    ) -> ServerResult<DynGuaridianConnection>;

    /// Establish a connection to the gateway at `url`.
    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;

    /// Report how a connection to `url` is currently reaching its peer.
    fn connectivity(&self, url: &SafeUrl) -> Connectivity;
}
551
/// How a connection is currently reaching its peer.
///
/// Transports without a relay concept (WS, HTTP) are always
/// [`Connectivity::Direct`]. Tor-routed connections report
/// [`Connectivity::Tor`]. Iroh connections may be [`Connectivity::Direct`]
/// (peer-to-peer), [`Connectivity::Relay`] (routed through a relay
/// server), or [`Connectivity::Mixed`] (both paths active); for Iroh this
/// can change at runtime as hole-punching succeeds or falls back.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Connectivity {
    /// The peer is reached directly, without a relay
    Direct,
    /// Traffic is routed through a relay server
    Relay,
    /// Both a direct and a relayed path are active
    Mixed,
    /// The connection is routed through Tor
    Tor,
    /// Connectivity cannot be determined, e.g. no connector is registered
    /// for the url's scheme or it has not been initialized yet
    Unknown,
}
568
/// Per-peer connection state reported by the federation API.
///
/// [`PeerStatus::Connected`] carries the current [`Connectivity`] of the
/// active connection; for Iroh this reflects the path at the moment of the
/// emission and may be stale until the next pool-level change (relay→direct
/// upgrades on an existing connection are not yet streamed).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerStatus {
    /// No active connection to the peer
    Disconnected,
    /// An active connection exists, reaching the peer as described by the
    /// carried [`Connectivity`]
    Connected(Connectivity),
}
580
/// Generic connection trait shared between [`IGuardianConnection`] and
/// [`IGatewayConnection`]
#[apply(async_trait_maybe_send!)]
pub trait IConnection: Debug + Send + Sync + 'static {
    /// Whether the connection is still connected.
    ///
    /// [`ConnectionPool`] uses a `false` answer to discard a pooled
    /// connection.
    fn is_connected(&self) -> bool;

    /// Resolve once the connection has been disconnected.
    async fn await_disconnection(&self);
}
589
/// A connection from api client to a federation guardian (type erased)
///
/// NOTE: the identifier is spelled `Guaridian` (sic). It is part of the
/// public API, so it is kept as is.
pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
592
/// A connection from api client to a federation guardian
#[async_trait]
pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
    /// Issue the API call `method` with the given type-erased `request`
    /// over this connection and return the response as JSON.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;

    /// Wrap `self` into the type-erased, shareable
    /// [`DynGuaridianConnection`] handle.
    fn into_dyn(self) -> DynGuaridianConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
605
/// A connection from api client to a gateway (type erased)
///
/// Shared handle to any [`IGatewayConnection`] implementation.
pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
608
/// A connection from a client to a gateway
#[apply(async_trait_maybe_send!)]
pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
    /// Issue a request to the gateway and return the response as JSON.
    ///
    /// * `password` - optional gateway password
    /// * `method` - HTTP method of the request
    /// * `route` - route on the gateway to call
    /// * `payload` - optional JSON payload
    async fn request(
        &self,
        password: Option<String>,
        method: Method,
        route: &str,
        payload: Option<Value>,
    ) -> ServerResult<Value>;

    /// Wrap `self` into the type-erased, shareable [`DynGatewayConnection`]
    /// handle.
    fn into_dyn(self) -> DynGatewayConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
627
/// Pool of connections of type `T`, keyed by url.
///
/// Connections are created on demand, shared between callers and replaced
/// once they are found to be disconnected. Clones of the pool share the same
/// underlying state.
#[derive(Debug)]
pub struct ConnectionPool<T: IConnection + ?Sized> {
    /// Available connectors which we can make connections
    connectors: ConnectorRegistry,

    /// Urls that currently have an established connection in the pool
    ///
    /// A url is added when its connection is established and removed when
    /// the pooled connection is found to be disconnected.
    active_connections: watch::Sender<BTreeSet<SafeUrl>>,

    /// Connection pool
    ///
    /// Every entry in this map will be created on demand and correspond to a
    /// single outgoing connection to a certain URL that is in the process
    /// of being established, or we already established.
    #[allow(clippy::type_complexity)]
    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
}
643
644impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
645    fn clone(&self) -> Self {
646        Self {
647            connectors: self.connectors.clone(),
648            connections: self.connections.clone(),
649            active_connections: self.active_connections.clone(),
650        }
651    }
652}
653
654impl<T: IConnection + ?Sized> ConnectionPool<T> {
655    pub fn new(connectors: ConnectorRegistry) -> Self {
656        Self {
657            connectors,
658            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
659            active_connections: watch::channel(BTreeSet::new()).0,
660        }
661    }
662
    /// Return the pool entry for `url`, creating or resetting it as needed.
    ///
    /// * No entry yet: a fresh "initial" entry is inserted.
    /// * Entry with an established connection that is no longer connected:
    ///   `url` is removed from the active connections and the entry is
    ///   replaced by a "reconnecting" one.
    /// * Otherwise the existing entry is returned untouched.
    ///
    /// The pool lock is held for the duration of the call.
    async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
        let mut pool_locked = self.connections.lock().await;
        pool_locked
            .entry(url.to_owned())
            .and_modify(|entry_arc| {
                // Check if the existing connection is disconnected and, if so, reset the
                // whole entry.
                //
                // This resets the state (like connectivity backoff), which is what we want.
                // Since the `OnceCell` was already initialized, the connection was
                // established successfully before and got disconnected afterwards.
                if let Some(existing_conn) = entry_arc.connection.get()
                    && !existing_conn.is_connected()
                {
                    trace!(
                        target: LOG_CLIENT_NET_API,
                        %url,
                        "Existing connection is disconnected, removing from pool"
                    );
                    self.active_connections.send_modify(|v| {
                        v.remove(url);
                    });
                    *entry_arc = Arc::new(ConnectionState::new_reconnecting());
                }
            })
            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
            .clone()
    }
690
    /// Return the pooled connection to `url`, establishing it first if
    /// necessary.
    ///
    /// Concurrent callers for the same url are merged: one of them (the
    /// "leader") runs `create_connection`, the others ("followers") wait for
    /// the leader's result on a broadcast channel.
    ///
    /// `create_connection` receives the url, the api secret (as an owned
    /// string) and a clone of the connector registry.
    ///
    /// # Errors
    ///
    /// The leader returns the error of `create_connection` as is. A follower
    /// receives the same failure as [`ServerError::Connection`] carrying the
    /// leader's error rendered as a string.
    pub async fn get_or_create_connection<F, Fut>(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
        create_connection: F,
    ) -> ServerResult<Arc<T>>
    where
        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
    {
        let pool_entry_arc = self.get_or_init_pool_entry(url).await;

        // Leader election: whoever finds the stored receiver closed (no live
        // sender, i.e. nobody is currently connecting) installs a new channel
        // and becomes the leader. Everybody else subscribes and waits.
        let leader_tx = loop {
            let mut leader_rx = {
                let mut chan_locked = pool_entry_arc
                    .merge_connection_attempts_chan
                    .lock()
                    .expect("locking error");

                if chan_locked.is_closed() {
                    let (leader_tx, leader_rx) = broadcast::channel(1);
                    *chan_locked = leader_rx;
                    // whoever was trying to connect last time is gone
                    // we're out of this lame loop for followers
                    break leader_tx;
                }

                // lets piggyback on the existing leader
                chan_locked.resubscribe()
            };

            // A receive error means we did not get a result from the leader;
            // loop around and try again (possibly becoming the leader).
            if let Ok(res) = leader_rx.recv().await {
                match res {
                    Ok(o) => return Ok(o),
                    Err(err) => {
                        return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
                    }
                }
            }
        };

        let conn = pool_entry_arc
            .connection
            .get_or_try_init(|| async {
                // First attempt on a fresh entry is immediate, later ones are
                // delayed by the entry's backoff.
                let retry_delay = pool_entry_arc.pre_reconnect_delay();
                fedimint_core::runtime::sleep(retry_delay).await;

                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
                let res = create_connection(
                    url.clone(),
                    api_secret.map(std::string::ToString::to_string),
                    self.connectors.clone(),
                )
                .await;

                // If any other task was also waiting to connect, send them the connection
                // result.
                //
                // Note: we want to send both Ok or Err, so `res?` is used only afterwards.
                // A send error (nobody is listening) is fine to ignore.
                let _ = leader_tx.send(
                    res.as_ref()
                        .map(|o| o.clone())
                        .map_err(|err| err.to_string()),
                );

                let conn = res?;

                self.active_connections.send_modify(|v| {
                    v.insert(url.clone());
                });

                fedimint_core::runtime::spawn("connection disconnect watch", {
                    let conn = conn.clone();
                    let s = self.clone();
                    let url = url.clone();
                    async move {
                        // wait for this connection to disconnect
                        conn.await_disconnection().await;
                        // And afterwards, update `active_connections`.
                        //
                        // This will update the `active_connections` just like calling
                        // `get_or_create_connection` normally does, but we will
                        // not attempt to do anything with the result (i.e. try to connect).
                        s.get_or_init_pool_entry(&url).await;
                    }
                });

                Ok(conn)
            })
            .await?;

        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
        Ok(conn.clone())
    }

    /// Get receiver for changes in the active connections
    ///
    /// The watched value is the set of urls with a currently established
    /// connection in this pool.
    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
        self.active_connections.subscribe()
    }
789
    /// Wait until the first connection has been requested through the
    /// underlying [`ConnectorRegistry`].
    pub async fn wait_for_initialized_connections(&self) {
        self.connectors.wait_for_initialized_connections().await
    }
793
    /// Report how a connection to `url` is currently reaching its peer.
    ///
    /// Delegates to [`ConnectorRegistry::connectivity`].
    pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
        self.connectors.connectivity(url)
    }
798
    /// Subscribe to transport-level connectivity changes observed by
    /// the connectors underlying this pool.
    ///
    /// Delegates to [`ConnectorRegistry::connectivity_change_notifier`].
    pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
        self.connectors.connectivity_change_notifier()
    }
804}
805
/// Inner part of [`ConnectionState`] preserving state between attempts to
/// initialize [`ConnectionState::connection`]
#[derive(Debug)]
struct ConnectionStateInner {
    /// `true` until the first connection attempt is recorded; that first
    /// attempt is made without any delay
    fresh: bool,
    /// Source of the delays applied before every later attempt
    backoff: FibonacciBackoff,
}
813
/// State of a single pool entry: the connection itself (once established)
/// plus what is needed to coordinate and pace connection attempts.
#[derive(Debug)]
pub struct ConnectionState<T: ?Sized> {
    /// Connection we are trying to or already established
    pub connection: tokio::sync::OnceCell<Arc<T>>,

    /// When tasks attempt to connect at the same time,
    /// this is the receiving end of the channel where
    /// the "leader" sends a result.
    ///
    /// Errors travel as strings.
    merge_connection_attempts_chan:
        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,

    /// State that technically is protected every time by
    /// the serialization of `OnceCell::get_or_try_init`, but
    /// for Rust purposes needs to be locked.
    inner: std::sync::Mutex<ConnectionStateInner>,
}
830
831impl<T: ?Sized> ConnectionState<T> {
832    /// Create a new connection state for a first time connection
833    pub fn new_initial() -> Self {
834        Self {
835            connection: OnceCell::new(),
836            inner: std::sync::Mutex::new(ConnectionStateInner {
837                fresh: true,
838                backoff: custom_backoff(
839                    // First time connections start quick
840                    Duration::from_millis(5),
841                    Duration::from_secs(30),
842                    None,
843                ),
844            }),
845            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
846        }
847    }
848
849    /// Create a new connection state for a connection that already failed, and
850    /// is being reset
851    pub fn new_reconnecting() -> Self {
852        Self {
853            connection: OnceCell::new(),
854            inner: std::sync::Mutex::new(ConnectionStateInner {
855                // set the attempts to 1, indicating that
856                fresh: false,
857                backoff: custom_backoff(
858                    // Connections after a disconnect start with some minimum delay
859                    Duration::from_millis(500),
860                    Duration::from_secs(30),
861                    None,
862                ),
863            }),
864            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
865        }
866    }
867
868    /// Record the fact that an attempt to connect is being made, and return
869    /// time the caller should wait.
870    pub fn pre_reconnect_delay(&self) -> Duration {
871        let mut backoff_locked = self.inner.lock().expect("Locking failed");
872        let fresh = backoff_locked.fresh;
873
874        backoff_locked.fresh = false;
875
876        if fresh {
877            Duration::default()
878        } else {
879            backoff_locked.backoff.next().expect("Keeps retrying")
880        }
881    }
882}