fedimint_connectors/
lib.rs

1pub mod error;
2pub mod http;
3pub mod iroh;
4pub mod metrics;
5#[cfg(all(feature = "tor", not(target_family = "wasm")))]
6pub mod tor;
7pub mod ws;
8
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use std::fmt::{self, Debug};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::time::Duration;
14
15use anyhow::{anyhow, bail};
16use async_trait::async_trait;
17use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
18use fedimint_core::module::{ApiMethod, ApiRequestErased};
19use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
20use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
21use fedimint_core::{apply, async_trait_maybe_send};
22use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
23use fedimint_metrics::HistogramExt as _;
24use reqwest::Method;
25use serde_json::Value;
26use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
27use tracing::trace;
28
29use crate::error::ServerError;
30use crate::metrics::{CONNECTION_ATTEMPTS_TOTAL, CONNECTION_DURATION_SECONDS};
31use crate::ws::WebsocketConnector;
32
33pub type ServerResult<T> = Result<T, ServerError>;
34
35/// Type for connector initialization functions
36type ConnectorInitFn = Arc<
37    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
38>;
39
40/// Builder for [`ConnectorRegistry`]
41///
42/// See [`ConnectorRegistry::build_from_client_env`] and similar
43/// to create.
44#[derive(Debug, Clone)]
45#[allow(clippy::struct_excessive_bools)] // Shut up, Clippy
46pub struct ConnectorRegistryBuilder {
47    /// List of overrides to use when attempting to connect to given url
48    ///
49    /// This is useful for testing, or forcing non-default network
50    /// connectivity.
51    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
52
53    /// Enable Iroh endpoints at all?
54    iroh_enable: bool,
55    /// Override the Iroh DNS server to use
56    iroh_dns: Option<SafeUrl>,
57    /// Should start the "next/unstable" Iroh stack
58    iroh_next: bool,
59    /// Enable Pkarr DHT discovery
60    iroh_pkarr_dht: bool,
61
62    /// Enable Websocket API handling at all?
63    ws_enable: bool,
64    ws_force_tor: bool,
65
66    // Enable HTTP
67    http_enable: bool,
68}
69
70impl ConnectorRegistryBuilder {
71    #[allow(clippy::unused_async)] // Leave room for async in the future
72    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
73        // Create initialization functions for each connector type
74        let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
75            BTreeMap::new();
76
77        // WS connector init function
78        let builder_ws = self.clone();
79        let ws_connector_init = Arc::new(move || {
80            let builder = builder_ws.clone();
81            Box::pin(async move { builder.build_ws_connector().await })
82                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
83        });
84        connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
85        connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
86
87        // Iroh connector init function
88        let builder_iroh = self.clone();
89        connectors_lazy.insert(
90            "iroh".into(),
91            (
92                Arc::new(move || {
93                    let builder = builder_iroh.clone();
94                    Box::pin(async move { builder.build_iroh_connector().await })
95                        as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
96                }),
97                OnceCell::new(),
98            ),
99        );
100
101        let builder_http = self.clone();
102        let http_connector_init = Arc::new(move || {
103            let builder = builder_http.clone();
104            Box::pin(async move { builder.build_http_connector() })
105                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
106        });
107
108        connectors_lazy.insert(
109            "http".into(),
110            (http_connector_init.clone(), OnceCell::new()),
111        );
112        connectors_lazy.insert(
113            "https".into(),
114            (http_connector_init.clone(), OnceCell::new()),
115        );
116
117        Ok(ConnectorRegistry {
118            inner: ConnectorRegistryInner {
119                connectors_lazy,
120                connection_overrides: self.connection_overrides,
121                initialized: SetOnce::new(),
122            }
123            .into(),
124        })
125    }
126
127    pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
128        if !self.iroh_enable {
129            bail!("Iroh connector not enabled");
130        }
131        Ok(Arc::new(
132            iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
133                .await?,
134        ) as DynConnector)
135    }
136
137    pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
138        if !self.ws_enable {
139            bail!("Websocket connector not enabled");
140        }
141
142        match self.ws_force_tor {
143            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
144            true => {
145                use crate::tor::TorConnector;
146
147                Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
148            }
149
150            false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
151            #[allow(unreachable_patterns)]
152            _ => bail!("Tor requested, but not support not compiled in"),
153        }
154    }
155
156    pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
157        if !self.http_enable {
158            bail!("Http connector not enabled");
159        }
160
161        Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
162    }
163
164    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
165        Self {
166            iroh_pkarr_dht: enable,
167            ..self
168        }
169    }
170
171    pub fn iroh_next(self, enable: bool) -> Self {
172        Self {
173            iroh_next: enable,
174            ..self
175        }
176    }
177
178    pub fn ws_force_tor(self, enable: bool) -> Self {
179        Self {
180            ws_force_tor: enable,
181            ..self
182        }
183    }
184
185    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
186        Self {
187            iroh_dns: Some(url),
188            ..self
189        }
190    }
191
192    /// Apply overrides from env variables
193    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
194        // TODO: read rest of the env
195        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
196            self = self.with_connection_override(k, v);
197        }
198
199        Ok(Self { ..self })
200    }
201
202    pub fn with_connection_override(
203        mut self,
204        original_url: SafeUrl,
205        replacement_url: SafeUrl,
206    ) -> Self {
207        self.connection_overrides
208            .insert(original_url, replacement_url);
209        self
210    }
211}
212
213/// Actual data shared between copies of [`ConnectorRegistry`] handle
214struct ConnectorRegistryInner {
215    /// Lazily initialized [`Connector`]s per protocol supported
216    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
217    /// Connection URL overrides for testing/custom routing
218    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
219    /// Set on first connection attempt
220    ///
221    /// This is used for functionality that wants to avoid making
222    /// network connections if nothing else did network request.
223    initialized: tokio::sync::SetOnce<()>,
224}
225
226/// A set of available connectivity protocols a client can use to make
227/// network API requests (typically to federation).
228///
229/// Maps from connection URL schema to [`Connector`] to use to connect to it.
230///
231/// See [`ConnectorRegistry::build_from_client_env`] and similar
232/// to create.
233///
234/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
235/// mixed-networking stack connection.
236///
237/// Responsibilities:
238#[derive(Clone)]
239pub struct ConnectorRegistry {
240    inner: Arc<ConnectorRegistryInner>,
241}
242
243impl fmt::Debug for ConnectorRegistry {
244    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
245        f.debug_struct("ConnectorRegistry")
246            .field("connectors_lazy", &self.inner.connectors_lazy.len())
247            .field("connection_overrides", &self.inner.connection_overrides)
248            .finish()
249    }
250}
251
252impl ConnectorRegistry {
253    /// Create a builder with recommended defaults intended for client-side
254    /// usage
255    ///
256    /// In particular mobile devices are considered.
257    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
258        ConnectorRegistryBuilder {
259            iroh_enable: true,
260            iroh_dns: None,
261            iroh_pkarr_dht: false,
262            iroh_next: true,
263            ws_enable: true,
264            ws_force_tor: false,
265            http_enable: true,
266
267            connection_overrides: BTreeMap::default(),
268        }
269    }
270
271    /// Create a builder with recommended defaults intended for the server-side
272    /// usage
273    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
274        ConnectorRegistryBuilder {
275            iroh_enable: true,
276            iroh_dns: None,
277            iroh_pkarr_dht: true,
278            iroh_next: true,
279            ws_enable: true,
280            ws_force_tor: false,
281            http_enable: false,
282
283            connection_overrides: BTreeMap::default(),
284        }
285    }
286
287    /// Create a builder with recommended defaults intended for testing
288    /// usage
289    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
290        ConnectorRegistryBuilder {
291            iroh_enable: true,
292            iroh_dns: None,
293            iroh_pkarr_dht: false,
294            iroh_next: false,
295            ws_enable: true,
296            ws_force_tor: false,
297            http_enable: true,
298
299            connection_overrides: BTreeMap::default(),
300        }
301    }
302
303    /// Like [`Self::build_from_client_defaults`] build will apply
304    /// environment-provided overrides.
305    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
306        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
307        Ok(builder)
308    }
309
310    /// Like [`Self::build_from_server_defaults`] build will apply
311    /// environment-provided overrides.
312    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
313        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
314        Ok(builder)
315    }
316
317    /// Like [`Self::build_from_testing_defaults`] build will apply
318    /// environment-provided overrides.
319    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
320        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
321        Ok(builder)
322    }
323
324    /// Wait until some connections have been made
325    pub async fn wait_for_initialized_connections(&self) {
326        self.inner.initialized.wait().await;
327    }
328
329    /// Connect to a given `url` using matching [`Connector`]
330    ///
331    /// This is the main function consumed by the downstream use for making
332    /// connection.
333    pub async fn connect_guardian(
334        &self,
335        url: &SafeUrl,
336        api_secret: Option<&str>,
337    ) -> ServerResult<DynGuaridianConnection> {
338        trace!(
339            target: LOG_NET,
340            %url,
341            "Connection requested to guardian"
342        );
343        let _ = self.inner.initialized.set(());
344
345        let url = match self.inner.connection_overrides.get(url) {
346            Some(replacement) => {
347                trace!(
348                    target: LOG_NET,
349                    original_url = %url,
350                    replacement_url = %replacement,
351                    "Using a connectivity override for connection"
352                );
353
354                replacement
355            }
356            None => url,
357        };
358
359        let scheme = url.scheme().to_string();
360
361        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
362            return Err(ServerError::InvalidEndpoint(anyhow!(
363                "Unsupported scheme: {}; missing endpoint handler",
364                url.scheme()
365            )));
366        };
367
368        // Clone the init function to use in the async block
369        let init_fn = connector_lazy.0.clone();
370
371        let timer = CONNECTION_DURATION_SECONDS
372            .with_label_values(&[&scheme])
373            .start_timer_ext();
374
375        let result = connector_lazy
376            .1
377            .get_or_try_init(|| async move { init_fn().await })
378            .await
379            .map_err(|e| {
380                ServerError::Transport(anyhow!(
381                    "Connector failed to initialize: {}",
382                    e.fmt_compact_anyhow()
383                ))
384            })?
385            .connect_guardian(url, api_secret)
386            .await;
387
388        timer.observe_duration();
389
390        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
391        CONNECTION_ATTEMPTS_TOTAL
392            .with_label_values(&[&scheme, &result_label])
393            .inc();
394
395        let conn = result.inspect_err(|err| {
396            trace!(
397                target: LOG_NET,
398                %url,
399                err = %err.fmt_compact(),
400                "Connection failed"
401            );
402        })?;
403
404        trace!(
405            target: LOG_NET,
406            %url,
407            "Connection returned"
408        );
409        Ok(conn)
410    }
411
412    /// Connect to a given `url` using matching [`Connector`] to a gateway
413    ///
414    /// This is the main function consumed by the downstream use for making
415    /// connection.
416    pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
417        trace!(
418            target: LOG_NET,
419            %url,
420            "Connection requested to gateway"
421        );
422        let _ = self.inner.initialized.set(());
423
424        let url = match self.inner.connection_overrides.get(url) {
425            Some(replacement) => {
426                trace!(
427                    target: LOG_NET,
428                    original_url = %url,
429                    replacement_url = %replacement,
430                    "Using a connectivity override for connection"
431                );
432
433                replacement
434            }
435            None => url,
436        };
437
438        let scheme = url.scheme().to_string();
439
440        let Some(connector_lazy) = self.inner.connectors_lazy.get(&scheme) else {
441            return Err(anyhow!(
442                "Unsupported scheme: {}; missing endpoint handler",
443                url.scheme()
444            ));
445        };
446
447        // Clone the init function to use in the async block
448        let init_fn = connector_lazy.0.clone();
449
450        let timer = CONNECTION_DURATION_SECONDS
451            .with_label_values(&[&scheme])
452            .start_timer_ext();
453
454        let result = connector_lazy
455            .1
456            .get_or_try_init(|| async move { init_fn().await })
457            .await
458            .map_err(|e| {
459                ServerError::Transport(anyhow!(
460                    "Connector failed to initialize: {}",
461                    e.fmt_compact_anyhow()
462                ))
463            })?
464            .connect_gateway(url)
465            .await;
466
467        timer.observe_duration();
468
469        let result_label = if result.is_ok() { "success" } else { "error" }.to_string();
470        CONNECTION_ATTEMPTS_TOTAL
471            .with_label_values(&[&scheme, &result_label])
472            .inc();
473
474        result
475    }
476}
477pub type DynConnector = Arc<dyn Connector>;
478
479#[async_trait]
480pub trait Connector: Send + Sync + 'static + Debug {
481    async fn connect_guardian(
482        &self,
483        url: &SafeUrl,
484        api_secret: Option<&str>,
485    ) -> ServerResult<DynGuaridianConnection>;
486
487    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
488}
489
490/// Generic connection trait shared between [`IGuardianConnection`] and
491/// [`IGatewayConnection`]
492#[apply(async_trait_maybe_send!)]
493pub trait IConnection: Debug + Send + Sync + 'static {
494    fn is_connected(&self) -> bool;
495
496    async fn await_disconnection(&self);
497}
498
499/// A connection from api client to a federation guardian (type erased)
500pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
501
502/// A connection from api client to a federation guardian
503#[async_trait]
504pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
505    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
506
507    fn into_dyn(self) -> DynGuaridianConnection
508    where
509        Self: Sized,
510    {
511        Arc::new(self)
512    }
513}
514
515/// A connection from api client to a gateway (type erased)
516pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
517
518/// A connection from a client to a gateway
519#[apply(async_trait_maybe_send!)]
520pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
521    async fn request(
522        &self,
523        password: Option<String>,
524        method: Method,
525        route: &str,
526        payload: Option<Value>,
527    ) -> ServerResult<Value>;
528
529    fn into_dyn(self) -> DynGatewayConnection
530    where
531        Self: Sized,
532    {
533        Arc::new(self)
534    }
535}
536
537#[derive(Debug)]
538pub struct ConnectionPool<T: IConnection + ?Sized> {
539    /// Available connectors which we can make connections
540    connectors: ConnectorRegistry,
541
542    active_connections: watch::Sender<BTreeSet<SafeUrl>>,
543
544    /// Connection pool
545    ///
546    /// Every entry in this map will be created on demand and correspond to a
547    /// single outgoing connection to a certain URL that is in the process
548    /// of being established, or we already established.
549    #[allow(clippy::type_complexity)]
550    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
551}
552
553impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
554    fn clone(&self) -> Self {
555        Self {
556            connectors: self.connectors.clone(),
557            connections: self.connections.clone(),
558            active_connections: self.active_connections.clone(),
559        }
560    }
561}
562
563impl<T: IConnection + ?Sized> ConnectionPool<T> {
564    pub fn new(connectors: ConnectorRegistry) -> Self {
565        Self {
566            connectors,
567            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
568            active_connections: watch::channel(BTreeSet::new()).0,
569        }
570    }
571
572    async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
573        let mut pool_locked = self.connections.lock().await;
574        pool_locked
575            .entry(url.to_owned())
576            .and_modify(|entry_arc| {
577                // Check if existing connection is disconnected and reset the whole entry.
578                //
579                // This resets the state (like connectivity backoff), which is what we want.
580                // Since the (`OnceCell`) was already initialized, it means connection was
581                // successfully before, and disconnected afterwards.
582                if let Some(existing_conn) = entry_arc.connection.get()
583                    && !existing_conn.is_connected()
584                {
585                    trace!(
586                        target: LOG_CLIENT_NET_API,
587                        %url,
588                        "Existing connection is disconnected, removing from pool"
589                    );
590                    self.active_connections.send_modify(|v| {
591                        v.remove(url);
592                    });
593                    *entry_arc = Arc::new(ConnectionState::new_reconnecting());
594                }
595            })
596            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
597            .clone()
598    }
599
600    pub async fn get_or_create_connection<F, Fut>(
601        &self,
602        url: &SafeUrl,
603        api_secret: Option<&str>,
604        create_connection: F,
605    ) -> ServerResult<Arc<T>>
606    where
607        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
608        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
609    {
610        let pool_entry_arc = self.get_or_init_pool_entry(url).await;
611
612        let leader_tx = loop {
613            let mut leader_rx = {
614                let mut chan_locked = pool_entry_arc
615                    .merge_connection_attempts_chan
616                    .lock()
617                    .expect("locking error");
618
619                if chan_locked.is_closed() {
620                    let (leader_tx, leader_rx) = broadcast::channel(1);
621                    *chan_locked = leader_rx;
622                    // whoever was trying to connect last time is gone
623                    // we're out of this lame loop for followers
624                    break leader_tx;
625                }
626
627                // lets piggyback on the existing leader
628                chan_locked.resubscribe()
629            };
630
631            if let Ok(res) = leader_rx.recv().await {
632                match res {
633                    Ok(o) => return Ok(o),
634                    Err(err) => {
635                        return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
636                    }
637                }
638            }
639        };
640
641        let conn = pool_entry_arc
642            .connection
643            .get_or_try_init(|| async {
644                let retry_delay = pool_entry_arc.pre_reconnect_delay();
645                fedimint_core::runtime::sleep(retry_delay).await;
646
647                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
648                let res = create_connection(
649                    url.clone(),
650                    api_secret.map(std::string::ToString::to_string),
651                    self.connectors.clone(),
652                )
653                .await;
654
655                // If any other task was also waiting to connect, send them the connection
656                // result.
657                //
658                // Note: we want to send both Ok or Err, so `res?` is used only afterwards.
659                let _ = leader_tx.send(
660                    res.as_ref()
661                        .map(|o| o.clone())
662                        .map_err(|err| err.to_string()),
663                );
664
665                let conn = res?;
666
667                self.active_connections.send_modify(|v| {
668                    v.insert(url.clone());
669                });
670
671                fedimint_core::runtime::spawn("connection disconnect watch", {
672                    let conn = conn.clone();
673                    let s = self.clone();
674                    let url = url.clone();
675                    async move {
676                        // wait for this connection to disconnect
677                        conn.await_disconnection().await;
678                        // And afterwards, update `active_connections`.
679                        //
680                        // This will update the `active_connections` just like calling
681                        // `get_or_create_connection` normally do, but we will
682                        // not attempt to do anything with the result (i.e. try to connect).
683                        s.get_or_init_pool_entry(&url).await;
684                    }
685                });
686
687                Ok(conn)
688            })
689            .await?;
690
691        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
692        Ok(conn.clone())
693    }
694    /// Get receiver for changes in the active connections
695    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
696        self.active_connections.subscribe()
697    }
698
699    pub async fn wait_for_initialized_connections(&self) {
700        self.connectors.wait_for_initialized_connections().await
701    }
702}
703
704/// Inner part of [`ConnectionState`] preserving state between attempts to
705/// initialize [`ConnectionState::connection`]
706#[derive(Debug)]
707struct ConnectionStateInner {
708    fresh: bool,
709    backoff: FibonacciBackoff,
710}
711
712#[derive(Debug)]
713pub struct ConnectionState<T: ?Sized> {
714    /// Connection we are trying to or already established
715    pub connection: tokio::sync::OnceCell<Arc<T>>,
716
717    /// When tasks attempt to connect at the same time,
718    /// this is the receiving end of the channel where
719    /// the "leader" sends a result.
720    merge_connection_attempts_chan:
721        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,
722
723    /// State that technically is protected every time by
724    /// the serialization of `OnceCell::get_or_try_init`, but
725    /// for Rust purposes needs to be locked.
726    inner: std::sync::Mutex<ConnectionStateInner>,
727}
728
729impl<T: ?Sized> ConnectionState<T> {
730    /// Create a new connection state for a first time connection
731    pub fn new_initial() -> Self {
732        Self {
733            connection: OnceCell::new(),
734            inner: std::sync::Mutex::new(ConnectionStateInner {
735                fresh: true,
736                backoff: custom_backoff(
737                    // First time connections start quick
738                    Duration::from_millis(5),
739                    Duration::from_secs(30),
740                    None,
741                ),
742            }),
743            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
744        }
745    }
746
747    /// Create a new connection state for a connection that already failed, and
748    /// is being reset
749    pub fn new_reconnecting() -> Self {
750        Self {
751            connection: OnceCell::new(),
752            inner: std::sync::Mutex::new(ConnectionStateInner {
753                // set the attempts to 1, indicating that
754                fresh: false,
755                backoff: custom_backoff(
756                    // Connections after a disconnect start with some minimum delay
757                    Duration::from_millis(500),
758                    Duration::from_secs(30),
759                    None,
760                ),
761            }),
762            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
763        }
764    }
765
766    /// Record the fact that an attempt to connect is being made, and return
767    /// time the caller should wait.
768    pub fn pre_reconnect_delay(&self) -> Duration {
769        let mut backoff_locked = self.inner.lock().expect("Locking failed");
770        let fresh = backoff_locked.fresh;
771
772        backoff_locked.fresh = false;
773
774        if fresh {
775            Duration::default()
776        } else {
777            backoff_locked.backoff.next().expect("Keeps retrying")
778        }
779    }
780}