fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3pub mod net;
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6use std::fmt::Debug;
7use std::future::pending;
8use std::pin::Pin;
9use std::result;
10use std::sync::Arc;
11
12use anyhow::{Context, anyhow};
13use bitcoin::secp256k1;
14pub use error::{FederationError, OutputOutcomeError};
15pub use fedimint_connectors::ServerResult;
16pub use fedimint_connectors::error::ServerError;
17use fedimint_connectors::{
18    ConnectionPool, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
19};
20use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
39};
40use fedimint_logging::LOG_CLIENT_NET_API;
41use futures::stream::{BoxStream, FuturesUnordered};
42use futures::{Future, StreamExt};
43use global_api::with_cache::GlobalFederationApiWithCache;
44use jsonrpsee_core::DeserializeOwned;
45use serde::{Deserialize, Serialize};
46use serde_json::Value;
47use tokio::sync::watch;
48use tokio_stream::wrappers::WatchStream;
49use tracing::{debug, instrument, trace, warn};
50
51use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
52
53pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
54
55pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
56    ApiVersion { major: 0, minor: 1 };
57
58pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
59pub type FederationResult<T> = Result<T, FederationError>;
60pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
61
62pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
63
/// Set of API versions for each component (core + modules)
///
/// E.g. the result of federated common API version discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// API version of the core (non-module) component.
    pub core: ApiVersion,
    /// API version of each module, keyed by its module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
72
/// An API (module or global) that can query a federation
///
/// This is the low-level interface: it knows the set of peers and can send
/// a single raw JSON request to one of them. Higher-level, federation-wide
/// query helpers are layered on top of it.
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this
    /// interface most probably has some idea as well, but passing this set
    /// across every API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// `PeerId` of the Guardian node, if set
    ///
    /// This is for using the Client in an "Admin" mode, making authenticated
    /// calls to one's own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle for the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make a request to a specific federation peer identified by `peer_id`
    ///
    /// Returns the peer's response as an undecoded JSON [`Value`].
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> ServerResult<Value>;

    /// Returns a stream of connection status for each peer
    ///
    /// The stream emits a new value whenever the connection status changes.
    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>>;
    /// Wait for some connections to be initialized
    ///
    /// This is useful to avoid initializing networking from
    /// tasks that are not high priority.
    async fn wait_for_initialized_connections(&self);
}
111
/// An extension trait for making federation-wide API calls on top of
/// [`IRawFederationApi`].
114#[apply(async_trait_maybe_send!)]
115pub trait FederationApiExt: IRawFederationApi {
116    async fn request_single_peer<Ret>(
117        &self,
118        method: String,
119        params: ApiRequestErased,
120        peer: PeerId,
121    ) -> ServerResult<Ret>
122    where
123        Ret: DeserializeOwned,
124    {
125        self.request_raw(peer, &method, &params)
126            .await
127            .and_then(|v| {
128                serde_json::from_value(v)
129                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
130            })
131    }
132
133    async fn request_single_peer_federation<FedRet>(
134        &self,
135        method: String,
136        params: ApiRequestErased,
137        peer_id: PeerId,
138    ) -> FederationResult<FedRet>
139    where
140        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
141    {
142        self.request_raw(peer_id, &method, &params)
143            .await
144            .and_then(|v| {
145                serde_json::from_value(v)
146                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
147            })
148            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
149    }
150
151    /// Make an aggregate request to federation, using `strategy` to logically
152    /// merge the responses.
153    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
154    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
155        &self,
156        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
157        method: String,
158        params: ApiRequestErased,
159    ) -> FederationResult<FR> {
160        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
161        // completed results from it and we don't do any `await`s when
162        // processing them, it should be totally OK.
163        #[cfg(not(target_family = "wasm"))]
164        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
165        #[cfg(target_family = "wasm")]
166        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
167
168        for peer in self.all_peers() {
169            futures.push(Box::pin({
170                let method = &method;
171                let params = &params;
172                async move {
173                    let result = self
174                        .request_single_peer(method.clone(), params.clone(), *peer)
175                        .await;
176
177                    (*peer, result)
178                }
179            }));
180        }
181
182        let mut peer_errors = BTreeMap::new();
183        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
184
185        loop {
186            let (peer, result) = futures
187                .next()
188                .await
189                .expect("Query strategy ran out of peers to query without returning a result");
190
191            match result {
192                Ok(response) => match strategy.process(peer, response) {
193                    QueryStep::Retry(peers) => {
194                        for peer in peers {
195                            futures.push(Box::pin({
196                                let method = &method;
197                                let params = &params;
198                                async move {
199                                    let result = self
200                                        .request_single_peer(method.clone(), params.clone(), peer)
201                                        .await;
202
203                                    (peer, result)
204                                }
205                            }));
206                        }
207                    }
208                    QueryStep::Success(response) => return Ok(response),
209                    QueryStep::Failure(e) => {
210                        peer_errors.insert(peer, e);
211                    }
212                    QueryStep::Continue => {}
213                },
214                Err(e) => {
215                    e.report_if_unusual(peer, "RequestWithStrategy");
216                    peer_errors.insert(peer, e);
217                }
218            }
219
220            if peer_errors.len() == peer_error_threshold {
221                return Err(FederationError::peer_errors(
222                    method.clone(),
223                    params.params.clone(),
224                    peer_errors,
225                ));
226            }
227        }
228    }
229
    /// Make an aggregate request to the federation, retrying each per-peer
    /// request until it succeeds, and merge the responses with `strategy`.
    ///
    /// Unlike the non-retrying variant this never returns an error: every
    /// per-peer request is wrapped in `util::retry` with
    /// `api_networking_backoff()`, so only successful responses reach
    /// `strategy`. A `QueryStep::Failure` from the strategy is only logged.
    ///
    /// If every in-flight request has completed and the strategy still has
    /// not produced a result, this future stays pending forever.
    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
        &self,
        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
        method: String,
        params: ApiRequestErased,
    ) -> FR {
        // NOTE: `FuturesUnordered` is a footgun, but all we do here is polling
        // completed results from it and we don't do any `await`s when
        // processing them, it should be totally OK.
        #[cfg(not(target_family = "wasm"))]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
        #[cfg(target_family = "wasm")]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();

        // Initial fan-out: one retrying request per known peer.
        for peer in self.all_peers() {
            futures.push(Box::pin({
                let method = &method;
                let params = &params;
                async move {
                    let response = util::retry(
                        format!("api-request-{method}-{peer}"),
                        api_networking_backoff(),
                        || async {
                            self.request_single_peer(method.clone(), params.clone(), *peer)
                                .await
                                .inspect_err(|e| {
                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
                                })
                                .map_err(|e| anyhow!(e.to_string()))
                        },
                    )
                    .await
                    .expect("Number of retries has no limit");

                    (*peer, response)
                }
            }));
        }

        loop {
            // When nothing is in flight anymore there is no way to make
            // progress; park forever instead of returning.
            let (peer, response) = match futures.next().await {
                Some(t) => t,
                None => pending().await,
            };

            match strategy.process(peer, response) {
                QueryStep::Retry(peers) => {
                    // Re-query the peers the strategy asked for.
                    // NOTE(review): unlike the initial fan-out, unusual errors
                    // are logged here via `debug!` instead of
                    // `report_if_unusual` — confirm this is intentional.
                    for peer in peers {
                        futures.push(Box::pin({
                            let method = &method;
                            let params = &params;
                            async move {
                                let response = util::retry(
                                    format!("api-request-{method}-{peer}"),
                                    api_networking_backoff(),
                                    || async {
                                        self.request_single_peer(
                                            method.clone(),
                                            params.clone(),
                                            peer,
                                        )
                                        .await
                                        .inspect_err(|err| {
                                            if err.is_unusual() {
                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
                                            }
                                        })
                                        .map_err(|e| anyhow!(e.to_string()))
                                    },
                                )
                                .await
                                .expect("Number of retries has no limit");

                                (peer, response)
                            }
                        }));
                    }
                }
                QueryStep::Success(response) => return response,
                QueryStep::Failure(e) => {
                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
                }
                QueryStep::Continue => {}
            }
        }
    }
317
318    async fn request_current_consensus<Ret>(
319        &self,
320        method: String,
321        params: ApiRequestErased,
322    ) -> FederationResult<Ret>
323    where
324        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
325    {
326        self.request_with_strategy(
327            ThresholdConsensus::new(self.all_peers().to_num_peers()),
328            method,
329            params,
330        )
331        .await
332    }
333
334    async fn request_current_consensus_retry<Ret>(
335        &self,
336        method: String,
337        params: ApiRequestErased,
338    ) -> Ret
339    where
340        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
341    {
342        self.request_with_strategy_retry(
343            ThresholdConsensus::new(self.all_peers().to_num_peers()),
344            method,
345            params,
346        )
347        .await
348    }
349
350    async fn request_admin<Ret>(
351        &self,
352        method: &str,
353        params: ApiRequestErased,
354        auth: ApiAuth,
355    ) -> FederationResult<Ret>
356    where
357        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
358    {
359        let Some(self_peer_id) = self.self_peer() else {
360            return Err(FederationError::general(
361                method,
362                params,
363                anyhow::format_err!("Admin peer_id not set"),
364            ));
365        };
366
367        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
368            .await
369    }
370
371    async fn request_admin_no_auth<Ret>(
372        &self,
373        method: &str,
374        params: ApiRequestErased,
375    ) -> FederationResult<Ret>
376    where
377        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
378    {
379        let Some(self_peer_id) = self.self_peer() else {
380            return Err(FederationError::general(
381                method,
382                params,
383                anyhow::format_err!("Admin peer_id not set"),
384            ));
385        };
386
387        self.request_single_peer_federation(method.into(), params, self_peer_id)
388            .await
389    }
390}
391
// Blanket implementation: every `IRawFederationApi` implementor (including
// unsized ones such as trait objects, hence `?Sized`) automatically gets the
// default methods of `FederationApiExt`.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
394
/// Marker trait for the module (non-global) endpoints
///
/// It adds no methods of its own on top of [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
397
// Cloneable `DynModuleApi` newtype declared over `Arc<IModuleFederationApi>`.
// NOTE(review): the exact items generated depend on `dyn_newtype_define!`,
// which is defined outside this file.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
402
// Cloneable `DynGlobalApi` newtype declared over `Arc<IGlobalFederationApi>`.
// NOTE(review): the exact items generated depend on `dyn_newtype_define!`,
// which is defined outside this file.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
407
// Allows borrowing the wrapped global API as a plain trait object.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        // `inner` is the field generated by `dyn_newtype_define!`.
        self.inner.as_ref()
    }
}
413
414impl DynGlobalApi {
415    pub fn new(
416        connectors: ConnectorRegistry,
417        peers: BTreeMap<PeerId, SafeUrl>,
418        api_secret: Option<&str>,
419    ) -> anyhow::Result<Self> {
420        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
421            connectors, peers, None, api_secret,
422        ))
423        .into())
424    }
425    pub fn new_admin(
426        connectors: ConnectorRegistry,
427        peer: PeerId,
428        url: SafeUrl,
429        api_secret: Option<&str>,
430    ) -> anyhow::Result<DynGlobalApi> {
431        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
432            connectors,
433            [(peer, url)].into(),
434            Some(peer),
435            api_secret,
436        ))
437        .into())
438    }
439
440    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
441        // PeerIds are used only for informational purposes, but just in case, make a
442        // big number so it stands out
443        Self::new_admin(
444            connectors,
445            PeerId::from(1024),
446            url,
447            // Setup does not have api secrets yet
448            None,
449        )
450    }
451}
452
/// The API for the global (non-module) endpoints
///
/// NOTE(review): no implementation of this trait is visible in this file.
/// Summaries added below for previously undocumented methods are derived
/// from their names and signatures only and should be confirmed against the
/// implementation.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submit `tx` and return the module-encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Fetch the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`. The name suggests this waits for the session to be
    /// available — confirm.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Fetch the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// `core_api_version` and `broadcast_public_keys` are passed through to
    /// the implementation; their exact effect is not visible here.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Resolves to a [`TransactionId`] for the awaited `txid`; presumably
    /// waits until the transaction is processed — confirm.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Upload a signed client backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Download the backup snapshots stored for the key `id`, one optional
    /// entry per peer.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the guardian's [`SetupStatus`].
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Set this guardian's local setup parameters.
    ///
    /// Returns a `String`; per the documentation of `get_setup_code` this is
    /// presumably the guardian's setup code — confirm.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        enabled_modules: Option<BTreeSet<ModuleKind>>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Register another peer's connection info (`info`) during setup.
    ///
    /// The meaning of the returned `String` is not visible here.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Reset the peer setup codes during the federation setup process
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code if `set_local_params` was already called
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restart the federation setup process.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetch the signed API announcements known to `guardian`, keyed by peer.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Have our guardian sign an API announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Request a shutdown of our guardian, optionally tied to a `session`
    /// (exact semantics of `session` not visible here).
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Get the invite code for the federation guardian.
    /// For instance, useful after DKG
    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;

    /// Change the password used to encrypt the configs and for guardian
    /// authentication
    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
}
568
569pub fn deserialize_outcome<R>(
570    outcome: &SerdeOutputOutcome,
571    module_decoder: &Decoder,
572) -> OutputOutcomeResult<R>
573where
574    R: OutputOutcome + MaybeSend,
575{
576    let dyn_outcome = outcome
577        .try_into_inner_known_module_kind(module_decoder)
578        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
579
580    let source_instance = dyn_outcome.module_instance_id();
581
582    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
583        let target_type = std::any::type_name::<R>();
584        OutputOutcomeError::ResponseDeserialization(anyhow!(
585            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
586        ))
587    })
588}
589
/// Federation API client
///
/// The core underlying object used to make API requests to a federation.
///
/// It holds a connection pool (built from a `ConnectorRegistry`) used to
/// actually make outgoing connections to given URLs, and knows which peers
/// there are and what URLs to connect to in order to reach them.
// TODO: As it currently stands, it somewhat mixes the role of connecting to
// "peers" with general purpose outgoing connections. Not a big deal, but it
// might need a refactor in the future.
#[derive(Clone, Debug)]
pub struct FederationApi {
    /// Map of known URLs to use to connect to peers
    peers: BTreeMap<PeerId, SafeUrl>,
    /// List of peer ids, redundant to avoid collecting all the time
    peers_keys: BTreeSet<PeerId>,
    /// Our own [`PeerId`] to use when making admin API calls
    admin_id: Option<PeerId>,
    /// Set when this API is used to communicate with a module
    module_id: Option<ModuleInstanceId>,
    /// Api secret of the federation
    api_secret: Option<String>,
    /// Connection pool
    connection_pool: ConnectionPool<dyn IGuardianConnection>,
}
615
616impl FederationApi {
617    pub fn new(
618        connectors: ConnectorRegistry,
619        peers: BTreeMap<PeerId, SafeUrl>,
620        admin_peer_id: Option<PeerId>,
621        api_secret: Option<&str>,
622    ) -> Self {
623        Self {
624            peers_keys: peers.keys().copied().collect(),
625            peers,
626            admin_id: admin_peer_id,
627            module_id: None,
628            api_secret: api_secret.map(ToOwned::to_owned),
629            connection_pool: ConnectionPool::new(connectors),
630        }
631    }
632
633    async fn get_or_create_connection(
634        &self,
635        url: &SafeUrl,
636        api_secret: Option<&str>,
637    ) -> ServerResult<DynGuaridianConnection> {
638        self.connection_pool
639            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
640                let conn = connectors
641                    .connect_guardian(&url, api_secret.as_deref())
642                    .await?;
643                Ok(conn)
644            })
645            .await
646    }
647
648    async fn request(
649        &self,
650        peer: PeerId,
651        method: ApiMethod,
652        request: ApiRequestErased,
653    ) -> ServerResult<Value> {
654        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
655        let url = self
656            .peers
657            .get(&peer)
658            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
659        let conn = self
660            .get_or_create_connection(url, self.api_secret.as_deref())
661            .await
662            .context("Failed to connect to peer")
663            .map_err(ServerError::Connection)?;
664        let res = conn.request(method.clone(), request).await;
665
666        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
667
668        res
669    }
670
671    /// Get receiver for changes in the active connections
672    ///
673    /// This allows real-time monitoring of connection status.
674    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
675        self.connection_pool.get_active_connection_receiver()
676    }
677}
678
// `FederationApi` can serve as a module API; `with_module` below produces
// such instances by setting `module_id`.
impl IModuleFederationApi for FederationApi {}
680
681#[apply(async_trait_maybe_send!)]
682impl IRawFederationApi for FederationApi {
683    fn all_peers(&self) -> &BTreeSet<PeerId> {
684        &self.peers_keys
685    }
686
687    fn self_peer(&self) -> Option<PeerId> {
688        self.admin_id
689    }
690
691    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
692        FederationApi {
693            api_secret: self.api_secret.clone(),
694            peers: self.peers.clone(),
695            peers_keys: self.peers_keys.clone(),
696            admin_id: self.admin_id,
697            module_id: Some(id),
698            connection_pool: self.connection_pool.clone(),
699        }
700        .into()
701    }
702
703    #[instrument(
704        target = LOG_CLIENT_NET_API,
705        skip_all,
706        fields(
707            peer_id = %peer_id,
708            method = %method,
709            params = %params.params,
710        )
711    )]
712    async fn request_raw(
713        &self,
714        peer_id: PeerId,
715        method: &str,
716        params: &ApiRequestErased,
717    ) -> ServerResult<Value> {
718        let method = match self.module_id {
719            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
720            None => ApiMethod::Core(method.to_string()),
721        };
722
723        self.request(peer_id, method, params.clone()).await
724    }
725
726    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
727        let peers = self.peers.clone();
728
729        WatchStream::new(self.connection_pool.get_active_connection_receiver())
730            .map(move |active_urls| {
731                peers
732                    .iter()
733                    .map(|(peer_id, url)| (*peer_id, active_urls.contains(url)))
734                    .collect()
735            })
736            .boxed()
737    }
738    async fn wait_for_initialized_connections(&self) {
739        self.connection_pool
740            .wait_for_initialized_connections()
741            .await;
742    }
743}
744
/// The status of a server, including how it views its peers
///
/// NOTE(review): field summaries below are derived from field names and
/// types only; confirm against the code that produces this value.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session count as seen by this server.
    pub session_count: u64,
    /// This server's view of each peer, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers counted as online.
    pub peers_online: u64,
    /// Number of peers counted as offline.
    pub peers_offline: u64,
    /// This should always be 0 if everything is okay, so a monitoring tool
    /// should generate an alert if this is not the case.
    pub peers_flagged: u64,
    /// Scheduled shutdown, if any (presumably a session index — confirm).
    pub scheduled_shutdown: Option<u64>,
}
757
/// Status of a single peer as seen by the reporting server.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of this peer, if any (units not visible here —
    /// presumably a session index; confirm).
    pub last_contribution: Option<u64>,
    /// Whether the peer-to-peer connection to this peer is up.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Indicates that this peer needs attention from the operator since
    /// it has not contributed to the consensus in a long time
    pub flagged: bool,
}
766
/// Peer-to-peer connection state; serialized in `snake_case` and defaulting
/// to `Disconnected`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// No connection (the default).
    #[default]
    Disconnected,
    /// Connection established.
    Connected,
}
774
/// Response type of `IGlobalFederationApi::status`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation status as seen by the server; optional.
    pub federation: Option<LegacyFederationStatus>,
}
780
781#[cfg(test)]
782mod tests;