Skip to main content

fedimint_connectors/
lib.rs

1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
33pub type ServerResult<T> = Result<T, ServerError>;
34
35/// Type for connector initialization functions
36type ConnectorInitFn = Arc<
37    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
38>;
39
40/// Builder for [`ConnectorRegistry`]
41///
42/// See [`ConnectorRegistry::build_from_client_env`] and similar
43/// to create.
44#[derive(Debug, Clone)]
45#[allow(clippy::struct_excessive_bools)] // Shut up, Clippy
46pub struct ConnectorRegistryBuilder {
47    /// List of overrides to use when attempting to connect to given url
48    ///
49    /// This is useful for testing, or forcing non-default network
50    /// connectivity.
51    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
52
53    /// Enable Iroh endpoints at all?
54    iroh_enable: bool,
55    /// Override the Iroh DNS server to use
56    iroh_dns: Option<SafeUrl>,
57    /// Should start the "next/unstable" Iroh stack
58    iroh_next: bool,
59    /// Enable Pkarr DHT discovery
60    iroh_pkarr_dht: bool,
61
62    /// Enable Websocket API handling at all?
63    ws_enable: bool,
64    ws_force_tor: bool,
65
66    // Enable HTTP
67    http_enable: bool,
68}
69
70impl ConnectorRegistryBuilder {
71    #[allow(clippy::unused_async)] // Leave room for async in the future
72    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73        // Create initialization functions for each connector type
74        let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75            BTreeMap::new();
76
77        // Eagerly created so consumers can subscribe before the Iroh
78        // connector is lazily initialized. Only Iroh bumps it today
79        // (on transport-level path changes like relay → direct).
80        let path_change = Arc::new(watch::channel(0u64).0);
81
82        // WS connector init function
83        let builder_ws = self.clone();
84        let ws_connector_init = Arc::new(move || {
85            let builder = builder_ws.clone();
86            Box::pin(async move { builder.build_ws_connector().await })
87                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
88        });
89        connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
90        connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
91
92        // Iroh connector init function
93        let builder_iroh = self.clone();
94        let path_change_iroh = path_change.clone();
95        connectors_lazy.insert(
96            "iroh".into(),
97            (
98                Arc::new(move || {
99                    let builder = builder_iroh.clone();
100                    let path_change = path_change_iroh.clone();
101                    Box::pin(async move { builder.build_iroh_connector(path_change).await })
102                        as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103                }),
104                OnceCell::new(),
105            ),
106        );
107
108        let builder_http = self.clone();
109        let http_connector_init = Arc::new(move || {
110            let builder = builder_http.clone();
111            Box::pin(async move { builder.build_http_connector() })
112                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
113        });
114
115        connectors_lazy.insert(
116            "http".into(),
117            (http_connector_init.clone(), OnceCell::new()),
118        );
119        connectors_lazy.insert(
120            "https".into(),
121            (http_connector_init.clone(), OnceCell::new()),
122        );
123
124        Ok(ConnectorRegistry {
125            inner: ConnectorRegistryInner {
126                connectors_lazy,
127                connection_overrides: self.connection_overrides,
128                initialized: SetOnce::new(),
129                path_change,
130            }
131            .into(),
132        })
133    }
134
135    pub async fn build_iroh_connector(
136        &self,
137        path_change: Arc<watch::Sender<u64>>,
138    ) -> anyhow::Result<DynConnector> {
139        if !self.iroh_enable {
140            bail!("Iroh connector not enabled");
141        }
142        Ok(Arc::new(
143            iroh::IrohConnector::new(
144                self.iroh_dns.clone(),
145                self.iroh_pkarr_dht,
146                self.iroh_next,
147                path_change,
148            )
149            .await?,
150        ) as DynConnector)
151    }
152
153    pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
154        if !self.ws_enable {
155            bail!("Websocket connector not enabled");
156        }
157
158        match self.ws_force_tor {
159            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
160            true => {
161                use crate::tor::TorConnector;
162
163                Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
164            }
165
166            false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
167            #[allow(unreachable_patterns)]
168            _ => bail!("Tor requested, but not support not compiled in"),
169        }
170    }
171
172    pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
173        if !self.http_enable {
174            bail!("Http connector not enabled");
175        }
176
177        Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
178    }
179
180    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
181        Self {
182            iroh_pkarr_dht: enable,
183            ..self
184        }
185    }
186
187    pub fn iroh_next(self, enable: bool) -> Self {
188        Self {
189            iroh_next: enable,
190            ..self
191        }
192    }
193
194    pub fn ws_force_tor(self, enable: bool) -> Self {
195        Self {
196            ws_force_tor: enable,
197            ..self
198        }
199    }
200
201    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
202        Self {
203            iroh_dns: Some(url),
204            ..self
205        }
206    }
207
208    /// Apply overrides from env variables
209    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
210        // TODO: read rest of the env
211        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
212            self = self.with_connection_override(k, v);
213        }
214
215        Ok(Self { ..self })
216    }
217
218    pub fn with_connection_override(
219        mut self,
220        original_url: SafeUrl,
221        replacement_url: SafeUrl,
222    ) -> Self {
223        self.connection_overrides
224            .insert(original_url, replacement_url);
225        self
226    }
227}
228
229/// Actual data shared between copies of [`ConnectorRegistry`] handle
230struct ConnectorRegistryInner {
231    /// Lazily initialized [`Connector`]s per protocol supported
232    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
233    /// Connection URL overrides for testing/custom routing
234    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
235    /// Set on first connection attempt
236    ///
237    /// This is used for functionality that wants to avoid making
238    /// network connections if nothing else did network request.
239    initialized: tokio::sync::SetOnce<()>,
240    /// Ticks whenever a connector observes a transport-level path change
241    /// (e.g. iroh relay → direct). Only Iroh bumps this today.
242    path_change: Arc<watch::Sender<u64>>,
243}
244
245/// A set of available connectivity protocols a client can use to make
246/// network API requests (typically to federation).
247///
248/// Maps from connection URL schema to [`Connector`] to use to connect to it.
249///
250/// See [`ConnectorRegistry::build_from_client_env`] and similar
251/// to create.
252///
253/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
254/// mixed-networking stack connection.
255///
256/// Responsibilities:
257#[derive(Clone)]
258pub struct ConnectorRegistry {
259    inner: Arc<ConnectorRegistryInner>,
260}
261
262impl fmt::Debug for ConnectorRegistry {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        f.debug_struct("ConnectorRegistry")
265            .field("connectors_lazy", &self.inner.connectors_lazy.len())
266            .field("connection_overrides", &self.inner.connection_overrides)
267            .finish()
268    }
269}
270
271impl ConnectorRegistry {
272    /// Create a builder with recommended defaults intended for client-side
273    /// usage
274    ///
275    /// In particular mobile devices are considered.
276    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
277        ConnectorRegistryBuilder {
278            iroh_enable: true,
279            iroh_dns: None,
280            iroh_pkarr_dht: false,
281            iroh_next: true,
282            ws_enable: true,
283            ws_force_tor: false,
284            http_enable: true,
285
286            connection_overrides: BTreeMap::default(),
287        }
288    }
289
290    /// Create a builder with recommended defaults intended for the server-side
291    /// usage
292    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
293        ConnectorRegistryBuilder {
294            iroh_enable: true,
295            iroh_dns: None,
296            iroh_pkarr_dht: true,
297            iroh_next: true,
298            ws_enable: true,
299            ws_force_tor: false,
300            http_enable: false,
301
302            connection_overrides: BTreeMap::default(),
303        }
304    }
305
306    /// Create a builder with recommended defaults intended for testing
307    /// usage
308    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
309        ConnectorRegistryBuilder {
310            iroh_enable: true,
311            iroh_dns: None,
312            iroh_pkarr_dht: false,
313            iroh_next: false,
314            ws_enable: true,
315            ws_force_tor: false,
316            http_enable: true,
317
318            connection_overrides: BTreeMap::default(),
319        }
320    }
321
322    /// Like [`Self::build_from_client_defaults`] build will apply
323    /// environment-provided overrides.
324    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
325        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
326        Ok(builder)
327    }
328
329    /// Like [`Self::build_from_server_defaults`] build will apply
330    /// environment-provided overrides.
331    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
332        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
333        Ok(builder)
334    }
335
336    /// Like [`Self::build_from_testing_defaults`] build will apply
337    /// environment-provided overrides.
338    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
339        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
340        Ok(builder)
341    }
342
343    /// Wait until some connections have been made
344    pub async fn wait_for_initialized_connections(&self) {
345        self.inner.initialized.wait().await;
346    }
347
348    /// Connect to a given `url` using matching [`Connector`]
349    ///
350    /// This is the main function consumed by the downstream use for making
351    /// connection.
352    pub async fn connect_guardian(
353        &self,
354        url: &SafeUrl,
355        api_secret: Option<&str>,
356    ) -> ServerResult<DynGuaridianConnection> {
357        trace!(
358            target: LOG_NET,
359            %url,
360            "Connection requested to guardian"
361        );
362        let _ = self.inner.initialized.set(());
363
364        let url = match self.inner.connection_overrides.get(url) {
365            Some(replacement) => {
366                trace!(
367                    target: LOG_NET,
368                    original_url = %url,
369                    replacement_url = %replacement,
370                    "Using a connectivity override for connection"
371                );
372
373                replacement
374            }
375            None => url,
376        };
377
378        let scheme = url.scheme().to_string();
379
380        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
381            return Err(ServerError::InvalidEndpoint(anyhow!(
382                "Unsupported scheme: {}; missing endpoint handler",
383                url.scheme()
384            )));
385        };
386
387        // Clone the init function to use in the async block
388        let init_fn = connector_lazy.0.clone();
389
390        let timer = CONNECTION_DURATION_SECONDS
391            .with_label_values(&[&scheme])
392            .start_timer_ext();
393
394        let result = connector_lazy
395            .1
396            .get_or_try_init(|| async move { init_fn().await })
397            .await
398            .map_err(|e| {
399                ServerError::Transport(anyhow!(
400                    "Connector failed to initialize: {}",
401                    e.fmt_compact_anyhow()
402                ))
403            })?
404            .connect_guardian(url, api_secret)
405            .await;
406
407        timer.observe_duration();
408
409        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
410        CONNECTION_ATTEMPTS_TOTAL
411            .with_label_values(&[&scheme, &result_label])
412            .inc();
413
414        let conn = result.inspect_err(|err| {
415            trace!(
416                target: LOG_NET,
417                %url,
418                err = %err.fmt_compact(),
419                "Connection failed"
420            );
421        })?;
422
423        trace!(
424            target: LOG_NET,
425            %url,
426            "Connection returned"
427        );
428        Ok(conn)
429    }
430
431    /// Connect to a given `url` using matching [`Connector`] to a gateway
432    ///
433    /// This is the main function consumed by the downstream use for making
434    /// connection.
435    pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
436        trace!(
437            target: LOG_NET,
438            %url,
439            "Connection requested to gateway"
440        );
441        let _ = self.inner.initialized.set(());
442
443        let url = match self.inner.connection_overrides.get(url) {
444            Some(replacement) => {
445                trace!(
446                    target: LOG_NET,
447                    original_url = %url,
448                    replacement_url = %replacement,
449                    "Using a connectivity override for connection"
450                );
451
452                replacement
453            }
454            None => url,
455        };
456
457        let scheme = url.scheme().to_string();
458
459        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
460            return Err(anyhow!(
461                "Unsupported scheme: {}; missing endpoint handler",
462                url.scheme()
463            ));
464        };
465
466        // Clone the init function to use in the async block
467        let init_fn = connector_lazy.0.clone();
468
469        let timer = CONNECTION_DURATION_SECONDS
470            .with_label_values(&[&scheme])
471            .start_timer_ext();
472
473        let result = connector_lazy
474            .1
475            .get_or_try_init(|| async move { init_fn().await })
476            .await
477            .map_err(|e| {
478                ServerError::Transport(anyhow!(
479                    "Connector failed to initialize: {}",
480                    e.fmt_compact_anyhow()
481                ))
482            })?
483            .connect_gateway(url)
484            .await;
485
486        timer.observe_duration();
487
488        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
489        CONNECTION_ATTEMPTS_TOTAL
490            .with_label_values(&[&scheme, &result_label])
491            .inc();
492
493        result
494    }
495
496    /// Report how a connection to `url` is currently reaching its peer.
497    ///
498    /// Returns [`Connectivity::Unknown`] if no connector for the url's scheme
499    /// is registered, or if the matching connector has not been initialized
500    /// yet (i.e. no connection attempt has been made).
501    pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
502        let url = match self.inner.connection_overrides.get(url) {
503            Some(replacement) => replacement,
504            None => url,
505        };
506
507        let Some((_, connector_cell)) = self.inner.connectors_lazy.get(url.scheme()) else {
508            return Connectivity::Unknown;
509        };
510
511        match connector_cell.get() {
512            Some(connector) => connector.connectivity(url),
513            None => Connectivity::Unknown,
514        }
515    }
516
517    /// Subscribe to transport-level connectivity changes across all
518    /// connectors managed by this registry.
519    ///
520    /// The receiver ticks whenever a connector observes a path change on
521    /// an existing connection (for example an iroh connection upgrading
522    /// from relay to direct). The carried `u64` is an opaque counter —
523    /// consumers should treat each update as a "re-read connectivity"
524    /// signal.
525    pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
526        self.inner.path_change.subscribe()
527    }
528}
529pub type DynConnector = Arc<dyn Connector>;
530
531#[async_trait]
532pub trait Connector: Send + Sync + 'static + Debug {
533    async fn connect_guardian(
534        &self,
535        url: &SafeUrl,
536        api_secret: Option<&str>,
537    ) -> ServerResult<DynGuaridianConnection>;
538
539    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
540
541    /// Report how a connection to `url` is currently reaching its peer.
542    fn connectivity(&self, url: &SafeUrl) -> Connectivity;
543}
544
/// How a connection is currently reaching its peer.
///
/// WS and HTTP have no relay concept, so they always report
/// [`Connectivity::Direct`]. Connections routed over Tor report
/// [`Connectivity::Tor`]. An Iroh connection may be
/// [`Connectivity::Direct`], [`Connectivity::Relay`] or
/// [`Connectivity::Mixed`], and can move between these at runtime as
/// hole-punching succeeds or falls back.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Connectivity {
    /// Straight to the peer (peer-to-peer, for Iroh)
    Direct,
    /// Routed through a relay server
    Relay,
    /// Direct and relayed paths are both active
    Mixed,
    /// Routed over Tor
    Tor,
    /// Not known, e.g. because the connector for the URL's scheme has not
    /// been initialized yet
    Unknown,
}
561
562/// Per-peer connection state reported by the federation API.
563///
564/// [`PeerStatus::Connected`] carries the current [`Connectivity`] of the
565/// active connection; for Iroh this reflects the path at the moment of the
566/// emission and may be stale until the next pool-level change (relay→direct
567/// upgrades on an existing connection are not yet streamed).
568#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
569pub enum PeerStatus {
570    Disconnected,
571    Connected(Connectivity),
572}
573
574/// Generic connection trait shared between [`IGuardianConnection`] and
575/// [`IGatewayConnection`]
576#[apply(async_trait_maybe_send!)]
577pub trait IConnection: Debug + Send + Sync + 'static {
578    fn is_connected(&self) -> bool;
579
580    async fn await_disconnection(&self);
581}
582
583/// A connection from api client to a federation guardian (type erased)
584pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
585
586/// A connection from api client to a federation guardian
587#[async_trait]
588pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
589    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
590
591    fn into_dyn(self) -> DynGuaridianConnection
592    where
593        Self: Sized,
594    {
595        Arc::new(self)
596    }
597}
598
599/// A connection from api client to a gateway (type erased)
600pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
601
602/// A connection from a client to a gateway
603#[apply(async_trait_maybe_send!)]
604pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
605    async fn request(
606        &self,
607        password: Option<String>,
608        method: Method,
609        route: &str,
610        payload: Option<Value>,
611    ) -> ServerResult<Value>;
612
613    fn into_dyn(self) -> DynGatewayConnection
614    where
615        Self: Sized,
616    {
617        Arc::new(self)
618    }
619}
620
621#[derive(Debug)]
622pub struct ConnectionPool<T: IConnection + ?Sized> {
623    /// Available connectors which we can make connections
624    connectors: ConnectorRegistry,
625
626    active_connections: watch::Sender<BTreeSet<SafeUrl>>,
627
628    /// Connection pool
629    ///
630    /// Every entry in this map will be created on demand and correspond to a
631    /// single outgoing connection to a certain URL that is in the process
632    /// of being established, or we already established.
633    #[allow(clippy::type_complexity)]
634    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
635}
636
637impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
638    fn clone(&self) -> Self {
639        Self {
640            connectors: self.connectors.clone(),
641            connections: self.connections.clone(),
642            active_connections: self.active_connections.clone(),
643        }
644    }
645}
646
impl<T: IConnection + ?Sized> ConnectionPool<T> {
    /// Create an empty pool that opens its connections through `connectors`.
    pub fn new(connectors: ConnectorRegistry) -> Self {
        Self {
            connectors,
            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
            active_connections: watch::channel(BTreeSet::new()).0,
        }
    }

    /// Return the pool entry for `url`, creating one if it doesn't exist yet.
    ///
    /// If the existing entry holds a connection that reports
    /// `!is_connected()`, the entry is replaced with a fresh
    /// `ConnectionState::new_reconnecting()` state and `url` is removed from
    /// `active_connections`.
    async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
        let mut pool_locked = self.connections.lock().await;
        pool_locked
            .entry(url.to_owned())
            .and_modify(|entry_arc| {
                // Check if existing connection is disconnected and reset the whole entry.
                //
                // This resets the state (like connectivity backoff), which is what we want.
                // Since the (`OnceCell`) was already initialized, it means the connection was
                // successful before, and disconnected afterwards.
                if let Some(existing_conn) = entry_arc.connection.get()
                    && !existing_conn.is_connected()
                {
                    trace!(
                        target: LOG_CLIENT_NET_API,
                        %url,
                        "Existing connection is disconnected, removing from pool"
                    );
                    self.active_connections.send_modify(|v| {
                        v.remove(url);
                    });
                    *entry_arc = Arc::new(ConnectionState::new_reconnecting());
                }
            })
            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
            .clone()
    }

    /// Return a connection to `url`, creating it with `create_connection` if
    /// the pool doesn't hold one yet.
    ///
    /// Concurrent calls for the same `url` are merged. The first caller to
    /// find the entry's merge channel closed becomes the "leader". It
    /// installs a new channel, waits `pre_reconnect_delay()`, calls
    /// `create_connection`, and broadcasts the result. Other callers
    /// ("followers") wait for that broadcast instead of connecting
    /// themselves. If the leader goes away without broadcasting, followers
    /// loop and one of them takes over.
    ///
    /// On success the url is added to `active_connections`, and a task is
    /// spawned that refreshes the pool entry once the connection reports
    /// disconnection.
    ///
    /// # Errors
    ///
    /// The leader returns `create_connection`'s error unchanged. Followers
    /// receive only its string form, wrapped in `ServerError::Connection`.
    pub async fn get_or_create_connection<F, Fut>(
        &self,
        url: &SafeUrl,
        api_secret: Option<&str>,
        create_connection: F,
    ) -> ServerResult<Arc<T>>
    where
        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
    {
        let pool_entry_arc = self.get_or_init_pool_entry(url).await;

        // Leader election: either break out with the sender half (we are the
        // leader), or return the result broadcast by the current leader.
        let leader_tx = loop {
            let mut leader_rx = {
                // std mutex guard is dropped at the end of this block, before
                // any `.await`.
                let mut chan_locked = pool_entry_arc
                    .merge_connection_attempts_chan
                    .lock()
                    .expect("locking error");

                if chan_locked.is_closed() {
                    let (leader_tx, leader_rx) = broadcast::channel(1);
                    *chan_locked = leader_rx;
                    // whoever was trying to connect last time is gone
                    // we're out of this lame loop for followers
                    break leader_tx;
                }

                // let's piggyback on the existing leader
                chan_locked.resubscribe()
            };

            // `Err` here means the leader's sender was dropped without a
            // result reaching us; loop and re-check the channel.
            if let Ok(res) = leader_rx.recv().await {
                match res {
                    Ok(o) => return Ok(o),
                    Err(err) => {
                        return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
                    }
                }
            }
        };

        // Only the leader gets here. If the cell is already initialized, the
        // closure does not run and the existing connection is returned.
        let conn = pool_entry_arc
            .connection
            .get_or_try_init(|| async {
                let retry_delay = pool_entry_arc.pre_reconnect_delay();
                fedimint_core::runtime::sleep(retry_delay).await;

                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
                let res = create_connection(
                    url.clone(),
                    api_secret.map(std::string::ToString::to_string),
                    self.connectors.clone(),
                )
                .await;

                // If any other task was also waiting to connect, send them the connection
                // result.
                //
                // Note: we want to send both Ok or Err, so `res?` is used only afterwards.
                let _ = leader_tx.send(
                    res.as_ref()
                        .map(|o| o.clone())
                        .map_err(|err| err.to_string()),
                );

                // On error the cell stays uninitialized, so the next call
                // retries (after the entry's backoff delay).
                let conn = res?;

                self.active_connections.send_modify(|v| {
                    v.insert(url.clone());
                });

                fedimint_core::runtime::spawn("connection disconnect watch", {
                    let conn = conn.clone();
                    let s = self.clone();
                    let url = url.clone();
                    async move {
                        // wait for this connection to disconnect
                        conn.await_disconnection().await;
                        // And afterwards, update `active_connections`.
                        //
                        // This will update the `active_connections` just like calling
                        // `get_or_create_connection` normally does, but we will
                        // not attempt to do anything with the result (i.e. try to connect).
                        s.get_or_init_pool_entry(&url).await;
                    }
                });

                Ok(conn)
            })
            .await?;

        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
        Ok(conn.clone())
    }

    /// Get receiver for changes in the active connections
    ///
    /// The set holds the URLs that currently have an established connection.
    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
        self.active_connections.subscribe()
    }

    /// Wait until the underlying [`ConnectorRegistry`] has been asked to make
    /// a connection at least once.
    pub async fn wait_for_initialized_connections(&self) {
        self.connectors.wait_for_initialized_connections().await
    }

    /// Report how a connection to `url` is currently reaching its peer.
    pub fn connectivity(&self, url: &SafeUrl) -> Connectivity {
        self.connectors.connectivity(url)
    }

    /// Subscribe to transport-level connectivity changes observed by
    /// the connectors underlying this pool.
    pub fn connectivity_change_notifier(&self) -> watch::Receiver<u64> {
        self.connectors.connectivity_change_notifier()
    }
}
798
799/// Inner part of [`ConnectionState`] preserving state between attempts to
800/// initialize [`ConnectionState::connection`]
801#[derive(Debug)]
802struct ConnectionStateInner {
803    fresh: bool,
804    backoff: FibonacciBackoff,
805}
806
807#[derive(Debug)]
808pub struct ConnectionState<T: ?Sized> {
809    /// Connection we are trying to or already established
810    pub connection: tokio::sync::OnceCell<Arc<T>>,
811
812    /// When tasks attempt to connect at the same time,
813    /// this is the receiving end of the channel where
814    /// the "leader" sends a result.
815    merge_connection_attempts_chan:
816        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,
817
818    /// State that technically is protected every time by
819    /// the serialization of `OnceCell::get_or_try_init`, but
820    /// for Rust purposes needs to be locked.
821    inner: std::sync::Mutex<ConnectionStateInner>,
822}
823
824impl<T: ?Sized> ConnectionState<T> {
825    /// Create a new connection state for a first time connection
826    pub fn new_initial() -> Self {
827        Self {
828            connection: OnceCell::new(),
829            inner: std::sync::Mutex::new(ConnectionStateInner {
830                fresh: true,
831                backoff: custom_backoff(
832                    // First time connections start quick
833                    Duration::from_millis(5),
834                    Duration::from_secs(30),
835                    None,
836                ),
837            }),
838            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
839        }
840    }
841
842    /// Create a new connection state for a connection that already failed, and
843    /// is being reset
844    pub fn new_reconnecting() -> Self {
845        Self {
846            connection: OnceCell::new(),
847            inner: std::sync::Mutex::new(ConnectionStateInner {
848                // set the attempts to 1, indicating that
849                fresh: false,
850                backoff: custom_backoff(
851                    // Connections after a disconnect start with some minimum delay
852                    Duration::from_millis(500),
853                    Duration::from_secs(30),
854                    None,
855                ),
856            }),
857            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
858        }
859    }
860
861    /// Record the fact that an attempt to connect is being made, and return
862    /// time the caller should wait.
863    pub fn pre_reconnect_delay(&self) -> Duration {
864        let mut backoff_locked = self.inner.lock().expect("Locking failed");
865        let fresh = backoff_locked.fresh;
866
867        backoff_locked.fresh = false;
868
869        if fresh {
870            Duration::default()
871        } else {
872            backoff_locked.backoff.next().expect("Keeps retrying")
873        }
874    }
875}