Skip to main content

fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::fmt::Debug;
6use std::future::pending;
7use std::pin::Pin;
8use std::result;
9use std::sync::Arc;
10
11use anyhow::{Context, anyhow};
12use bitcoin::secp256k1;
13pub use error::{FederationError, OutputOutcomeError};
14pub use fedimint_connectors::ServerResult;
15pub use fedimint_connectors::error::ServerError;
16use fedimint_connectors::{
17    ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
18};
19use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
20use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
21use fedimint_core::core::backup::SignedBackupRequest;
22use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38    ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
39    util,
40};
41use fedimint_logging::LOG_CLIENT_NET_API;
42use fedimint_metrics::HistogramExt as _;
43use futures::stream::{BoxStream, FuturesUnordered};
44use futures::{Future, StreamExt};
45use global_api::with_cache::GlobalFederationApiWithCache;
46use jsonrpsee_core::DeserializeOwned;
47use serde::{Deserialize, Serialize};
48use serde_json::Value;
49use tokio::sync::watch;
50use tokio_stream::wrappers::WatchStream;
51use tracing::{debug, instrument, trace, warn};
52
53use crate::metrics::{CLIENT_API_REQUEST_DURATION_SECONDS, CLIENT_API_REQUESTS_TOTAL};
54use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
55
/// API version gate for the v2 session-status endpoint.
///
/// NOTE(review): meaning inferred from the constant's name only -- confirm
/// against the server-side endpoint definitions.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);

/// API version gate (major 0, minor 1) for the session-status endpoint.
///
/// NOTE(review): meaning inferred from the constant's name only -- confirm
/// against the server-side endpoint definitions.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
    ApiVersion { major: 0, minor: 1 };

/// API version gate for the "await outputs outcomes" endpoint.
///
/// NOTE(review): meaning inferred from the constant's name only -- confirm
/// against the server-side endpoint definitions.
pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
/// Result of a federation-level request; the error type is [`FederationError`].
pub type FederationResult<T> = Result<T, FederationError>;
/// A [`DynOutputOutcome`] wrapped in [`SerdeModuleEncoding`].
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result whose error type is [`OutputOutcomeError`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
66
/// Set of api versions for each component (core + modules)
///
/// E.g. result of federated common api versions discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Api version of the core component
    pub core: ApiVersion,
    /// Api version of each module, keyed by its module instance id
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
75
/// An API (module or global) that can query a federation
///
/// This trait only exposes per-peer primitives (who the peers are, sending
/// one raw request to one peer, connection monitoring).
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this interface
    /// most probably has some idea as well, but passing this set across every
    /// API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// `PeerId` of the Guardian node, if set
    ///
    /// This is for using Client in a "Admin" mode, making authenticated
    /// calls to own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns a module API handle ([`DynModuleApi`]) for the module instance
    /// `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make request to a specific federation peer by `peer_id`
    ///
    /// `method` is the endpoint name and `params` the request parameters; on
    /// success the peer's reply is returned as an undecoded JSON [`Value`].
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> ServerResult<Value>;

    /// Returns a stream of connection status for each peer
    ///
    /// The stream emits a new value whenever the connection status changes.
    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;

    /// Wait for some connections being initialized
    ///
    /// This is useful to avoid initializing networking by
    /// tasks that are not high priority.
    async fn wait_for_initialized_connections(&self);

    /// Get or create a connection to a specific peer, returning the connection
    /// object.
    ///
    /// This can be used to monitor connection status and await disconnection
    /// for proactive reconnection strategies.
    async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection>;
}
121
/// An extension trait that allows making federation-wide API calls on top of
/// [`IRawFederationApi`].
124#[apply(async_trait_maybe_send!)]
125pub trait FederationApiExt: IRawFederationApi {
126    async fn request_single_peer<Ret>(
127        &self,
128        method: String,
129        params: ApiRequestErased,
130        peer: PeerId,
131    ) -> ServerResult<Ret>
132    where
133        Ret: DeserializeOwned,
134    {
135        self.request_raw(peer, &method, &params)
136            .await
137            .and_then(|v| {
138                serde_json::from_value(v)
139                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
140            })
141    }
142
143    async fn request_single_peer_federation<FedRet>(
144        &self,
145        method: String,
146        params: ApiRequestErased,
147        peer_id: PeerId,
148    ) -> FederationResult<FedRet>
149    where
150        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
151    {
152        self.request_raw(peer_id, &method, &params)
153            .await
154            .and_then(|v| {
155                serde_json::from_value(v)
156                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
157            })
158            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
159    }
160
161    /// Make an aggregate request to federation, using `strategy` to logically
162    /// merge the responses.
163    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
164    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
165        &self,
166        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
167        method: String,
168        params: ApiRequestErased,
169    ) -> FederationResult<FR> {
170        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
171        // completed results from it and we don't do any `await`s when
172        // processing them, it should be totally OK.
173        #[cfg(not(target_family = "wasm"))]
174        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
175        #[cfg(target_family = "wasm")]
176        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
177
178        for peer in self.all_peers() {
179            futures.push(Box::pin({
180                let method = &method;
181                let params = &params;
182                async move {
183                    let result = self
184                        .request_single_peer(method.clone(), params.clone(), *peer)
185                        .await;
186
187                    (*peer, result)
188                }
189            }));
190        }
191
192        let mut peer_errors = BTreeMap::new();
193        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
194
195        loop {
196            let (peer, result) = futures
197                .next()
198                .await
199                .expect("Query strategy ran out of peers to query without returning a result");
200
201            match result {
202                Ok(response) => match strategy.process(peer, response) {
203                    QueryStep::Retry(peers) => {
204                        for peer in peers {
205                            futures.push(Box::pin({
206                                let method = &method;
207                                let params = &params;
208                                async move {
209                                    let result = self
210                                        .request_single_peer(method.clone(), params.clone(), peer)
211                                        .await;
212
213                                    (peer, result)
214                                }
215                            }));
216                        }
217                    }
218                    QueryStep::Success(response) => return Ok(response),
219                    QueryStep::Failure(e) => {
220                        peer_errors.insert(peer, e);
221                    }
222                    QueryStep::Continue => {}
223                },
224                Err(e) => {
225                    e.report_if_unusual(peer, "RequestWithStrategy");
226                    peer_errors.insert(peer, e);
227                }
228            }
229
230            if peer_errors.len() == peer_error_threshold {
231                return Err(FederationError::peer_errors(
232                    method.clone(),
233                    params.params.clone(),
234                    peer_errors,
235                ));
236            }
237        }
238    }
239
240    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
241    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
242        &self,
243        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
244        method: String,
245        params: ApiRequestErased,
246    ) -> FR {
247        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
248        // completed results from it and we don't do any `await`s when
249        // processing them, it should be totally OK.
250        #[cfg(not(target_family = "wasm"))]
251        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
252        #[cfg(target_family = "wasm")]
253        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
254
255        for peer in self.all_peers() {
256            futures.push(Box::pin({
257                let method = &method;
258                let params = &params;
259                async move {
260                    let response = util::retry(
261                        format!("api-request-{method}-{peer}"),
262                        api_networking_backoff(),
263                        || async {
264                            self.request_single_peer(method.clone(), params.clone(), *peer)
265                                .await
266                                .inspect_err(|e| {
267                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
268                                })
269                                .map_err(|e| anyhow!(e.to_string()))
270                        },
271                    )
272                    .await
273                    .expect("Number of retries has no limit");
274
275                    (*peer, response)
276                }
277            }));
278        }
279
280        loop {
281            let (peer, response) = match futures.next().await {
282                Some(t) => t,
283                None => pending().await,
284            };
285
286            match strategy.process(peer, response) {
287                QueryStep::Retry(peers) => {
288                    for peer in peers {
289                        futures.push(Box::pin({
290                            let method = &method;
291                            let params = &params;
292                            async move {
293                                let response = util::retry(
294                                    format!("api-request-{method}-{peer}"),
295                                    api_networking_backoff(),
296                                    || async {
297                                        self.request_single_peer(
298                                            method.clone(),
299                                            params.clone(),
300                                            peer,
301                                        )
302                                        .await
303                                        .inspect_err(|err| {
304                                            if err.is_unusual() {
305                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
306                                            }
307                                        })
308                                        .map_err(|e| anyhow!(e.to_string()))
309                                    },
310                                )
311                                .await
312                                .expect("Number of retries has no limit");
313
314                                (peer, response)
315                            }
316                        }));
317                    }
318                }
319                QueryStep::Success(response) => return response,
320                QueryStep::Failure(e) => {
321                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
322                }
323                QueryStep::Continue => {}
324            }
325        }
326    }
327
328    async fn request_current_consensus<Ret>(
329        &self,
330        method: String,
331        params: ApiRequestErased,
332    ) -> FederationResult<Ret>
333    where
334        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
335    {
336        self.request_with_strategy(
337            ThresholdConsensus::new(self.all_peers().to_num_peers()),
338            method,
339            params,
340        )
341        .await
342    }
343
344    async fn request_current_consensus_retry<Ret>(
345        &self,
346        method: String,
347        params: ApiRequestErased,
348    ) -> Ret
349    where
350        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
351    {
352        self.request_with_strategy_retry(
353            ThresholdConsensus::new(self.all_peers().to_num_peers()),
354            method,
355            params,
356        )
357        .await
358    }
359
360    async fn request_admin<Ret>(
361        &self,
362        method: &str,
363        params: ApiRequestErased,
364        auth: ApiAuth,
365    ) -> FederationResult<Ret>
366    where
367        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
368    {
369        let Some(self_peer_id) = self.self_peer() else {
370            return Err(FederationError::general(
371                method,
372                params,
373                anyhow::format_err!("Admin peer_id not set"),
374            ));
375        };
376
377        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
378            .await
379    }
380
381    async fn request_admin_no_auth<Ret>(
382        &self,
383        method: &str,
384        params: ApiRequestErased,
385    ) -> FederationResult<Ret>
386    where
387        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
388    {
389        let Some(self_peer_id) = self.self_peer() else {
390            return Err(FederationError::general(
391                method,
392                params,
393                anyhow::format_err!("Admin peer_id not set"),
394            ));
395        };
396
397        self.request_single_peer_federation(method.into(), params, self_peer_id)
398            .await
399    }
400}
401
// Blanket implementation: every type implementing `IRawFederationApi`
// (including unsized ones, hence `?Sized`) gets the `FederationApiExt`
// methods with their default bodies.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
404
/// Trait marker for the module (non-global) endpoints
///
/// It declares no methods of its own; everything callable comes from
/// [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
407
// `DynModuleApi`: cloneable newtype around an `Arc` of `IModuleFederationApi`,
// generated by `dyn_newtype_define!`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
412
// `DynGlobalApi`: cloneable newtype around an `Arc` of `IGlobalFederationApi`,
// generated by `dyn_newtype_define!`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
417
418impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
419    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
420        self.inner.as_ref()
421    }
422}
423
424impl DynGlobalApi {
425    pub fn new(
426        connectors: ConnectorRegistry,
427        peers: BTreeMap<PeerId, SafeUrl>,
428        api_secret: Option<&str>,
429    ) -> anyhow::Result<Self> {
430        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
431            connectors, peers, None, api_secret,
432        ))
433        .into())
434    }
435    pub fn new_admin(
436        connectors: ConnectorRegistry,
437        peer: PeerId,
438        url: SafeUrl,
439        api_secret: Option<&str>,
440    ) -> anyhow::Result<DynGlobalApi> {
441        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
442            connectors,
443            [(peer, url)].into(),
444            Some(peer),
445            api_secret,
446        ))
447        .into())
448    }
449
450    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
451        // PeerIds are used only for informational purposes, but just in case, make a
452        // big number so it stands out
453        Self::new_admin(
454            connectors,
455            PeerId::from(1024),
456            url,
457            // Setup does not have api secrets yet
458            None,
459        )
460    }
461}
462
/// The API for the global (non-module) endpoints
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the still-encoded submission outcome.
    ///
    /// The signature carries no error type.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, using `decoders` for
    /// module data.
    ///
    /// NOTE(review): the name suggests this waits until the session is
    /// complete -- confirm in the implementation.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, using `decoders` for
    /// module data.
    ///
    /// NOTE(review): `core_api_version` presumably selects between the
    /// endpoint versions, and `broadcast_public_keys` presumably allows
    /// verifying responses -- confirm in the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation
    async fn session_count(&self) -> FederationResult<u64>;

    /// Takes a transaction id and returns a transaction id; the signature
    /// carries no error type.
    ///
    /// NOTE(review): presumably resolves once `txid` has been processed --
    /// confirm in the implementation.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads the signed backup `request` to the federation
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup for the public key `id`, returning for each peer
    /// the snapshot it provided, if any
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the guardian's [`SetupStatus`]
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Sets this guardian's local setup parameters and returns a string.
    ///
    /// NOTE(review): the returned string is presumably the setup code (see
    /// `get_setup_code`) -- confirm in the implementation.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        enabled_modules: Option<BTreeSet<ModuleKind>>,
        federation_size: Option<u32>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds the connection `info` of another peer and returns a string.
    ///
    /// NOTE(review): the format of `info` and the meaning of the returned
    /// string are not visible here -- confirm in the implementation.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Reset the peer setup codes during the federation setup process
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code if `set_local_params` was already called
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restarts the federation setup process
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns the signed API announcements, keyed by peer, as known to
    /// `guardian`
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Returns a signed API announcement for `api_url`
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Publish our signed guardian metadata to other guardians
    async fn submit_guardian_metadata(
        &self,
        peer_id: PeerId,
        metadata: SignedGuardianMetadata,
    ) -> FederationResult<()>;

    /// Returns the signed guardian metadata, keyed by peer, as known to
    /// `guardian`
    async fn guardian_metadata(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedGuardianMetadata>>;

    /// Returns a signed version of `metadata`
    async fn sign_guardian_metadata(
        &self,
        metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
        auth: ApiAuth,
    ) -> FederationResult<SignedGuardianMetadata>;

    /// Requests a shutdown of the guardian.
    ///
    /// NOTE(review): `session` presumably names the session after which to
    /// shut down, with `None` meaning no specific session -- confirm in the
    /// implementation.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Get the invite code for the federation guardian.
    /// For instance, useful after DKG
    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;

    /// Change the password used to encrypt the configs and for guardian
    /// authentication
    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;

    /// Returns the chain ID (bitcoin block hash at height 1) from the
    /// federation
    async fn chain_id(&self) -> FederationResult<ChainId>;
}
601
602pub fn deserialize_outcome<R>(
603    outcome: &SerdeOutputOutcome,
604    module_decoder: &Decoder,
605) -> OutputOutcomeResult<R>
606where
607    R: OutputOutcome + MaybeSend,
608{
609    let dyn_outcome = outcome
610        .try_into_inner_known_module_kind(module_decoder)
611        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
612
613    let source_instance = dyn_outcome.module_instance_id();
614
615    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
616        let target_type = std::any::type_name::<R>();
617        OutputOutcomeError::ResponseDeserialization(anyhow!(
618            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
619        ))
620    })
621}
622
/// Federation API client
///
/// The core underlying object used to make API requests to a federation.
///
/// It holds a connection pool used to actually make outgoing connections
/// to given URLs, and knows which peers there are and what URLs to connect
/// to in order to reach them.
// TODO: As it is currently it mixes a bit the role of connecting to "peers" with
// general purpose outgoing connection. Not a big deal, but might need refactor
// in the future.
#[derive(Clone, Debug)]
pub struct FederationApi {
    /// Map of known URLs to use to connect to peers
    peers: BTreeMap<PeerId, SafeUrl>,
    /// List of peer ids, redundant to avoid collecting all the time
    peers_keys: BTreeSet<PeerId>,
    /// Our own [`PeerId`] to use when making admin API calls
    admin_id: Option<PeerId>,
    /// Set when this API is used to communicate with a module
    module_id: Option<ModuleInstanceId>,
    /// Api secret of the federation
    api_secret: Option<String>,
    /// Connection pool
    connection_pool: ConnectionPool<dyn IGuardianConnection>,
}
648
649impl FederationApi {
650    pub fn new(
651        connectors: ConnectorRegistry,
652        peers: BTreeMap<PeerId, SafeUrl>,
653        admin_peer_id: Option<PeerId>,
654        api_secret: Option<&str>,
655    ) -> Self {
656        Self {
657            peers_keys: peers.keys().copied().collect(),
658            peers,
659            admin_id: admin_peer_id,
660            module_id: None,
661            api_secret: api_secret.map(ToOwned::to_owned),
662            connection_pool: ConnectionPool::new(connectors),
663        }
664    }
665
666    async fn get_or_create_connection(
667        &self,
668        url: &SafeUrl,
669        api_secret: Option<&str>,
670    ) -> ServerResult<DynGuaridianConnection> {
671        self.connection_pool
672            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
673                let conn = connectors
674                    .connect_guardian(&url, api_secret.as_deref())
675                    .await?;
676                Ok(conn)
677            })
678            .await
679    }
680
681    async fn request(
682        &self,
683        peer: PeerId,
684        method: ApiMethod,
685        request: ApiRequestErased,
686    ) -> ServerResult<Value> {
687        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
688        let url = self
689            .peers
690            .get(&peer)
691            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
692        let conn = self
693            .get_or_create_connection(url, self.api_secret.as_deref())
694            .await
695            .context("Failed to connect to peer")
696            .map_err(ServerError::Connection)?;
697
698        let method_str = method.to_string();
699        let peer_str = peer.to_string();
700        let timer = CLIENT_API_REQUEST_DURATION_SECONDS
701            .with_label_values(&[&method_str, &peer_str])
702            .start_timer_ext();
703
704        let res = conn.request(method.clone(), request).await;
705
706        timer.observe_duration();
707
708        let result_label = if res.is_ok() { "success" } else { "error" }.to_string();
709        CLIENT_API_REQUESTS_TOTAL
710            .with_label_values(&[&method_str, &peer_str, &result_label])
711            .inc();
712
713        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
714
715        res
716    }
717
718    /// Get receiver for changes in the active connections
719    ///
720    /// This allows real-time monitoring of connection status.
721    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
722        self.connection_pool.get_active_connection_receiver()
723    }
724}
725
// `FederationApi` can be used as a module API; the marker trait has no
// methods to implement.
impl IModuleFederationApi for FederationApi {}
727
728#[apply(async_trait_maybe_send!)]
729impl IRawFederationApi for FederationApi {
730    fn all_peers(&self) -> &BTreeSet<PeerId> {
731        &self.peers_keys
732    }
733
734    fn self_peer(&self) -> Option<PeerId> {
735        self.admin_id
736    }
737
738    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
739        FederationApi {
740            api_secret: self.api_secret.clone(),
741            peers: self.peers.clone(),
742            peers_keys: self.peers_keys.clone(),
743            admin_id: self.admin_id,
744            module_id: Some(id),
745            connection_pool: self.connection_pool.clone(),
746        }
747        .into()
748    }
749
750    #[instrument(
751        target = LOG_CLIENT_NET_API,
752        skip_all,
753        fields(
754            peer_id = %peer_id,
755            method = %method,
756            params = %params.params,
757        )
758    )]
759    async fn request_raw(
760        &self,
761        peer_id: PeerId,
762        method: &str,
763        params: &ApiRequestErased,
764    ) -> ServerResult<Value> {
765        let method = match self.module_id {
766            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
767            None => ApiMethod::Core(method.to_string()),
768        };
769
770        self.request(peer_id, method, params.clone()).await
771    }
772
773    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
774        let peers = self.peers.clone();
775
776        WatchStream::new(self.connection_pool.get_active_connection_receiver())
777            .map(move |active_urls| {
778                peers
779                    .iter()
780                    .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
781                    .collect()
782            })
783            .boxed()
784    }
785    async fn wait_for_initialized_connections(&self) {
786        self.connection_pool
787            .wait_for_initialized_connections()
788            .await;
789    }
790
791    async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection> {
792        let url = self
793            .peers
794            .get(&peer_id)
795            .ok_or_else(|| ServerError::InvalidPeerId { peer_id })?;
796        self.get_or_create_connection(url, self.api_secret.as_deref())
797            .await
798    }
799}
800
/// The status of a server, including how it views its peers
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session count as seen by the server
    pub session_count: u64,
    /// Status of each peer, keyed by peer id
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers counted as online
    pub peers_online: u64,
    /// Number of peers counted as offline
    pub peers_offline: u64,
    /// This should always be 0 if everything is okay, so a monitoring tool
    /// should generate an alert if this is not the case.
    pub peers_flagged: u64,
    /// NOTE(review): presumably the session at which a shutdown is scheduled,
    /// if any -- confirm against the server.
    pub scheduled_shutdown: Option<u64>,
}
813
/// Status of a single peer as reported in [`LegacyFederationStatus`]
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// NOTE(review): presumably the last session this peer contributed to, if
    /// any -- confirm against the server.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected
    pub connection_status: LegacyP2PConnectionStatus,
    /// Indicates that this peer needs attention from the operator since
    /// it has not contributed to the consensus in a long time
    pub flagged: bool,
}
822
/// Connection state of a peer; serialized in `snake_case` and defaulting to
/// `Disconnected`
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
830
/// Response type of [`IGlobalFederationApi::status`]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself
    pub server: ServerStatusLegacy,
    /// Federation-level status, if available
    pub federation: Option<LegacyFederationStatus>,
}
836
837#[cfg(test)]
838mod tests;