fedimint_connectors/
lib.rs

1pub mod error;
2pub mod http;
3pub mod iroh;
4#[cfg(all(feature = "tor", not(target_family = "wasm")))]
5pub mod tor;
6pub mod ws;
7
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use std::fmt::{self, Debug};
10use std::pin::Pin;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::{anyhow, bail};
15use async_trait::async_trait;
16use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
17use fedimint_core::module::{ApiMethod, ApiRequestErased};
18use fedimint_core::util::backoff_util::{FibonacciBackoff, custom_backoff};
19use fedimint_core::util::{FmtCompact, FmtCompactAnyhow, SafeUrl};
20use fedimint_core::{apply, async_trait_maybe_send};
21use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET};
22use reqwest::Method;
23use serde_json::Value;
24use tokio::sync::{OnceCell, SetOnce, broadcast, watch};
25use tracing::trace;
26
27use crate::error::ServerError;
28use crate::ws::WebsocketConnector;
29
30pub type ServerResult<T> = Result<T, ServerError>;
31
32/// Type for connector initialization functions
33type ConnectorInitFn = Arc<
34    dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>> + Send + Sync,
35>;
36
37/// Builder for [`ConnectorRegistry`]
38///
39/// See [`ConnectorRegistry::build_from_client_env`] and similar
40/// to create.
41#[derive(Debug, Clone)]
42#[allow(clippy::struct_excessive_bools)] // Shut up, Clippy
43pub struct ConnectorRegistryBuilder {
44    /// List of overrides to use when attempting to connect to given url
45    ///
46    /// This is useful for testing, or forcing non-default network
47    /// connectivity.
48    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
49
50    /// Enable Iroh endpoints at all?
51    iroh_enable: bool,
52    /// Override the Iroh DNS server to use
53    iroh_dns: Option<SafeUrl>,
54    /// Should start the "next/unstable" Iroh stack
55    iroh_next: bool,
56    /// Enable Pkarr DHT discovery
57    iroh_pkarr_dht: bool,
58
59    /// Enable Websocket API handling at all?
60    ws_enable: bool,
61    ws_force_tor: bool,
62
63    // Enable HTTP
64    http_enable: bool,
65}
66
67impl ConnectorRegistryBuilder {
68    #[allow(clippy::unused_async)] // Leave room for async in the future
69    pub async fn bind(self) -> anyhow::Result<ConnectorRegistry> {
70        // Create initialization functions for each connector type
71        let mut connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)> =
72            BTreeMap::new();
73
74        // WS connector init function
75        let builder_ws = self.clone();
76        let ws_connector_init = Arc::new(move || {
77            let builder = builder_ws.clone();
78            Box::pin(async move { builder.build_ws_connector().await })
79                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
80        });
81        connectors_lazy.insert("ws".into(), (ws_connector_init.clone(), OnceCell::new()));
82        connectors_lazy.insert("wss".into(), (ws_connector_init.clone(), OnceCell::new()));
83
84        // Iroh connector init function
85        let builder_iroh = self.clone();
86        connectors_lazy.insert(
87            "iroh".into(),
88            (
89                Arc::new(move || {
90                    let builder = builder_iroh.clone();
91                    Box::pin(async move { builder.build_iroh_connector().await })
92                        as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
93                }),
94                OnceCell::new(),
95            ),
96        );
97
98        let builder_http = self.clone();
99        let http_connector_init = Arc::new(move || {
100            let builder = builder_http.clone();
101            Box::pin(async move { builder.build_http_connector() })
102                as Pin<Box<dyn Future<Output = anyhow::Result<DynConnector>> + Send>>
103        });
104
105        connectors_lazy.insert(
106            "http".into(),
107            (http_connector_init.clone(), OnceCell::new()),
108        );
109        connectors_lazy.insert(
110            "https".into(),
111            (http_connector_init.clone(), OnceCell::new()),
112        );
113
114        Ok(ConnectorRegistry {
115            inner: ConnectorRegistryInner {
116                connectors_lazy,
117                connection_overrides: self.connection_overrides,
118                initialized: SetOnce::new(),
119            }
120            .into(),
121        })
122    }
123
124    pub async fn build_iroh_connector(&self) -> anyhow::Result<DynConnector> {
125        if !self.iroh_enable {
126            bail!("Iroh connector not enabled");
127        }
128        Ok(Arc::new(
129            iroh::IrohConnector::new(self.iroh_dns.clone(), self.iroh_pkarr_dht, self.iroh_next)
130                .await?,
131        ) as DynConnector)
132    }
133
134    pub async fn build_ws_connector(&self) -> anyhow::Result<DynConnector> {
135        if !self.ws_enable {
136            bail!("Websocket connector not enabled");
137        }
138
139        match self.ws_force_tor {
140            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
141            true => {
142                use crate::tor::TorConnector;
143
144                Ok(Arc::new(TorConnector::bootstrap().await?) as DynConnector)
145            }
146
147            false => Ok(Arc::new(WebsocketConnector::new()) as DynConnector),
148            #[allow(unreachable_patterns)]
149            _ => bail!("Tor requested, but not support not compiled in"),
150        }
151    }
152
153    pub fn build_http_connector(&self) -> anyhow::Result<DynConnector> {
154        if !self.http_enable {
155            bail!("Http connector not enabled");
156        }
157
158        Ok(Arc::new(crate::http::HttpConnector::default()) as DynConnector)
159    }
160
161    pub fn iroh_pkarr_dht(self, enable: bool) -> Self {
162        Self {
163            iroh_pkarr_dht: enable,
164            ..self
165        }
166    }
167
168    pub fn iroh_next(self, enable: bool) -> Self {
169        Self {
170            iroh_next: enable,
171            ..self
172        }
173    }
174
175    pub fn ws_force_tor(self, enable: bool) -> Self {
176        Self {
177            ws_force_tor: enable,
178            ..self
179        }
180    }
181
182    pub fn set_iroh_dns(self, url: SafeUrl) -> Self {
183        Self {
184            iroh_dns: Some(url),
185            ..self
186        }
187    }
188
189    /// Apply overrides from env variables
190    pub fn with_env_var_overrides(mut self) -> anyhow::Result<Self> {
191        // TODO: read rest of the env
192        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
193            self = self.with_connection_override(k, v);
194        }
195
196        Ok(Self { ..self })
197    }
198
199    pub fn with_connection_override(
200        mut self,
201        original_url: SafeUrl,
202        replacement_url: SafeUrl,
203    ) -> Self {
204        self.connection_overrides
205            .insert(original_url, replacement_url);
206        self
207    }
208}
209
210/// Actual data shared between copies of [`ConnectorRegistry`] handle
211struct ConnectorRegistryInner {
212    /// Lazily initialized [`Connector`]s per protocol supported
213    connectors_lazy: BTreeMap<String, (ConnectorInitFn, OnceCell<DynConnector>)>,
214    /// Connection URL overrides for testing/custom routing
215    connection_overrides: BTreeMap<SafeUrl, SafeUrl>,
216    /// Set on first connection attempt
217    ///
218    /// This is used for functionality that wants to avoid making
219    /// network connections if nothing else did network request.
220    initialized: tokio::sync::SetOnce<()>,
221}
222
223/// A set of available connectivity protocols a client can use to make
224/// network API requests (typically to federation).
225///
226/// Maps from connection URL schema to [`Connector`] to use to connect to it.
227///
228/// See [`ConnectorRegistry::build_from_client_env`] and similar
229/// to create.
230///
231/// [`ConnectorRegistry::connect_guardian`] is the main entry point for making
232/// mixed-networking stack connection.
233///
234/// Responsibilities:
235#[derive(Clone)]
236pub struct ConnectorRegistry {
237    inner: Arc<ConnectorRegistryInner>,
238}
239
240impl fmt::Debug for ConnectorRegistry {
241    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242        f.debug_struct("ConnectorRegistry")
243            .field("connectors_lazy", &self.inner.connectors_lazy.len())
244            .field("connection_overrides", &self.inner.connection_overrides)
245            .finish()
246    }
247}
248
249impl ConnectorRegistry {
250    /// Create a builder with recommended defaults intended for client-side
251    /// usage
252    ///
253    /// In particular mobile devices are considered.
254    pub fn build_from_client_defaults() -> ConnectorRegistryBuilder {
255        ConnectorRegistryBuilder {
256            iroh_enable: true,
257            iroh_dns: None,
258            iroh_pkarr_dht: false,
259            iroh_next: true,
260            ws_enable: true,
261            ws_force_tor: false,
262            http_enable: true,
263
264            connection_overrides: BTreeMap::default(),
265        }
266    }
267
268    /// Create a builder with recommended defaults intended for the server-side
269    /// usage
270    pub fn build_from_server_defaults() -> ConnectorRegistryBuilder {
271        ConnectorRegistryBuilder {
272            iroh_enable: true,
273            iroh_dns: None,
274            iroh_pkarr_dht: true,
275            iroh_next: true,
276            ws_enable: true,
277            ws_force_tor: false,
278            http_enable: false,
279
280            connection_overrides: BTreeMap::default(),
281        }
282    }
283
284    /// Create a builder with recommended defaults intended for testing
285    /// usage
286    pub fn build_from_testing_defaults() -> ConnectorRegistryBuilder {
287        ConnectorRegistryBuilder {
288            iroh_enable: true,
289            iroh_dns: None,
290            iroh_pkarr_dht: false,
291            iroh_next: false,
292            ws_enable: true,
293            ws_force_tor: false,
294            http_enable: true,
295
296            connection_overrides: BTreeMap::default(),
297        }
298    }
299
300    /// Like [`Self::build_from_client_defaults`] build will apply
301    /// environment-provided overrides.
302    pub fn build_from_client_env() -> anyhow::Result<ConnectorRegistryBuilder> {
303        let builder = Self::build_from_client_defaults().with_env_var_overrides()?;
304        Ok(builder)
305    }
306
307    /// Like [`Self::build_from_server_defaults`] build will apply
308    /// environment-provided overrides.
309    pub fn build_from_server_env() -> anyhow::Result<ConnectorRegistryBuilder> {
310        let builder = Self::build_from_server_defaults().with_env_var_overrides()?;
311        Ok(builder)
312    }
313
314    /// Like [`Self::build_from_testing_defaults`] build will apply
315    /// environment-provided overrides.
316    pub fn build_from_testing_env() -> anyhow::Result<ConnectorRegistryBuilder> {
317        let builder = Self::build_from_testing_defaults().with_env_var_overrides()?;
318        Ok(builder)
319    }
320
321    /// Wait until some connections have been made
322    pub async fn wait_for_initialized_connections(&self) {
323        self.inner.initialized.wait().await;
324    }
325
326    /// Connect to a given `url` using matching [`Connector`]
327    ///
328    /// This is the main function consumed by the downstream use for making
329    /// connection.
330    pub async fn connect_guardian(
331        &self,
332        url: &SafeUrl,
333        api_secret: Option<&str>,
334    ) -> ServerResult<DynGuaridianConnection> {
335        trace!(
336            target: LOG_NET,
337            %url,
338            "Connection requested to guardian"
339        );
340        let _ = self.inner.initialized.set(());
341
342        let url = match self.inner.connection_overrides.get(url) {
343            Some(replacement) => {
344                trace!(
345                    target: LOG_NET,
346                    original_url = %url,
347                    replacement_url = %replacement,
348                    "Using a connectivity override for connection"
349                );
350
351                replacement
352            }
353            None => url,
354        };
355
356        let connector_key = url.scheme();
357
358        let Some(connector_lazy) = self.inner.connectors_lazy.get(connector_key) else {
359            return Err(ServerError::InvalidEndpoint(anyhow!(
360                "Unsupported scheme: {}; missing endpoint handler",
361                url.scheme()
362            )));
363        };
364
365        // Clone the init function to use in the async block
366        let init_fn = connector_lazy.0.clone();
367
368        let conn = connector_lazy
369            .1
370            .get_or_try_init(|| async move { init_fn().await })
371            .await
372            .map_err(|e| {
373                ServerError::Transport(anyhow!(
374                    "Connector failed to initialize: {}",
375                    e.fmt_compact_anyhow()
376                ))
377            })?
378            .connect_guardian(url, api_secret)
379            .await
380            .inspect_err(|err| {
381                trace!(
382                    target: LOG_NET,
383                    %url,
384                    err = %err.fmt_compact(),
385                    "Connection failed"
386                );
387            })?;
388
389        trace!(
390            target: LOG_NET,
391            %url,
392            "Connection returned"
393        );
394        Ok(conn)
395    }
396
397    /// Connect to a given `url` using matching [`Connector`] to a gateway
398    ///
399    /// This is the main function consumed by the downstream use for making
400    /// connection.
401    pub async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
402        trace!(
403            target: LOG_NET,
404            %url,
405            "Connection requested to gateway"
406        );
407        let _ = self.inner.initialized.set(());
408
409        let url = match self.inner.connection_overrides.get(url) {
410            Some(replacement) => {
411                trace!(
412                    target: LOG_NET,
413                    original_url = %url,
414                    replacement_url = %replacement,
415                    "Using a connectivity override for connection"
416                );
417
418                replacement
419            }
420            None => url,
421        };
422
423        let connector_key = url.scheme();
424
425        let Some(connector_lazy) = self.inner.connectors_lazy.get(connector_key) else {
426            return Err(anyhow!(
427                "Unsupported scheme: {}; missing endpoint handler",
428                url.scheme()
429            ));
430        };
431
432        // Clone the init function to use in the async block
433        let init_fn = connector_lazy.0.clone();
434
435        let conn = connector_lazy
436            .1
437            .get_or_try_init(|| async move { init_fn().await })
438            .await
439            .map_err(|e| {
440                ServerError::Transport(anyhow!(
441                    "Connector failed to initialize: {}",
442                    e.fmt_compact_anyhow()
443                ))
444            })?
445            .connect_gateway(url)
446            .await?;
447
448        Ok(conn)
449    }
450}
451pub type DynConnector = Arc<dyn Connector>;
452
453#[async_trait]
454pub trait Connector: Send + Sync + 'static + Debug {
455    async fn connect_guardian(
456        &self,
457        url: &SafeUrl,
458        api_secret: Option<&str>,
459    ) -> ServerResult<DynGuaridianConnection>;
460
461    async fn connect_gateway(&self, url: &SafeUrl) -> anyhow::Result<DynGatewayConnection>;
462}
463
464/// Generic connection trait shared between [`IGuardianConnection`] and
465/// [`IGatewayConnection`]
466#[apply(async_trait_maybe_send!)]
467pub trait IConnection: Debug + Send + Sync + 'static {
468    fn is_connected(&self) -> bool;
469
470    async fn await_disconnection(&self);
471}
472
473/// A connection from api client to a federation guardian (type erased)
474pub type DynGuaridianConnection = Arc<dyn IGuardianConnection>;
475
476/// A connection from api client to a federation guardian
477#[async_trait]
478pub trait IGuardianConnection: IConnection + Debug + Send + Sync + 'static {
479    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value>;
480
481    fn into_dyn(self) -> DynGuaridianConnection
482    where
483        Self: Sized,
484    {
485        Arc::new(self)
486    }
487}
488
489/// A connection from api client to a gateway (type erased)
490pub type DynGatewayConnection = Arc<dyn IGatewayConnection>;
491
492/// A connection from a client to a gateway
493#[apply(async_trait_maybe_send!)]
494pub trait IGatewayConnection: IConnection + Debug + Send + Sync + 'static {
495    async fn request(
496        &self,
497        password: Option<String>,
498        method: Method,
499        route: &str,
500        payload: Option<Value>,
501    ) -> ServerResult<Value>;
502
503    fn into_dyn(self) -> DynGatewayConnection
504    where
505        Self: Sized,
506    {
507        Arc::new(self)
508    }
509}
510
511#[derive(Debug)]
512pub struct ConnectionPool<T: IConnection + ?Sized> {
513    /// Available connectors which we can make connections
514    connectors: ConnectorRegistry,
515
516    active_connections: watch::Sender<BTreeSet<SafeUrl>>,
517
518    /// Connection pool
519    ///
520    /// Every entry in this map will be created on demand and correspond to a
521    /// single outgoing connection to a certain URL that is in the process
522    /// of being established, or we already established.
523    #[allow(clippy::type_complexity)]
524    connections: Arc<tokio::sync::Mutex<HashMap<SafeUrl, Arc<ConnectionState<T>>>>>,
525}
526
527impl<T: IConnection + ?Sized> Clone for ConnectionPool<T> {
528    fn clone(&self) -> Self {
529        Self {
530            connectors: self.connectors.clone(),
531            connections: self.connections.clone(),
532            active_connections: self.active_connections.clone(),
533        }
534    }
535}
536
537impl<T: IConnection + ?Sized> ConnectionPool<T> {
538    pub fn new(connectors: ConnectorRegistry) -> Self {
539        Self {
540            connectors,
541            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
542            active_connections: watch::channel(BTreeSet::new()).0,
543        }
544    }
545
546    async fn get_or_init_pool_entry(&self, url: &SafeUrl) -> Arc<ConnectionState<T>> {
547        let mut pool_locked = self.connections.lock().await;
548        pool_locked
549            .entry(url.to_owned())
550            .and_modify(|entry_arc| {
551                // Check if existing connection is disconnected and reset the whole entry.
552                //
553                // This resets the state (like connectivity backoff), which is what we want.
554                // Since the (`OnceCell`) was already initialized, it means connection was
555                // successfully before, and disconnected afterwards.
556                if let Some(existing_conn) = entry_arc.connection.get()
557                    && !existing_conn.is_connected()
558                {
559                    trace!(
560                        target: LOG_CLIENT_NET_API,
561                        %url,
562                        "Existing connection is disconnected, removing from pool"
563                    );
564                    self.active_connections.send_modify(|v| {
565                        v.remove(url);
566                    });
567                    *entry_arc = Arc::new(ConnectionState::new_reconnecting());
568                }
569            })
570            .or_insert_with(|| Arc::new(ConnectionState::new_initial()))
571            .clone()
572    }
573
574    pub async fn get_or_create_connection<F, Fut>(
575        &self,
576        url: &SafeUrl,
577        api_secret: Option<&str>,
578        create_connection: F,
579    ) -> ServerResult<Arc<T>>
580    where
581        F: Fn(SafeUrl, Option<String>, ConnectorRegistry) -> Fut + Clone + Send + Sync + 'static,
582        Fut: Future<Output = ServerResult<Arc<T>>> + Send + 'static,
583    {
584        let pool_entry_arc = self.get_or_init_pool_entry(url).await;
585
586        let leader_tx = loop {
587            let mut leader_rx = {
588                let mut chan_locked = pool_entry_arc
589                    .merge_connection_attempts_chan
590                    .lock()
591                    .expect("locking error");
592
593                if chan_locked.is_closed() {
594                    let (leader_tx, leader_rx) = broadcast::channel(1);
595                    *chan_locked = leader_rx;
596                    // whoever was trying to connect last time is gone
597                    // we're out of this lame loop for followers
598                    break leader_tx;
599                }
600
601                // lets piggyback on the existing leader
602                chan_locked.resubscribe()
603            };
604
605            if let Ok(res) = leader_rx.recv().await {
606                match res {
607                    Ok(o) => return Ok(o),
608                    Err(err) => {
609                        return Err(ServerError::Connection(anyhow::format_err!("{}", err)));
610                    }
611                }
612            }
613        };
614
615        let conn = pool_entry_arc
616            .connection
617            .get_or_try_init(|| async {
618                let retry_delay = pool_entry_arc.pre_reconnect_delay();
619                fedimint_core::runtime::sleep(retry_delay).await;
620
621                trace!(target: LOG_CLIENT_NET_API, %url, "Attempting to create a new connection");
622                let res = create_connection(
623                    url.clone(),
624                    api_secret.map(std::string::ToString::to_string),
625                    self.connectors.clone(),
626                )
627                .await;
628
629                // If any other task was also waiting to connect, send them the connection
630                // result.
631                //
632                // Note: we want to send both Ok or Err, so `res?` is used only afterwards.
633                let _ = leader_tx.send(
634                    res.as_ref()
635                        .map(|o| o.clone())
636                        .map_err(|err| err.to_string()),
637                );
638
639                let conn = res?;
640
641                self.active_connections.send_modify(|v| {
642                    v.insert(url.clone());
643                });
644
645                fedimint_core::runtime::spawn("connection disconnect watch", {
646                    let conn = conn.clone();
647                    let s = self.clone();
648                    let url = url.clone();
649                    async move {
650                        // wait for this connection to disconnect
651                        conn.await_disconnection().await;
652                        // And afterwards, update `active_connections`.
653                        //
654                        // This will update the `active_connections` just like calling
655                        // `get_or_create_connection` normally do, but we will
656                        // not attempt to do anything with the result (i.e. try to connect).
657                        s.get_or_init_pool_entry(&url).await;
658                    }
659                });
660
661                Ok(conn)
662            })
663            .await?;
664
665        trace!(target: LOG_CLIENT_NET_API, %url, "Connection ready");
666        Ok(conn.clone())
667    }
668    /// Get receiver for changes in the active connections
669    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
670        self.active_connections.subscribe()
671    }
672
673    pub async fn wait_for_initialized_connections(&self) {
674        self.connectors.wait_for_initialized_connections().await
675    }
676}
677
678/// Inner part of [`ConnectionState`] preserving state between attempts to
679/// initialize [`ConnectionState::connection`]
680#[derive(Debug)]
681struct ConnectionStateInner {
682    fresh: bool,
683    backoff: FibonacciBackoff,
684}
685
686#[derive(Debug)]
687pub struct ConnectionState<T: ?Sized> {
688    /// Connection we are trying to or already established
689    pub connection: tokio::sync::OnceCell<Arc<T>>,
690
691    /// When tasks attempt to connect at the same time,
692    /// this is the receiving end of the channel where
693    /// the "leader" sends a result.
694    merge_connection_attempts_chan:
695        std::sync::Mutex<broadcast::Receiver<std::result::Result<Arc<T>, String>>>,
696
697    /// State that technically is protected every time by
698    /// the serialization of `OnceCell::get_or_try_init`, but
699    /// for Rust purposes needs to be locked.
700    inner: std::sync::Mutex<ConnectionStateInner>,
701}
702
703impl<T: ?Sized> ConnectionState<T> {
704    /// Create a new connection state for a first time connection
705    pub fn new_initial() -> Self {
706        Self {
707            connection: OnceCell::new(),
708            inner: std::sync::Mutex::new(ConnectionStateInner {
709                fresh: true,
710                backoff: custom_backoff(
711                    // First time connections start quick
712                    Duration::from_millis(5),
713                    Duration::from_secs(30),
714                    None,
715                ),
716            }),
717            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
718        }
719    }
720
721    /// Create a new connection state for a connection that already failed, and
722    /// is being reset
723    pub fn new_reconnecting() -> Self {
724        Self {
725            connection: OnceCell::new(),
726            inner: std::sync::Mutex::new(ConnectionStateInner {
727                // set the attempts to 1, indicating that
728                fresh: false,
729                backoff: custom_backoff(
730                    // Connections after a disconnect start with some minimum delay
731                    Duration::from_millis(500),
732                    Duration::from_secs(30),
733                    None,
734                ),
735            }),
736            merge_connection_attempts_chan: std::sync::Mutex::new(broadcast::channel(1).1),
737        }
738    }
739
740    /// Record the fact that an attempt to connect is being made, and return
741    /// time the caller should wait.
742    pub fn pre_reconnect_delay(&self) -> Duration {
743        let mut backoff_locked = self.inner.lock().expect("Locking failed");
744        let fresh = backoff_locked.fresh;
745
746        backoff_locked.fresh = false;
747
748        if fresh {
749            Duration::default()
750        } else {
751            backoff_locked.backoff.next().expect("Keeps retrying")
752        }
753    }
754}