fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::fmt::Debug;
6use std::future::pending;
7use std::pin::Pin;
8use std::result;
9use std::sync::Arc;
10
11use anyhow::{Context, anyhow};
12use bitcoin::secp256k1;
13pub use error::{FederationError, OutputOutcomeError};
14pub use fedimint_connectors::ServerResult;
15pub use fedimint_connectors::error::ServerError;
16use fedimint_connectors::{
17    ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
18};
19use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
20use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
21use fedimint_core::core::backup::SignedBackupRequest;
22use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38    ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
39    util,
40};
41use fedimint_logging::LOG_CLIENT_NET_API;
42use fedimint_metrics::HistogramExt as _;
43use futures::stream::{BoxStream, FuturesUnordered};
44use futures::{Future, StreamExt};
45use global_api::with_cache::GlobalFederationApiWithCache;
46use jsonrpsee_core::DeserializeOwned;
47use serde::{Deserialize, Serialize};
48use serde_json::Value;
49use tokio::sync::watch;
50use tokio_stream::wrappers::WatchStream;
51use tracing::{debug, instrument, trace, warn};
52
53use crate::metrics::{CLIENT_API_REQUEST_DURATION_SECONDS, CLIENT_API_REQUESTS_TOTAL};
54use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
55
56pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
57
58pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
59    ApiVersion { major: 0, minor: 1 };
60
61pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
62pub type FederationResult<T> = Result<T, FederationError>;
63pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
64
65pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
66
/// Set of api versions for each component (core + modules)
///
/// E.g. result of federated common api versions discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Api version of the core (non-module) component
    pub core: ApiVersion,
    /// Api version of each module, keyed by its module instance id
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
75
/// An API (module or global) that can query a federation
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this
    /// interface most probably has some idea as well, but passing this set
    /// across every API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// `PeerId` of the Guardian node, if set
    ///
    /// This is for using Client in an "Admin" mode, making authenticated
    /// calls to own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns a [`DynModuleApi`] handle for the module instance `id`
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make request to a specific federation peer by `peer_id`
    ///
    /// Returns the raw JSON [`Value`] of the response; deserialization into
    /// a concrete type is left to the caller.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> ServerResult<Value>;

    /// Returns a stream of connection status for each peer
    ///
    /// The stream emits a new value whenever the connection status changes.
    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;
    /// Wait for some connections being initialized
    ///
    /// This is useful to avoid initializing networking by
    /// tasks that are not high priority.
    async fn wait_for_initialized_connections(&self);
}
114
115/// An extension trait allowing to making federation-wide API call on top
116/// [`IRawFederationApi`].
117#[apply(async_trait_maybe_send!)]
118pub trait FederationApiExt: IRawFederationApi {
119    async fn request_single_peer<Ret>(
120        &self,
121        method: String,
122        params: ApiRequestErased,
123        peer: PeerId,
124    ) -> ServerResult<Ret>
125    where
126        Ret: DeserializeOwned,
127    {
128        self.request_raw(peer, &method, &params)
129            .await
130            .and_then(|v| {
131                serde_json::from_value(v)
132                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
133            })
134    }
135
136    async fn request_single_peer_federation<FedRet>(
137        &self,
138        method: String,
139        params: ApiRequestErased,
140        peer_id: PeerId,
141    ) -> FederationResult<FedRet>
142    where
143        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
144    {
145        self.request_raw(peer_id, &method, &params)
146            .await
147            .and_then(|v| {
148                serde_json::from_value(v)
149                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
150            })
151            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
152    }
153
154    /// Make an aggregate request to federation, using `strategy` to logically
155    /// merge the responses.
156    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
157    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
158        &self,
159        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
160        method: String,
161        params: ApiRequestErased,
162    ) -> FederationResult<FR> {
163        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
164        // completed results from it and we don't do any `await`s when
165        // processing them, it should be totally OK.
166        #[cfg(not(target_family = "wasm"))]
167        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
168        #[cfg(target_family = "wasm")]
169        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
170
171        for peer in self.all_peers() {
172            futures.push(Box::pin({
173                let method = &method;
174                let params = &params;
175                async move {
176                    let result = self
177                        .request_single_peer(method.clone(), params.clone(), *peer)
178                        .await;
179
180                    (*peer, result)
181                }
182            }));
183        }
184
185        let mut peer_errors = BTreeMap::new();
186        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
187
188        loop {
189            let (peer, result) = futures
190                .next()
191                .await
192                .expect("Query strategy ran out of peers to query without returning a result");
193
194            match result {
195                Ok(response) => match strategy.process(peer, response) {
196                    QueryStep::Retry(peers) => {
197                        for peer in peers {
198                            futures.push(Box::pin({
199                                let method = &method;
200                                let params = &params;
201                                async move {
202                                    let result = self
203                                        .request_single_peer(method.clone(), params.clone(), peer)
204                                        .await;
205
206                                    (peer, result)
207                                }
208                            }));
209                        }
210                    }
211                    QueryStep::Success(response) => return Ok(response),
212                    QueryStep::Failure(e) => {
213                        peer_errors.insert(peer, e);
214                    }
215                    QueryStep::Continue => {}
216                },
217                Err(e) => {
218                    e.report_if_unusual(peer, "RequestWithStrategy");
219                    peer_errors.insert(peer, e);
220                }
221            }
222
223            if peer_errors.len() == peer_error_threshold {
224                return Err(FederationError::peer_errors(
225                    method.clone(),
226                    params.params.clone(),
227                    peer_errors,
228                ));
229            }
230        }
231    }
232
233    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
234    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
235        &self,
236        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
237        method: String,
238        params: ApiRequestErased,
239    ) -> FR {
240        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
241        // completed results from it and we don't do any `await`s when
242        // processing them, it should be totally OK.
243        #[cfg(not(target_family = "wasm"))]
244        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
245        #[cfg(target_family = "wasm")]
246        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
247
248        for peer in self.all_peers() {
249            futures.push(Box::pin({
250                let method = &method;
251                let params = &params;
252                async move {
253                    let response = util::retry(
254                        format!("api-request-{method}-{peer}"),
255                        api_networking_backoff(),
256                        || async {
257                            self.request_single_peer(method.clone(), params.clone(), *peer)
258                                .await
259                                .inspect_err(|e| {
260                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
261                                })
262                                .map_err(|e| anyhow!(e.to_string()))
263                        },
264                    )
265                    .await
266                    .expect("Number of retries has no limit");
267
268                    (*peer, response)
269                }
270            }));
271        }
272
273        loop {
274            let (peer, response) = match futures.next().await {
275                Some(t) => t,
276                None => pending().await,
277            };
278
279            match strategy.process(peer, response) {
280                QueryStep::Retry(peers) => {
281                    for peer in peers {
282                        futures.push(Box::pin({
283                            let method = &method;
284                            let params = &params;
285                            async move {
286                                let response = util::retry(
287                                    format!("api-request-{method}-{peer}"),
288                                    api_networking_backoff(),
289                                    || async {
290                                        self.request_single_peer(
291                                            method.clone(),
292                                            params.clone(),
293                                            peer,
294                                        )
295                                        .await
296                                        .inspect_err(|err| {
297                                            if err.is_unusual() {
298                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
299                                            }
300                                        })
301                                        .map_err(|e| anyhow!(e.to_string()))
302                                    },
303                                )
304                                .await
305                                .expect("Number of retries has no limit");
306
307                                (peer, response)
308                            }
309                        }));
310                    }
311                }
312                QueryStep::Success(response) => return response,
313                QueryStep::Failure(e) => {
314                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
315                }
316                QueryStep::Continue => {}
317            }
318        }
319    }
320
321    async fn request_current_consensus<Ret>(
322        &self,
323        method: String,
324        params: ApiRequestErased,
325    ) -> FederationResult<Ret>
326    where
327        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
328    {
329        self.request_with_strategy(
330            ThresholdConsensus::new(self.all_peers().to_num_peers()),
331            method,
332            params,
333        )
334        .await
335    }
336
337    async fn request_current_consensus_retry<Ret>(
338        &self,
339        method: String,
340        params: ApiRequestErased,
341    ) -> Ret
342    where
343        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344    {
345        self.request_with_strategy_retry(
346            ThresholdConsensus::new(self.all_peers().to_num_peers()),
347            method,
348            params,
349        )
350        .await
351    }
352
353    async fn request_admin<Ret>(
354        &self,
355        method: &str,
356        params: ApiRequestErased,
357        auth: ApiAuth,
358    ) -> FederationResult<Ret>
359    where
360        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
361    {
362        let Some(self_peer_id) = self.self_peer() else {
363            return Err(FederationError::general(
364                method,
365                params,
366                anyhow::format_err!("Admin peer_id not set"),
367            ));
368        };
369
370        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
371            .await
372    }
373
374    async fn request_admin_no_auth<Ret>(
375        &self,
376        method: &str,
377        params: ApiRequestErased,
378    ) -> FederationResult<Ret>
379    where
380        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
381    {
382        let Some(self_peer_id) = self.self_peer() else {
383            return Err(FederationError::general(
384                method,
385                params,
386                anyhow::format_err!("Admin peer_id not set"),
387            ));
388        };
389
390        self.request_single_peer_federation(method.into(), params, self_peer_id)
391            .await
392    }
393}
394
/// Blanket implementation: every [`IRawFederationApi`] implementor
/// (including unsized ones) gets [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
397
/// Trait marker for the module (non-global) endpoints
pub trait IModuleFederationApi: IRawFederationApi {}

// Shared, clonable handle wrapping an `IModuleFederationApi` trait object
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}

// Shared, clonable handle wrapping an `IGlobalFederationApi` trait object
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
410
411impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
412    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
413        self.inner.as_ref()
414    }
415}
416
417impl DynGlobalApi {
418    pub fn new(
419        connectors: ConnectorRegistry,
420        peers: BTreeMap<PeerId, SafeUrl>,
421        api_secret: Option<&str>,
422    ) -> anyhow::Result<Self> {
423        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
424            connectors, peers, None, api_secret,
425        ))
426        .into())
427    }
428    pub fn new_admin(
429        connectors: ConnectorRegistry,
430        peer: PeerId,
431        url: SafeUrl,
432        api_secret: Option<&str>,
433    ) -> anyhow::Result<DynGlobalApi> {
434        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
435            connectors,
436            [(peer, url)].into(),
437            Some(peer),
438            api_secret,
439        ))
440        .into())
441    }
442
443    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
444        // PeerIds are used only for informational purposes, but just in case, make a
445        // big number so it stands out
446        Self::new_admin(
447            connectors,
448            PeerId::from(1024),
449            url,
450            // Setup does not have api secrets yet
451            None,
452        )
453    }
454}
455
/// The API for the global (non-module) endpoints
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submit `tx` to the federation
    ///
    /// Returns the submission outcome in its still-encoded form.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded using
    /// `decoders`
    ///
    /// NOTE(review): presumably waits until that session is complete —
    /// confirm against the implementation.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, decoded using
    /// `decoders`
    ///
    /// NOTE(review): `core_api_version` presumably selects between the
    /// endpoint versions and `broadcast_public_keys` is presumably used to
    /// verify the response — confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count as reported by the federation
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves with a [`TransactionId`] for the given `txid`
    ///
    /// NOTE(review): presumably waits until the transaction has been
    /// processed by the federation — confirm against the implementation.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Upload a signed client backup
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Download the client backup stored under `id`
    ///
    /// Returns, per peer, the snapshot that peer has (if any).
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the [`SetupStatus`] of the guardian
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Sets the local setup parameters of the guardian
    ///
    /// NOTE(review): the returned `String` is presumably the setup code
    /// (see `get_setup_code`) — confirm against the implementation.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        enabled_modules: Option<BTreeSet<ModuleKind>>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds the connection info of another peer during setup
    ///
    /// NOTE(review): the meaning of the returned `String` is not visible
    /// here — confirm against the implementation.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Reset the peer setup codes during the federation setup process
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code if `set_local_params` was already called
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restart the federation setup process
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetch the signed API announcements known to `guardian`, keyed by
    /// the announcing peer
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Have our guardian sign an API announcement for `api_url`
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Publish our signed guardian metadata to other guardians
    async fn submit_guardian_metadata(
        &self,
        peer_id: PeerId,
        metadata: SignedGuardianMetadata,
    ) -> FederationResult<()>;

    /// Fetch the signed guardian metadata known to `guardian`, keyed by
    /// peer
    async fn guardian_metadata(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedGuardianMetadata>>;

    /// Have our guardian sign the given guardian `metadata`
    async fn sign_guardian_metadata(
        &self,
        metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
        auth: ApiAuth,
    ) -> FederationResult<SignedGuardianMetadata>;

    /// Request a shutdown of our guardian
    ///
    /// NOTE(review): `session` presumably selects the session at which to
    /// shut down — confirm against the implementation.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Get the invite code for the federation guardian.
    /// For instance, useful after DKG
    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;

    /// Change the password used to encrypt the configs and for guardian
    /// authentication
    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;

    /// Returns the chain ID (bitcoin block hash at height 1) from the
    /// federation
    async fn chain_id(&self) -> FederationResult<ChainId>;
}
593
594pub fn deserialize_outcome<R>(
595    outcome: &SerdeOutputOutcome,
596    module_decoder: &Decoder,
597) -> OutputOutcomeResult<R>
598where
599    R: OutputOutcome + MaybeSend,
600{
601    let dyn_outcome = outcome
602        .try_into_inner_known_module_kind(module_decoder)
603        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
604
605    let source_instance = dyn_outcome.module_instance_id();
606
607    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
608        let target_type = std::any::type_name::<R>();
609        OutputOutcomeError::ResponseDeserialization(anyhow!(
610            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
611        ))
612    })
613}
614
/// Federation API client
///
/// The core underlying object used to make API requests to a federation.
///
/// It holds a connection pool used to actually make outgoing connections
/// to given URLs, and knows which peers there are and what URLs to connect to
/// to reach them.
// TODO: As it is currently it mixes a bit the role of connecting to "peers" with
// general purpose outgoing connection. Not a big deal, but might need refactor
// in the future.
#[derive(Clone, Debug)]
pub struct FederationApi {
    /// Map of known URLs to use to connect to peers
    peers: BTreeMap<PeerId, SafeUrl>,
    /// List of peer ids, redundant to avoid collecting all the time
    peers_keys: BTreeSet<PeerId>,
    /// Our own [`PeerId`] to use when making admin apis
    admin_id: Option<PeerId>,
    /// Set when this API is used to communicate with a module
    module_id: Option<ModuleInstanceId>,
    /// Api secret of the federation
    api_secret: Option<String>,
    /// Connection pool
    connection_pool: ConnectionPool<dyn IGuardianConnection>,
}
640
641impl FederationApi {
642    pub fn new(
643        connectors: ConnectorRegistry,
644        peers: BTreeMap<PeerId, SafeUrl>,
645        admin_peer_id: Option<PeerId>,
646        api_secret: Option<&str>,
647    ) -> Self {
648        Self {
649            peers_keys: peers.keys().copied().collect(),
650            peers,
651            admin_id: admin_peer_id,
652            module_id: None,
653            api_secret: api_secret.map(ToOwned::to_owned),
654            connection_pool: ConnectionPool::new(connectors),
655        }
656    }
657
658    async fn get_or_create_connection(
659        &self,
660        url: &SafeUrl,
661        api_secret: Option<&str>,
662    ) -> ServerResult<DynGuaridianConnection> {
663        self.connection_pool
664            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
665                let conn = connectors
666                    .connect_guardian(&url, api_secret.as_deref())
667                    .await?;
668                Ok(conn)
669            })
670            .await
671    }
672
673    async fn request(
674        &self,
675        peer: PeerId,
676        method: ApiMethod,
677        request: ApiRequestErased,
678    ) -> ServerResult<Value> {
679        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
680        let url = self
681            .peers
682            .get(&peer)
683            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
684        let conn = self
685            .get_or_create_connection(url, self.api_secret.as_deref())
686            .await
687            .context("Failed to connect to peer")
688            .map_err(ServerError::Connection)?;
689
690        let method_str = method.to_string();
691        let peer_str = peer.to_string();
692        let timer = CLIENT_API_REQUEST_DURATION_SECONDS
693            .with_label_values(&[&method_str, &peer_str])
694            .start_timer_ext();
695
696        let res = conn.request(method.clone(), request).await;
697
698        timer.observe_duration();
699
700        let result_label = if res.is_ok() { "success" } else { "error" }.to_string();
701        CLIENT_API_REQUESTS_TOTAL
702            .with_label_values(&[&method_str, &peer_str, &result_label])
703            .inc();
704
705        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
706
707        res
708    }
709
710    /// Get receiver for changes in the active connections
711    ///
712    /// This allows real-time monitoring of connection status.
713    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
714        self.connection_pool.get_active_connection_receiver()
715    }
716}
717
// Marker impl: lets `FederationApi` be used where a module API is expected
impl IModuleFederationApi for FederationApi {}
719
720#[apply(async_trait_maybe_send!)]
721impl IRawFederationApi for FederationApi {
722    fn all_peers(&self) -> &BTreeSet<PeerId> {
723        &self.peers_keys
724    }
725
726    fn self_peer(&self) -> Option<PeerId> {
727        self.admin_id
728    }
729
730    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
731        FederationApi {
732            api_secret: self.api_secret.clone(),
733            peers: self.peers.clone(),
734            peers_keys: self.peers_keys.clone(),
735            admin_id: self.admin_id,
736            module_id: Some(id),
737            connection_pool: self.connection_pool.clone(),
738        }
739        .into()
740    }
741
742    #[instrument(
743        target = LOG_CLIENT_NET_API,
744        skip_all,
745        fields(
746            peer_id = %peer_id,
747            method = %method,
748            params = %params.params,
749        )
750    )]
751    async fn request_raw(
752        &self,
753        peer_id: PeerId,
754        method: &str,
755        params: &ApiRequestErased,
756    ) -> ServerResult<Value> {
757        let method = match self.module_id {
758            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
759            None => ApiMethod::Core(method.to_string()),
760        };
761
762        self.request(peer_id, method, params.clone()).await
763    }
764
765    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
766        let peers = self.peers.clone();
767
768        WatchStream::new(self.connection_pool.get_active_connection_receiver())
769            .map(move |active_urls| {
770                peers
771                    .iter()
772                    .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
773                    .collect()
774            })
775            .boxed()
776    }
777    async fn wait_for_initialized_connections(&self) {
778        self.connection_pool
779            .wait_for_initialized_connections()
780            .await;
781    }
782}
783
/// The status of a server, including how it views its peers
// NOTE(review): the per-field descriptions below are inferred from the field
// names and types only — confirm against the server implementation.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session count reported by the server
    pub session_count: u64,
    /// Status of each peer, as seen by the server
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers reported as online
    pub peers_online: u64,
    /// Number of peers reported as offline
    pub peers_offline: u64,
    /// This should always be 0 if everything is okay, so a monitoring tool
    /// should generate an alert if this is not the case.
    pub peers_flagged: u64,
    /// Scheduled shutdown, if any (presumably a session index — confirm)
    pub scheduled_shutdown: Option<u64>,
}
796
/// Status of a single peer as reported in [`LegacyFederationStatus`]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of the peer, if any (presumably a session index —
    /// confirm against the server implementation)
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected
    pub connection_status: LegacyP2PConnectionStatus,
    /// Indicates that this peer needs attention from the operator since
    /// it has not contributed to the consensus in a long time
    pub flagged: bool,
}
805
/// Connection state of a peer
///
/// Serialized in `snake_case`; defaults to `Disconnected`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
813
/// Response of the `status` call of [`IGlobalFederationApi`]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself
    pub server: ServerStatusLegacy,
    /// The server's view of the federation, if available
    pub federation: Option<LegacyFederationStatus>,
}
819
820#[cfg(test)]
821mod tests;