fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3pub mod net;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::fmt::Debug;
7use std::future::pending;
8use std::pin::Pin;
9use std::result;
10use std::sync::Arc;
11use std::time::Duration;
12
13use anyhow::{Context, anyhow};
14use bitcoin::hashes::sha256;
15use bitcoin::secp256k1;
16pub use error::{FederationError, OutputOutcomeError};
17pub use fedimint_connectors::ServerResult;
18pub use fedimint_connectors::error::ServerError;
19use fedimint_connectors::{
20    ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
21};
22use fedimint_core::admin_client::{
23    GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
24};
25use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
26use fedimint_core::core::backup::SignedBackupRequest;
27use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
28use fedimint_core::encoding::{Decodable, Encodable};
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::audit::AuditSummary;
31use fedimint_core::module::registry::ModuleDecoderRegistry;
32use fedimint_core::module::{
33    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
34};
35use fedimint_core::net::api_announcement::SignedApiAnnouncement;
36use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
37use fedimint_core::task::{MaybeSend, MaybeSync};
38use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
39use fedimint_core::util::backoff_util::{FibonacciBackoff, api_networking_backoff, custom_backoff};
40use fedimint_core::util::{FmtCompact as _, SafeUrl};
41use fedimint_core::{
42    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
43};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::stream::FuturesUnordered;
46use futures::{Future, StreamExt};
47use global_api::with_cache::GlobalFederationApiWithCache;
48use jsonrpsee_core::DeserializeOwned;
49use serde::{Deserialize, Serialize};
50use serde_json::Value;
51use tokio::sync::OnceCell;
52use tracing::{debug, instrument, trace, warn};
53
54use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
55
56pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
57
58pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
59    ApiVersion { major: 0, minor: 1 };
60
61pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
62pub type FederationResult<T> = Result<T, FederationError>;
63pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
64
65pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
66
67/// Set of api versions for each component (core + modules)
68///
69/// E.g. result of federated common api versions discovery.
70#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
71pub struct ApiVersionSet {
72    pub core: ApiVersion,
73    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
74}
75
76/// An API (module or global) that can query a federation
77#[apply(async_trait_maybe_send!)]
78pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
79    /// List of all federation peers for the purpose of iterating each peer
80    /// in the federation.
81    ///
82    /// The underlying implementation is responsible for knowing how many
83    /// and `PeerId`s of each. The caller of this interface most probably
84    /// have some idea as well, but passing this set across every
85    /// API call to the federation would be inconvenient.
86    fn all_peers(&self) -> &BTreeSet<PeerId>;
87
88    /// `PeerId` of the Guardian node, if set
89    ///
90    /// This is for using Client in a "Admin" mode, making authenticated
91    /// calls to own `fedimintd` instance.
92    fn self_peer(&self) -> Option<PeerId>;
93
94    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
95
96    /// Make request to a specific federation peer by `peer_id`
97    async fn request_raw(
98        &self,
99        peer_id: PeerId,
100        method: &str,
101        params: &ApiRequestErased,
102    ) -> ServerResult<Value>;
103}
104
105/// An extension trait allowing to making federation-wide API call on top
106/// [`IRawFederationApi`].
107#[apply(async_trait_maybe_send!)]
108pub trait FederationApiExt: IRawFederationApi {
109    async fn request_single_peer<Ret>(
110        &self,
111        method: String,
112        params: ApiRequestErased,
113        peer: PeerId,
114    ) -> ServerResult<Ret>
115    where
116        Ret: DeserializeOwned,
117    {
118        self.request_raw(peer, &method, &params)
119            .await
120            .and_then(|v| {
121                serde_json::from_value(v)
122                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
123            })
124    }
125
126    async fn request_single_peer_federation<FedRet>(
127        &self,
128        method: String,
129        params: ApiRequestErased,
130        peer_id: PeerId,
131    ) -> FederationResult<FedRet>
132    where
133        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
134    {
135        self.request_raw(peer_id, &method, &params)
136            .await
137            .and_then(|v| {
138                serde_json::from_value(v)
139                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
140            })
141            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
142    }
143
144    /// Make an aggregate request to federation, using `strategy` to logically
145    /// merge the responses.
146    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
147    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
148        &self,
149        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
150        method: String,
151        params: ApiRequestErased,
152    ) -> FederationResult<FR> {
153        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
154        // completed results from it and we don't do any `await`s when
155        // processing them, it should be totally OK.
156        #[cfg(not(target_family = "wasm"))]
157        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
158        #[cfg(target_family = "wasm")]
159        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
160
161        for peer in self.all_peers() {
162            futures.push(Box::pin({
163                let method = &method;
164                let params = &params;
165                async move {
166                    let result = self
167                        .request_single_peer(method.clone(), params.clone(), *peer)
168                        .await;
169
170                    (*peer, result)
171                }
172            }));
173        }
174
175        let mut peer_errors = BTreeMap::new();
176        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
177
178        loop {
179            let (peer, result) = futures
180                .next()
181                .await
182                .expect("Query strategy ran out of peers to query without returning a result");
183
184            match result {
185                Ok(response) => match strategy.process(peer, response) {
186                    QueryStep::Retry(peers) => {
187                        for peer in peers {
188                            futures.push(Box::pin({
189                                let method = &method;
190                                let params = &params;
191                                async move {
192                                    let result = self
193                                        .request_single_peer(method.clone(), params.clone(), peer)
194                                        .await;
195
196                                    (peer, result)
197                                }
198                            }));
199                        }
200                    }
201                    QueryStep::Success(response) => return Ok(response),
202                    QueryStep::Failure(e) => {
203                        peer_errors.insert(peer, e);
204                    }
205                    QueryStep::Continue => {}
206                },
207                Err(e) => {
208                    e.report_if_unusual(peer, "RequestWithStrategy");
209                    peer_errors.insert(peer, e);
210                }
211            }
212
213            if peer_errors.len() == peer_error_threshold {
214                return Err(FederationError::peer_errors(
215                    method.clone(),
216                    params.params.clone(),
217                    peer_errors,
218                ));
219            }
220        }
221    }
222
223    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
224    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
225        &self,
226        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
227        method: String,
228        params: ApiRequestErased,
229    ) -> FR {
230        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
231        // completed results from it and we don't do any `await`s when
232        // processing them, it should be totally OK.
233        #[cfg(not(target_family = "wasm"))]
234        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
235        #[cfg(target_family = "wasm")]
236        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
237
238        for peer in self.all_peers() {
239            futures.push(Box::pin({
240                let method = &method;
241                let params = &params;
242                async move {
243                    let response = util::retry(
244                        format!("api-request-{method}-{peer}"),
245                        api_networking_backoff(),
246                        || async {
247                            self.request_single_peer(method.clone(), params.clone(), *peer)
248                                .await
249                                .inspect_err(|e| {
250                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
251                                })
252                                .map_err(|e| anyhow!(e.to_string()))
253                        },
254                    )
255                    .await
256                    .expect("Number of retries has no limit");
257
258                    (*peer, response)
259                }
260            }));
261        }
262
263        loop {
264            let (peer, response) = match futures.next().await {
265                Some(t) => t,
266                None => pending().await,
267            };
268
269            match strategy.process(peer, response) {
270                QueryStep::Retry(peers) => {
271                    for peer in peers {
272                        futures.push(Box::pin({
273                            let method = &method;
274                            let params = &params;
275                            async move {
276                                let response = util::retry(
277                                    format!("api-request-{method}-{peer}"),
278                                    api_networking_backoff(),
279                                    || async {
280                                        self.request_single_peer(
281                                            method.clone(),
282                                            params.clone(),
283                                            peer,
284                                        )
285                                        .await
286                                        .inspect_err(|err| {
287                                            if err.is_unusual() {
288                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
289                                            }
290                                        })
291                                        .map_err(|e| anyhow!(e.to_string()))
292                                    },
293                                )
294                                .await
295                                .expect("Number of retries has no limit");
296
297                                (peer, response)
298                            }
299                        }));
300                    }
301                }
302                QueryStep::Success(response) => return response,
303                QueryStep::Failure(e) => {
304                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
305                }
306                QueryStep::Continue => {}
307            }
308        }
309    }
310
311    async fn request_current_consensus<Ret>(
312        &self,
313        method: String,
314        params: ApiRequestErased,
315    ) -> FederationResult<Ret>
316    where
317        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
318    {
319        self.request_with_strategy(
320            ThresholdConsensus::new(self.all_peers().to_num_peers()),
321            method,
322            params,
323        )
324        .await
325    }
326
327    async fn request_current_consensus_retry<Ret>(
328        &self,
329        method: String,
330        params: ApiRequestErased,
331    ) -> Ret
332    where
333        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
334    {
335        self.request_with_strategy_retry(
336            ThresholdConsensus::new(self.all_peers().to_num_peers()),
337            method,
338            params,
339        )
340        .await
341    }
342
343    async fn request_admin<Ret>(
344        &self,
345        method: &str,
346        params: ApiRequestErased,
347        auth: ApiAuth,
348    ) -> FederationResult<Ret>
349    where
350        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
351    {
352        let Some(self_peer_id) = self.self_peer() else {
353            return Err(FederationError::general(
354                method,
355                params,
356                anyhow::format_err!("Admin peer_id not set"),
357            ));
358        };
359
360        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
361            .await
362    }
363
364    async fn request_admin_no_auth<Ret>(
365        &self,
366        method: &str,
367        params: ApiRequestErased,
368    ) -> FederationResult<Ret>
369    where
370        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
371    {
372        let Some(self_peer_id) = self.self_peer() else {
373            return Err(FederationError::general(
374                method,
375                params,
376                anyhow::format_err!("Admin peer_id not set"),
377            ));
378        };
379
380        self.request_single_peer_federation(method.into(), params, self_peer_id)
381            .await
382    }
383}
384
385#[apply(async_trait_maybe_send!)]
386impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
387
388/// Trait marker for the module (non-global) endpoints
389pub trait IModuleFederationApi: IRawFederationApi {}
390
391dyn_newtype_define! {
392    #[derive(Clone)]
393    pub DynModuleApi(Arc<IModuleFederationApi>)
394}
395
396dyn_newtype_define! {
397    #[derive(Clone)]
398    pub DynGlobalApi(Arc<IGlobalFederationApi>)
399}
400
401impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
402    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
403        self.inner.as_ref()
404    }
405}
406
407impl DynGlobalApi {
408    pub fn new(
409        connectors: ConnectorRegistry,
410        peers: BTreeMap<PeerId, SafeUrl>,
411        api_secret: Option<&str>,
412    ) -> anyhow::Result<Self> {
413        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
414            connectors, peers, None, api_secret,
415        ))
416        .into())
417    }
418    pub fn new_admin(
419        connectors: ConnectorRegistry,
420        peer: PeerId,
421        url: SafeUrl,
422        api_secret: Option<&str>,
423    ) -> anyhow::Result<DynGlobalApi> {
424        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
425            connectors,
426            [(peer, url)].into(),
427            Some(peer),
428            api_secret,
429        ))
430        .into())
431    }
432
433    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
434        // PeerIds are used only for informational purposes, but just in case, make a
435        // big number so it stands out
436        Self::new_admin(
437            connectors,
438            PeerId::from(1024),
439            url,
440            // Setup does not have api secrets yet
441            None,
442        )
443    }
444}
445
446/// The API for the global (non-module) endpoints
447#[apply(async_trait_maybe_send!)]
448pub trait IGlobalFederationApi: IRawFederationApi {
449    async fn submit_transaction(
450        &self,
451        tx: Transaction,
452    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
453
454    async fn await_block(
455        &self,
456        block_index: u64,
457        decoders: &ModuleDecoderRegistry,
458    ) -> anyhow::Result<SessionOutcome>;
459
460    async fn get_session_status(
461        &self,
462        block_index: u64,
463        decoders: &ModuleDecoderRegistry,
464        core_api_version: ApiVersion,
465        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
466    ) -> anyhow::Result<SessionStatus>;
467
468    async fn session_count(&self) -> FederationResult<u64>;
469
470    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
471
472    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
473
474    async fn download_backup(
475        &self,
476        id: &secp256k1::PublicKey,
477    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
478
479    /// Sets the password used to decrypt the configs and authenticate
480    ///
481    /// Must be called first before any other calls to the API
482    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
483
484    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
485
486    async fn set_local_params(
487        &self,
488        name: String,
489        federation_name: Option<String>,
490        disable_base_fees: Option<bool>,
491        auth: ApiAuth,
492    ) -> FederationResult<String>;
493
494    async fn add_peer_connection_info(
495        &self,
496        info: String,
497        auth: ApiAuth,
498    ) -> FederationResult<String>;
499
500    /// Reset the peer setup codes during the federation setup process
501    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
502
503    /// Returns the setup code if `set_local_params` was already called
504    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
505
506    /// During config gen, used for an API-to-API call that adds a peer's server
507    /// connection info to the leader.
508    ///
509    /// Note this call will fail until the leader has their API running and has
510    /// `set_server_connections` so clients should retry.
511    ///
512    /// This call is not authenticated because it's guardian-to-guardian
513    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
514
515    /// During config gen, gets all the server connections we've received from
516    /// peers using `add_config_gen_peer`
517    ///
518    /// Could be called on the leader, so it's not authenticated
519    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
520
521    /// Runs DKG, can only be called once after configs have been generated in
522    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
523    /// error and config gen must be restarted.
524    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
525
526    /// After DKG, returns the hash of the consensus config tweaked with our id.
527    /// We need to share this with all other peers to complete verification.
528    async fn get_verify_config_hash(
529        &self,
530        auth: ApiAuth,
531    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
532
533    /// Updates local state and notify leader that we have verified configs.
534    /// This allows for a synchronization point, before we start consensus.
535    async fn verified_configs(
536        &self,
537        auth: ApiAuth,
538    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
539
540    /// Reads the configs from the disk, starts the consensus server, and shuts
541    /// down the config gen API to start the Fedimint API
542    ///
543    /// Clients may receive an error due to forced shutdown, should call the
544    /// `server_status` to see if consensus has started.
545    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
546
547    /// Returns the status of the server
548    async fn status(&self) -> FederationResult<StatusResponse>;
549
550    /// Show an audit across all modules
551    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
552
553    /// Download the guardian config to back it up
554    async fn guardian_config_backup(&self, auth: ApiAuth)
555    -> FederationResult<GuardianConfigBackup>;
556
557    /// Check auth credentials
558    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
559
560    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
561
562    /// Publish our signed API announcement to other guardians
563    async fn submit_api_announcement(
564        &self,
565        peer_id: PeerId,
566        announcement: SignedApiAnnouncement,
567    ) -> FederationResult<()>;
568
569    async fn api_announcements(
570        &self,
571        guardian: PeerId,
572    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
573
574    async fn sign_api_announcement(
575        &self,
576        api_url: SafeUrl,
577        auth: ApiAuth,
578    ) -> FederationResult<SignedApiAnnouncement>;
579
580    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
581
582    /// Returns the fedimintd version a peer is running
583    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;
584
585    /// Fetch the backup statistics from the federation (admin endpoint)
586    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
587
588    /// Get the invite code for the federation guardian.
589    /// For instance, useful after DKG
590    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;
591
592    /// Change the password used to encrypt the configs and for guardian
593    /// authentication
594    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
595}
596
597pub fn deserialize_outcome<R>(
598    outcome: &SerdeOutputOutcome,
599    module_decoder: &Decoder,
600) -> OutputOutcomeResult<R>
601where
602    R: OutputOutcome + MaybeSend,
603{
604    let dyn_outcome = outcome
605        .try_into_inner_known_module_kind(module_decoder)
606        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
607
608    let source_instance = dyn_outcome.module_instance_id();
609
610    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
611        let target_type = std::any::type_name::<R>();
612        OutputOutcomeError::ResponseDeserialization(anyhow!(
613            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
614        ))
615    })
616}
617
618/// Federation API client
619///
620/// The core underlying object used to make API requests to a federation.
621///
622/// It has an `connectors` handle to actually making outgoing connections
623/// to given URLs, and knows which peers there are and what URLs to connect to
624/// to reach them.
625// TODO: As it is currently it mixes a bit the role of connecting to "peers" with
626// general purpose outgoing connection. Not a big deal, but might need refactor
627// in the future.
628#[derive(Clone, Debug)]
629pub struct FederationApi {
630    /// Map of known URLs to use to connect to peers
631    peers: BTreeMap<PeerId, SafeUrl>,
632    /// List of peer ids, redundant to avoid collecting all the time
633    peers_keys: BTreeSet<PeerId>,
634    /// Our own [`PeerId`] to use when making admin apis
635    admin_id: Option<PeerId>,
636    /// Set when this API is used to communicate with a module
637    module_id: Option<ModuleInstanceId>,
638    /// Api secret of the federation
639    api_secret: Option<String>,
640    /// Connection pool
641    connection_pool: ConnectionPool<dyn IGuardianConnection>,
642}
643
644/// Inner part of [`ConnectionState`] preserving state between attempts to
645/// initialize [`ConnectionState::connection`]
646#[derive(Debug)]
647struct ConnectionStateInner {
648    fresh: bool,
649    backoff: FibonacciBackoff,
650}
651
652#[derive(Debug)]
653pub struct ConnectionState<T: ?Sized> {
654    /// Connection we are trying to or already established
655    pub connection: tokio::sync::OnceCell<Arc<T>>,
656    /// State that technically is protected every time by
657    /// the serialization of `OnceCell::get_or_try_init`, but
658    /// for Rust purposes needs to be locked.
659    inner: std::sync::Mutex<ConnectionStateInner>,
660}
661
662impl<T: ?Sized> ConnectionState<T> {
663    /// Create a new connection state for a first time connection
664    pub fn new_initial() -> Self {
665        Self {
666            connection: OnceCell::new(),
667            inner: std::sync::Mutex::new(ConnectionStateInner {
668                fresh: true,
669                backoff: custom_backoff(
670                    // First time connections start quick
671                    Duration::from_millis(5),
672                    Duration::from_secs(30),
673                    None,
674                ),
675            }),
676        }
677    }
678
679    /// Create a new connection state for a connection that already failed, and
680    /// is being reset
681    pub fn new_reconnecting() -> Self {
682        Self {
683            connection: OnceCell::new(),
684            inner: std::sync::Mutex::new(ConnectionStateInner {
685                // set the attempts to 1, indicating that
686                fresh: false,
687                backoff: custom_backoff(
688                    // Connections after a disconnect start with some minimum delay
689                    Duration::from_millis(500),
690                    Duration::from_secs(30),
691                    None,
692                ),
693            }),
694        }
695    }
696
697    /// Record the fact that an attempt to connect is being made, and return
698    /// time the caller should wait.
699    pub fn pre_reconnect_delay(&self) -> Duration {
700        let mut backoff_locked = self.inner.lock().expect("Locking failed");
701        let fresh = backoff_locked.fresh;
702
703        backoff_locked.fresh = false;
704
705        if fresh {
706            Duration::default()
707        } else {
708            backoff_locked.backoff.next().expect("Keeps retrying")
709        }
710    }
711}
712impl FederationApi {
713    pub fn new(
714        connectors: ConnectorRegistry,
715        peers: BTreeMap<PeerId, SafeUrl>,
716        admin_peer_id: Option<PeerId>,
717        api_secret: Option<&str>,
718    ) -> Self {
719        Self {
720            peers_keys: peers.keys().copied().collect(),
721            peers,
722            admin_id: admin_peer_id,
723            module_id: None,
724            api_secret: api_secret.map(ToOwned::to_owned),
725            connection_pool: ConnectionPool::new(connectors),
726        }
727    }
728
729    async fn get_or_create_connection(
730        &self,
731        url: &SafeUrl,
732        api_secret: Option<&str>,
733    ) -> ServerResult<DynGuaridianConnection> {
734        self.connection_pool
735            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
736                let conn = connectors
737                    .connect_guardian(&url, api_secret.as_deref())
738                    .await?;
739                Ok(conn)
740            })
741            .await
742    }
743
744    async fn request(
745        &self,
746        peer: PeerId,
747        method: ApiMethod,
748        request: ApiRequestErased,
749    ) -> ServerResult<Value> {
750        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
751        let url = self
752            .peers
753            .get(&peer)
754            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
755        let conn = self
756            .get_or_create_connection(url, self.api_secret.as_deref())
757            .await
758            .context("Failed to connect to peer")
759            .map_err(ServerError::Connection)?;
760        let res = conn.request(method.clone(), request).await;
761
762        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
763
764        res
765    }
766}
767
768impl IModuleFederationApi for FederationApi {}
769
770#[apply(async_trait_maybe_send!)]
771impl IRawFederationApi for FederationApi {
772    fn all_peers(&self) -> &BTreeSet<PeerId> {
773        &self.peers_keys
774    }
775
776    fn self_peer(&self) -> Option<PeerId> {
777        self.admin_id
778    }
779
780    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
781        FederationApi {
782            api_secret: self.api_secret.clone(),
783            peers: self.peers.clone(),
784            peers_keys: self.peers_keys.clone(),
785            admin_id: self.admin_id,
786            module_id: Some(id),
787            connection_pool: self.connection_pool.clone(),
788        }
789        .into()
790    }
791
792    #[instrument(
793        target = LOG_CLIENT_NET_API,
794        skip_all,
795        fields(
796            peer_id = %peer_id,
797            method = %method,
798            params = %params.params,
799        )
800    )]
801    async fn request_raw(
802        &self,
803        peer_id: PeerId,
804        method: &str,
805        params: &ApiRequestErased,
806    ) -> ServerResult<Value> {
807        let method = match self.module_id {
808            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
809            None => ApiMethod::Core(method.to_string()),
810        };
811
812        self.request(peer_id, method, params.clone()).await
813    }
814}
815
816/// The status of a server, including how it views its peers
817#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
818pub struct LegacyFederationStatus {
819    pub session_count: u64,
820    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
821    pub peers_online: u64,
822    pub peers_offline: u64,
823    /// This should always be 0 if everything is okay, so a monitoring tool
824    /// should generate an alert if this is not the case.
825    pub peers_flagged: u64,
826    pub scheduled_shutdown: Option<u64>,
827}
828
829#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
830pub struct LegacyPeerStatus {
831    pub last_contribution: Option<u64>,
832    pub connection_status: LegacyP2PConnectionStatus,
833    /// Indicates that this peer needs attention from the operator since
834    /// it has not contributed to the consensus in a long time
835    pub flagged: bool,
836}
837
838#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
839#[serde(rename_all = "snake_case")]
840pub enum LegacyP2PConnectionStatus {
841    #[default]
842    Disconnected,
843    Connected,
844}
845
846#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
847pub struct StatusResponse {
848    pub server: ServerStatusLegacy,
849    pub federation: Option<LegacyFederationStatus>,
850}
851
852#[cfg(test)]
853mod tests;