fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::fmt::Debug;
6use std::future::pending;
7use std::pin::Pin;
8use std::result;
9use std::sync::Arc;
10
11use anyhow::{Context, anyhow};
12use bitcoin::secp256k1;
13pub use error::{FederationError, OutputOutcomeError};
14pub use fedimint_connectors::ServerResult;
15pub use fedimint_connectors::error::ServerError;
16use fedimint_connectors::{
17    ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
18};
19use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
20use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
21use fedimint_core::core::backup::SignedBackupRequest;
22use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
32use fedimint_core::task::{MaybeSend, MaybeSync};
33use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
34use fedimint_core::util::backoff_util::api_networking_backoff;
35use fedimint_core::util::{FmtCompact as _, SafeUrl};
36use fedimint_core::{
37    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
38};
39use fedimint_logging::LOG_CLIENT_NET_API;
40use futures::stream::{BoxStream, FuturesUnordered};
41use futures::{Future, StreamExt};
42use global_api::with_cache::GlobalFederationApiWithCache;
43use jsonrpsee_core::DeserializeOwned;
44use serde::{Deserialize, Serialize};
45use serde_json::Value;
46use tokio::sync::watch;
47use tokio_stream::wrappers::WatchStream;
48use tracing::{debug, instrument, trace, warn};
49
50use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
51
52pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
53
54pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
55    ApiVersion { major: 0, minor: 1 };
56
57pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
58pub type FederationResult<T> = Result<T, FederationError>;
59pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
60
61pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
62
63/// Set of api versions for each component (core + modules)
64///
65/// E.g. result of federated common api versions discovery.
66#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
67pub struct ApiVersionSet {
68    pub core: ApiVersion,
69    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
70}
71
72/// An API (module or global) that can query a federation
73#[apply(async_trait_maybe_send!)]
74pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
75    /// List of all federation peers for the purpose of iterating each peer
76    /// in the federation.
77    ///
78    /// The underlying implementation is responsible for knowing how many
79    /// and `PeerId`s of each. The caller of this interface most probably
80    /// have some idea as well, but passing this set across every
81    /// API call to the federation would be inconvenient.
82    fn all_peers(&self) -> &BTreeSet<PeerId>;
83
84    /// `PeerId` of the Guardian node, if set
85    ///
86    /// This is for using Client in a "Admin" mode, making authenticated
87    /// calls to own `fedimintd` instance.
88    fn self_peer(&self) -> Option<PeerId>;
89
90    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
91
92    /// Make request to a specific federation peer by `peer_id`
93    async fn request_raw(
94        &self,
95        peer_id: PeerId,
96        method: &str,
97        params: &ApiRequestErased,
98    ) -> ServerResult<Value>;
99
100    /// Returns a stream of connection status for each peer
101    ///
102    /// The stream emits a new value whenever the connection status changes.
103    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;
104    /// Wait for some connections being initialized
105    ///
106    /// This is useful to avoid initializing networking by
107    /// tasks that are not high priority.
108    async fn wait_for_initialized_connections(&self);
109}
110
111/// An extension trait allowing to making federation-wide API call on top
112/// [`IRawFederationApi`].
113#[apply(async_trait_maybe_send!)]
114pub trait FederationApiExt: IRawFederationApi {
115    async fn request_single_peer<Ret>(
116        &self,
117        method: String,
118        params: ApiRequestErased,
119        peer: PeerId,
120    ) -> ServerResult<Ret>
121    where
122        Ret: DeserializeOwned,
123    {
124        self.request_raw(peer, &method, &params)
125            .await
126            .and_then(|v| {
127                serde_json::from_value(v)
128                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
129            })
130    }
131
132    async fn request_single_peer_federation<FedRet>(
133        &self,
134        method: String,
135        params: ApiRequestErased,
136        peer_id: PeerId,
137    ) -> FederationResult<FedRet>
138    where
139        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
140    {
141        self.request_raw(peer_id, &method, &params)
142            .await
143            .and_then(|v| {
144                serde_json::from_value(v)
145                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
146            })
147            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
148    }
149
150    /// Make an aggregate request to federation, using `strategy` to logically
151    /// merge the responses.
152    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
153    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
154        &self,
155        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
156        method: String,
157        params: ApiRequestErased,
158    ) -> FederationResult<FR> {
159        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
160        // completed results from it and we don't do any `await`s when
161        // processing them, it should be totally OK.
162        #[cfg(not(target_family = "wasm"))]
163        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
164        #[cfg(target_family = "wasm")]
165        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
166
167        for peer in self.all_peers() {
168            futures.push(Box::pin({
169                let method = &method;
170                let params = &params;
171                async move {
172                    let result = self
173                        .request_single_peer(method.clone(), params.clone(), *peer)
174                        .await;
175
176                    (*peer, result)
177                }
178            }));
179        }
180
181        let mut peer_errors = BTreeMap::new();
182        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
183
184        loop {
185            let (peer, result) = futures
186                .next()
187                .await
188                .expect("Query strategy ran out of peers to query without returning a result");
189
190            match result {
191                Ok(response) => match strategy.process(peer, response) {
192                    QueryStep::Retry(peers) => {
193                        for peer in peers {
194                            futures.push(Box::pin({
195                                let method = &method;
196                                let params = &params;
197                                async move {
198                                    let result = self
199                                        .request_single_peer(method.clone(), params.clone(), peer)
200                                        .await;
201
202                                    (peer, result)
203                                }
204                            }));
205                        }
206                    }
207                    QueryStep::Success(response) => return Ok(response),
208                    QueryStep::Failure(e) => {
209                        peer_errors.insert(peer, e);
210                    }
211                    QueryStep::Continue => {}
212                },
213                Err(e) => {
214                    e.report_if_unusual(peer, "RequestWithStrategy");
215                    peer_errors.insert(peer, e);
216                }
217            }
218
219            if peer_errors.len() == peer_error_threshold {
220                return Err(FederationError::peer_errors(
221                    method.clone(),
222                    params.params.clone(),
223                    peer_errors,
224                ));
225            }
226        }
227    }
228
229    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
230    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
231        &self,
232        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
233        method: String,
234        params: ApiRequestErased,
235    ) -> FR {
236        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
237        // completed results from it and we don't do any `await`s when
238        // processing them, it should be totally OK.
239        #[cfg(not(target_family = "wasm"))]
240        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
241        #[cfg(target_family = "wasm")]
242        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
243
244        for peer in self.all_peers() {
245            futures.push(Box::pin({
246                let method = &method;
247                let params = &params;
248                async move {
249                    let response = util::retry(
250                        format!("api-request-{method}-{peer}"),
251                        api_networking_backoff(),
252                        || async {
253                            self.request_single_peer(method.clone(), params.clone(), *peer)
254                                .await
255                                .inspect_err(|e| {
256                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
257                                })
258                                .map_err(|e| anyhow!(e.to_string()))
259                        },
260                    )
261                    .await
262                    .expect("Number of retries has no limit");
263
264                    (*peer, response)
265                }
266            }));
267        }
268
269        loop {
270            let (peer, response) = match futures.next().await {
271                Some(t) => t,
272                None => pending().await,
273            };
274
275            match strategy.process(peer, response) {
276                QueryStep::Retry(peers) => {
277                    for peer in peers {
278                        futures.push(Box::pin({
279                            let method = &method;
280                            let params = &params;
281                            async move {
282                                let response = util::retry(
283                                    format!("api-request-{method}-{peer}"),
284                                    api_networking_backoff(),
285                                    || async {
286                                        self.request_single_peer(
287                                            method.clone(),
288                                            params.clone(),
289                                            peer,
290                                        )
291                                        .await
292                                        .inspect_err(|err| {
293                                            if err.is_unusual() {
294                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
295                                            }
296                                        })
297                                        .map_err(|e| anyhow!(e.to_string()))
298                                    },
299                                )
300                                .await
301                                .expect("Number of retries has no limit");
302
303                                (peer, response)
304                            }
305                        }));
306                    }
307                }
308                QueryStep::Success(response) => return response,
309                QueryStep::Failure(e) => {
310                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
311                }
312                QueryStep::Continue => {}
313            }
314        }
315    }
316
317    async fn request_current_consensus<Ret>(
318        &self,
319        method: String,
320        params: ApiRequestErased,
321    ) -> FederationResult<Ret>
322    where
323        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
324    {
325        self.request_with_strategy(
326            ThresholdConsensus::new(self.all_peers().to_num_peers()),
327            method,
328            params,
329        )
330        .await
331    }
332
333    async fn request_current_consensus_retry<Ret>(
334        &self,
335        method: String,
336        params: ApiRequestErased,
337    ) -> Ret
338    where
339        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
340    {
341        self.request_with_strategy_retry(
342            ThresholdConsensus::new(self.all_peers().to_num_peers()),
343            method,
344            params,
345        )
346        .await
347    }
348
349    async fn request_admin<Ret>(
350        &self,
351        method: &str,
352        params: ApiRequestErased,
353        auth: ApiAuth,
354    ) -> FederationResult<Ret>
355    where
356        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
357    {
358        let Some(self_peer_id) = self.self_peer() else {
359            return Err(FederationError::general(
360                method,
361                params,
362                anyhow::format_err!("Admin peer_id not set"),
363            ));
364        };
365
366        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
367            .await
368    }
369
370    async fn request_admin_no_auth<Ret>(
371        &self,
372        method: &str,
373        params: ApiRequestErased,
374    ) -> FederationResult<Ret>
375    where
376        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
377    {
378        let Some(self_peer_id) = self.self_peer() else {
379            return Err(FederationError::general(
380                method,
381                params,
382                anyhow::format_err!("Admin peer_id not set"),
383            ));
384        };
385
386        self.request_single_peer_federation(method.into(), params, self_peer_id)
387            .await
388    }
389}
390
391#[apply(async_trait_maybe_send!)]
392impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
393
394/// Trait marker for the module (non-global) endpoints
395pub trait IModuleFederationApi: IRawFederationApi {}
396
397dyn_newtype_define! {
398    #[derive(Clone)]
399    pub DynModuleApi(Arc<IModuleFederationApi>)
400}
401
402dyn_newtype_define! {
403    #[derive(Clone)]
404    pub DynGlobalApi(Arc<IGlobalFederationApi>)
405}
406
407impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
408    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
409        self.inner.as_ref()
410    }
411}
412
413impl DynGlobalApi {
414    pub fn new(
415        connectors: ConnectorRegistry,
416        peers: BTreeMap<PeerId, SafeUrl>,
417        api_secret: Option<&str>,
418    ) -> anyhow::Result<Self> {
419        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
420            connectors, peers, None, api_secret,
421        ))
422        .into())
423    }
424    pub fn new_admin(
425        connectors: ConnectorRegistry,
426        peer: PeerId,
427        url: SafeUrl,
428        api_secret: Option<&str>,
429    ) -> anyhow::Result<DynGlobalApi> {
430        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
431            connectors,
432            [(peer, url)].into(),
433            Some(peer),
434            api_secret,
435        ))
436        .into())
437    }
438
439    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
440        // PeerIds are used only for informational purposes, but just in case, make a
441        // big number so it stands out
442        Self::new_admin(
443            connectors,
444            PeerId::from(1024),
445            url,
446            // Setup does not have api secrets yet
447            None,
448        )
449    }
450}
451
452/// The API for the global (non-module) endpoints
453#[apply(async_trait_maybe_send!)]
454pub trait IGlobalFederationApi: IRawFederationApi {
455    async fn submit_transaction(
456        &self,
457        tx: Transaction,
458    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
459
460    async fn await_block(
461        &self,
462        block_index: u64,
463        decoders: &ModuleDecoderRegistry,
464    ) -> anyhow::Result<SessionOutcome>;
465
466    async fn get_session_status(
467        &self,
468        block_index: u64,
469        decoders: &ModuleDecoderRegistry,
470        core_api_version: ApiVersion,
471        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
472    ) -> anyhow::Result<SessionStatus>;
473
474    async fn session_count(&self) -> FederationResult<u64>;
475
476    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
477
478    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
479
480    async fn download_backup(
481        &self,
482        id: &secp256k1::PublicKey,
483    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
484
485    /// Sets the password used to decrypt the configs and authenticate
486    ///
487    /// Must be called first before any other calls to the API
488    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
489
490    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
491
492    async fn set_local_params(
493        &self,
494        name: String,
495        federation_name: Option<String>,
496        disable_base_fees: Option<bool>,
497        enabled_modules: Option<BTreeSet<ModuleKind>>,
498        auth: ApiAuth,
499    ) -> FederationResult<String>;
500
501    async fn add_peer_connection_info(
502        &self,
503        info: String,
504        auth: ApiAuth,
505    ) -> FederationResult<String>;
506
507    /// Reset the peer setup codes during the federation setup process
508    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
509
510    /// Returns the setup code if `set_local_params` was already called
511    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
512
513    /// Runs DKG, can only be called once after configs have been generated in
514    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
515    /// error and config gen must be restarted.
516    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
517
518    /// Returns the status of the server
519    async fn status(&self) -> FederationResult<StatusResponse>;
520
521    /// Show an audit across all modules
522    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
523
524    /// Download the guardian config to back it up
525    async fn guardian_config_backup(&self, auth: ApiAuth)
526    -> FederationResult<GuardianConfigBackup>;
527
528    /// Check auth credentials
529    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
530
531    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
532
533    /// Publish our signed API announcement to other guardians
534    async fn submit_api_announcement(
535        &self,
536        peer_id: PeerId,
537        announcement: SignedApiAnnouncement,
538    ) -> FederationResult<()>;
539
540    async fn api_announcements(
541        &self,
542        guardian: PeerId,
543    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
544
545    async fn sign_api_announcement(
546        &self,
547        api_url: SafeUrl,
548        auth: ApiAuth,
549    ) -> FederationResult<SignedApiAnnouncement>;
550
551    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
552
553    /// Returns the fedimintd version a peer is running
554    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;
555
556    /// Fetch the backup statistics from the federation (admin endpoint)
557    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
558
559    /// Get the invite code for the federation guardian.
560    /// For instance, useful after DKG
561    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;
562
563    /// Change the password used to encrypt the configs and for guardian
564    /// authentication
565    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
566}
567
568pub fn deserialize_outcome<R>(
569    outcome: &SerdeOutputOutcome,
570    module_decoder: &Decoder,
571) -> OutputOutcomeResult<R>
572where
573    R: OutputOutcome + MaybeSend,
574{
575    let dyn_outcome = outcome
576        .try_into_inner_known_module_kind(module_decoder)
577        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
578
579    let source_instance = dyn_outcome.module_instance_id();
580
581    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
582        let target_type = std::any::type_name::<R>();
583        OutputOutcomeError::ResponseDeserialization(anyhow!(
584            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
585        ))
586    })
587}
588
589/// Federation API client
590///
591/// The core underlying object used to make API requests to a federation.
592///
593/// It has an `connectors` handle to actually making outgoing connections
594/// to given URLs, and knows which peers there are and what URLs to connect to
595/// to reach them.
596// TODO: As it is currently it mixes a bit the role of connecting to "peers" with
597// general purpose outgoing connection. Not a big deal, but might need refactor
598// in the future.
599#[derive(Clone, Debug)]
600pub struct FederationApi {
601    /// Map of known URLs to use to connect to peers
602    peers: BTreeMap<PeerId, SafeUrl>,
603    /// List of peer ids, redundant to avoid collecting all the time
604    peers_keys: BTreeSet<PeerId>,
605    /// Our own [`PeerId`] to use when making admin apis
606    admin_id: Option<PeerId>,
607    /// Set when this API is used to communicate with a module
608    module_id: Option<ModuleInstanceId>,
609    /// Api secret of the federation
610    api_secret: Option<String>,
611    /// Connection pool
612    connection_pool: ConnectionPool<dyn IGuardianConnection>,
613}
614
615impl FederationApi {
616    pub fn new(
617        connectors: ConnectorRegistry,
618        peers: BTreeMap<PeerId, SafeUrl>,
619        admin_peer_id: Option<PeerId>,
620        api_secret: Option<&str>,
621    ) -> Self {
622        Self {
623            peers_keys: peers.keys().copied().collect(),
624            peers,
625            admin_id: admin_peer_id,
626            module_id: None,
627            api_secret: api_secret.map(ToOwned::to_owned),
628            connection_pool: ConnectionPool::new(connectors),
629        }
630    }
631
632    async fn get_or_create_connection(
633        &self,
634        url: &SafeUrl,
635        api_secret: Option<&str>,
636    ) -> ServerResult<DynGuaridianConnection> {
637        self.connection_pool
638            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
639                let conn = connectors
640                    .connect_guardian(&url, api_secret.as_deref())
641                    .await?;
642                Ok(conn)
643            })
644            .await
645    }
646
647    async fn request(
648        &self,
649        peer: PeerId,
650        method: ApiMethod,
651        request: ApiRequestErased,
652    ) -> ServerResult<Value> {
653        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
654        let url = self
655            .peers
656            .get(&peer)
657            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
658        let conn = self
659            .get_or_create_connection(url, self.api_secret.as_deref())
660            .await
661            .context("Failed to connect to peer")
662            .map_err(ServerError::Connection)?;
663        let res = conn.request(method.clone(), request).await;
664
665        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
666
667        res
668    }
669
670    /// Get receiver for changes in the active connections
671    ///
672    /// This allows real-time monitoring of connection status.
673    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
674        self.connection_pool.get_active_connection_receiver()
675    }
676}
677
678impl IModuleFederationApi for FederationApi {}
679
680#[apply(async_trait_maybe_send!)]
681impl IRawFederationApi for FederationApi {
682    fn all_peers(&self) -> &BTreeSet<PeerId> {
683        &self.peers_keys
684    }
685
686    fn self_peer(&self) -> Option<PeerId> {
687        self.admin_id
688    }
689
690    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
691        FederationApi {
692            api_secret: self.api_secret.clone(),
693            peers: self.peers.clone(),
694            peers_keys: self.peers_keys.clone(),
695            admin_id: self.admin_id,
696            module_id: Some(id),
697            connection_pool: self.connection_pool.clone(),
698        }
699        .into()
700    }
701
702    #[instrument(
703        target = LOG_CLIENT_NET_API,
704        skip_all,
705        fields(
706            peer_id = %peer_id,
707            method = %method,
708            params = %params.params,
709        )
710    )]
711    async fn request_raw(
712        &self,
713        peer_id: PeerId,
714        method: &str,
715        params: &ApiRequestErased,
716    ) -> ServerResult<Value> {
717        let method = match self.module_id {
718            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
719            None => ApiMethod::Core(method.to_string()),
720        };
721
722        self.request(peer_id, method, params.clone()).await
723    }
724
725    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
726        let peers = self.peers.clone();
727
728        WatchStream::new(self.connection_pool.get_active_connection_receiver())
729            .map(move |active_urls| {
730                peers
731                    .iter()
732                    .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
733                    .collect()
734            })
735            .boxed()
736    }
737    async fn wait_for_initialized_connections(&self) {
738        self.connection_pool
739            .wait_for_initialized_connections()
740            .await;
741    }
742}
743
744/// The status of a server, including how it views its peers
745#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
746pub struct LegacyFederationStatus {
747    pub session_count: u64,
748    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
749    pub peers_online: u64,
750    pub peers_offline: u64,
751    /// This should always be 0 if everything is okay, so a monitoring tool
752    /// should generate an alert if this is not the case.
753    pub peers_flagged: u64,
754    pub scheduled_shutdown: Option<u64>,
755}
756
757#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
758pub struct LegacyPeerStatus {
759    pub last_contribution: Option<u64>,
760    pub connection_status: LegacyP2PConnectionStatus,
761    /// Indicates that this peer needs attention from the operator since
762    /// it has not contributed to the consensus in a long time
763    pub flagged: bool,
764}
765
766#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
767#[serde(rename_all = "snake_case")]
768pub enum LegacyP2PConnectionStatus {
769    #[default]
770    Disconnected,
771    Connected,
772}
773
774#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
775pub struct StatusResponse {
776    pub server: ServerStatusLegacy,
777    pub federation: Option<LegacyFederationStatus>,
778}
779
780#[cfg(test)]
781mod tests;